要对rabbitmq服务器的消息进行订阅处理:
实现逻辑是开启一个任务,不间断的处理消息,充当消费者角色;
<?php
namespace app\common\controller;
use app\common\service\NoticeService;
use think\facade\Config;
use think\facade\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use app\common\service\AliSmsService;
class MqConsumer
{
/**
* 消费者
*/
public function consumer($name)
{
//获取配置
$amqp = Config::get('rabbitmq.AMQP');
$amqpDefail = Config::get('rabbitmq.' . $name . '_queue');
//连接
$connection = new AMQPStreamConnection(
$amqp['host'],
$amqp['port'],
$amqp['username'],
$amqp['password']
);
//建立通道
$channel = $connection->channel();
//流量控制
$channel->basic_qos(null, 1, null);
//初始化交换机
$channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);
//初始化队列
$channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);
//绑定队列与交换机
$channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);
//消费消息
$channel->basic_consume($amqpDefail['queue_name'], $amqpDefail['consumer_tag'], false, false, false, false, [$this, 'msgProc']);
//退出
register_shutdown_function([$this, 'shutdown'], $channel, $connection);
//监听
while (count($channel->callbacks)) {
$channel->wait();
}
}
/**
* 消息处理
* @param $msg
*/
public function msgProc($msg)
{
$data = $msg->body;
echo " [x] Received ", $data, "\n";
//队列数据
$arrData = json_decode($data, true);
//获取队列信息
$consumer_tag = $msg->delivery_info['consumer_tag'];
//处理对应消息的业务功能实现
unset($arrData);
}
/**
* 退出
* @param $channel [信道]
* @param $connection [连接]
*/
public function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
}
好了,本文内容全部结束,感谢您的阅读,希望能帮助到你。