阅读量:3
在PHP中,可以使用多线程来开启多个进程,以实现同时处理多个任务。以下是一个使用php-amqplib库和多线程的示例代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; use PhpAmqpLib\Wire\AMQPTableFlags; // AMQP连接参数 $host = 'localhost'; $port = 5672; $user = 'guest'; $password = 'guest'; $vhost = '/'; $exchange = 'my_exchange'; $routingKey = 'my_routing_key'; $queue = 'my_queue'; // 创建AMQP连接 $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost); $channel = $connection->channel(); // 声明交换机和队列 $channel->exchange_declare($exchange, 'direct', false, true); // 设置队列参数,开启多个消费者进程 $args = new AMQPTable(); $args->set('x-max-priority', 10); // 设置队列最大优先级为10 $args->set('x-max-length', 1000); // 设置队列最大长度为1000 $args->set('x-overflow', 'drop-head'); // 队列溢出策略为删除头部消息 // 声明队列 $channel->queue_declare($queue, false, true, false, false, false, $args); // 将队列绑定到交换机 $channel->queue_bind($queue, $exchange, $routingKey); // 设置消费者回调函数 $callback = function (AMQPMessage $message) { // 处理消息 echo 'Received message: ' . $message->getBody() . PHP_EOL; $message->ack(); }; // 开启多个消费者进程 $processes = 5; for ($i = 0; $i < $processes; $i++) { $pid = pcntl_fork(); if ($pid == -1) { die('Could not fork'); } elseif ($pid) { // 父进程,继续创建下一个子进程 continue; } else { // 子进程,创建新的连接和通道 $connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost); $channel = $connection->channel(); // 设置消费者回调函数 $channel->basic_consume($queue, '', false, false, false, false, $callback); // 循环接收消息 while (count($channel->callbacks)) { $channel->wait(); } // 关闭连接和通道 $channel->close(); $connection->close(); // 子进程退出 exit(0); } } // 等待子进程退出 while (pcntl_waitpid(0, $status) != -1) { $status = pcntl_wexitstatus($status); echo "Child process $status completed" . PHP_EOL; } // 关闭连接和通道 $channel->close(); $connection->close();
以上代码使用pcntl_fork()
函数创建了多个子进程,每个子进程都拥有自己的AMQP连接和通道,并通过设置不同的消费者回调函数来处理消息。请根据实际需求修改代码中的连接参数和回调函数。
注意:使用多线程时,需要将代码保存为独立的PHP文件,并通过命令行来运行,如php filename.php
。