您当前的位置:学无止境 > rabbitmq延迟插件的安装和使用网站首页学无止境
rabbitmq延迟插件的安装和使用
发布时间:2024-07-16 14:36:22编辑:三青查看次数:259
注意:默认根目录已经安装composer下载安装rabbitmq(php-amqplib/php-amqplib)扩展
rabbitmq的插件地址:https://www.rabbitmq.com/community-plugins
找到延迟插件rabbitmq_delayed_message_exchange,去github下载
找到对应的版本(tag):https://github.com/rabbitmq/rabbitmq-rtopic-exchange/tags
因为三青下载的rabbitmq版本是3.8,所以rabbitmq_delayed_message_exchange下载的也是3.8版本:https://github.com/rabbitmq/rabbitmq-rtopic-exchange/releases/download/v3.8.0/rabbitmq_rtopic_exchange-3.8.0.ez
切换到插件目录,上传插件,根据前面的安装,三青的插件目录如下
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.19/plugins
启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看插件是否启动成功(对应的插件有E*,说明启动成功)
rabbitmq-plugins list
重启srabbitmq
systemctl restart rabbitmq-server
生产者delay_pub.php
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; use PhpAmqpLibWireAMQPTable; $dbName = 'sanqing'; $dbPwd = '111111'; $tableName = 'order'; $connection = new AMQPStreamConnection('localhost', 5672, $dbName, $dbPwd, $tableName); $channel = $connection->channel(); $exc_name = 'delay_exc_pay'; $routing_key = 'delay_route_pay'; $queue_name = 'delay_queue_pay'; $ttl = 20000; $channel->exchange_declare($exc_name,'x-delayed-message',false,true,false); $args = new AMQPTable(['x-delayed-type'=>'direct']); $channel->queue_declare($queue_name,false,true,false,false,false,$args); $channel->queue_bind($queue_name,$exc_name,$routing_key); $data = 'this is '.$routing_key.' delay message'; $arr = ['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT,'application_headers'=>new AMQPTable(['x-delay'=>$ttl])]; $msg = new AMQPMessage($data,$arr); $channel->basic_publish($msg,$exc_name,$routing_key); $channel->close(); $connection->close();
消费者delay_work.php
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $dbName = 'sanqing'; $dbPwd = '111111'; $tableName = 'order'; $connection = new AMQPStreamConnection('localhost', 5672, $dbName, $dbPwd, $tableName); $channel = $connection->channel(); $exc_name = 'delay_exc_pay'; $routing_key = 'delay_route_pay'; $queue_name = 'delay_queue_pay'; $channel->exchange_declare($exc_name,'x-delayed-message',false,true,false); $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function($msg){ echo 'received '.$msg->body."n"; $msg->ack(); }; $channel->basic_qos(null,1,null); $channel->basic_consume($queue_name,'',false,false,false,false,$callback); while($channel->is_open()){ $channel->wait(); } $channel->close(); $connection->close();
启动消费者delay_work.php和生产delay_pub.php
php delay_work.php
php delay_pub.php
因为消息ttl过期时间设置的是20秒,所以消费者在生产者启动20秒后才能获得数据
rabbitmq管理界面新建一个交换器,不然启动生产者delay_pub.php可能会报错(x-delayed-type must be an existing exchange type)
关键字词:rabbitmq,死信队列,死信,php,消息队列,队列,延迟插件