1. 为什么要使用 RabbitMQ
🤔
异步通信:允许系统中的不同部分通过消息传递进行通信,而无需实时直接交互。这种异步通信可以提高系统的可伸缩性和灵活性。
解耦系统组件:消息代理系统可以在系统内部或跨系统之间充当中介,从而实现系统组件的解耦。这意味着每个组件可以独立地工作,而无需直接了解其他组件的实现细节。
消息队列:RabbitMQ 等消息代理系统利用消息队列的方式来存储和传递消息。这可以帮助处理系统中的大量请求或数据,确保消息在需要时得到处理,而不会因为某个组件繁忙而丢失。
异步任务处理:允许将耗时的任务从主应用程序中分离出来并异步处理。通过将这些任务发送到队列中,可以提高应用程序的性能和响应性。
消息持久化:消息代理系统通常支持消息持久化,即使在代理或消费者宕机后,消息也不会丢失。这有助于确保数据的安全性和可靠性。
负载均衡:使用消息队列可以实现负载均衡,多个消费者可以同时从队列中获取消息并处理,提高系统的吞吐量和效率。
处理失败和重试机制:消息代理系统通常具备处理失败消息和重试机制的能力,可以有效处理由于错误而导致的消息处理失败的情况。
2. 关于使用
简单封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| <?php
namespace App\Logic;
use PhpAmqpLib\Channel\AbstractChannel; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage;
class RabbitMqClient { private AMQPStreamConnection $conn; private AbstractChannel|AMQPChannel $chan; private string $queueName;
public function __construct() { $this->conn = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin'); $this->chan = $this->conn->channel(); }
public function setQueue(string $queueName): self { $this->chan->queue_declare($this->queueName = $queueName, auto_delete: false); return $this; }
public function send(mixed $data): void { $this->chan->basic_publish(new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE)), '', $this->queueName); }
public function recv(callable $callback): void { $this->chan->basic_consume($this->queueName, '', false, false, false, false, $callback); while (count($this->chan->callbacks)) { $this->chan->wait(); } }
public function getQueueName(): string { return $this->queueName; }
public function getChan(): AbstractChannel|AMQPChannel { return $this->chan; }
public function close (): void { $this->conn->close(); $this->chan->close(); } }
|
发送端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| <?php
namespace App\Console\Commands\RabbitMQ;
use App\Logic\RabbitMqClient; use Illuminate\Console\Command;
class RabbitMqSend extends Command {
protected $signature = 'rabbit-mq:send';
protected $description = 'Command description';
public function handle(): void { $mq = new RabbitMqClient();
$mq->setQueue('test'); $mq->send(['a' => 1]); } }
|
接收端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| <?php
namespace App\Console\Commands\RabbitMQ;
use App\Logic\RabbitMqClient; use Illuminate\Console\Command; use PhpAmqpLib\Message\AMQPMessage;
class RabbitMqRecv extends Command {
protected $signature = 'rabbit-mq:recv';
protected $description = 'Command description';
public function handle(): void { $mq = new RabbitMqClient();
$mq->setQueue('test');
while (true) { $mq->recv(function (AMQPMessage $data) use ($mq) { $this->queueHandler($mq, json_decode($data->getBody()), true); $mq->getChan()->basic_ack($data->getDeliveryTag()); }); } }
private function queueHandler(RabbitMqClient $mq, mixed $data): void { dump($data); }
}
|
3. 函数讲解
基础依赖
1
| composer require php-amqplib/php-amqplib
|
使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| use App\Logic\RabbitMqClient;
$rabbitMqClient = new RabbitMqClient();
$rabbitMqClient->setQueue('your_queue_name');
$rabbitMqClient->send(['message' => 'Hello, RabbitMQ!']);
$rabbitMqClient->recv(function ($msg) { echo "Received: ", $msg->body, "\n"; });
$rabbitMqClient->close();
|
方法
__construct()
初始化 RabbitMqClient 类并建立与 RabbitMQ 代理的连接。
setQueue(string $queueName): self
设置要发送/接收消息的队列名称。
send(mixed $data): void
向队列发送消息。
recv(callable $callback): void
从队列接收消息。使用回调函数处理接收到的消息。
getQueueName(): string
获取当前队列的名称。
getChan(): AbstractChannel|AMQPChannel
获取当前的 AMQP 通道。
close(): void
关闭与 RabbitMQ 的连接和通道。
注意事项
- 请确保在使用
send()
或 recv()
方法之后调用 close()
方法以关闭连接,避免资源泄露。