这里是文章模块栏目内容页
php消息rabbitmq的消费者处理消息功能实现

要对rabbitmq服务器的消息进行订阅处理:

实现逻辑是开启一个任务,不间断的处理消息,充当消费者角色;

<?php

namespace app\common\controller;

use app\common\service\NoticeService;
use think\facade\Config;
use think\facade\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use app\common\service\AliSmsService;

class MqConsumer
{
    /**
     * 消费者
     */
    public function consumer($name)
    {
        //获取配置
        $amqp = Config::get('rabbitmq.AMQP');
        $amqpDefail = Config::get('rabbitmq.' . $name . '_queue');

        //连接
        $connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['username'],
            $amqp['password']
        );

        //建立通道
        $channel = $connection->channel();

        //流量控制
        $channel->basic_qos(null, 1, null);

        //初始化交换机
        $channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);

        //初始化队列
        $channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);

        //绑定队列与交换机
        $channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);

        //消费消息
        $channel->basic_consume($amqpDefail['queue_name'], $amqpDefail['consumer_tag'], false, false, false, false, [$this, 'msgProc']);

        //退出
        register_shutdown_function([$this, 'shutdown'], $channel, $connection);

        //监听
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * 消息处理
     * @param  $msg
     */
    public function msgProc($msg)
    {
        $data = $msg->body;

        echo " [x] Received ", $data, "\n";

        //队列数据
        $arrData = json_decode($data, true);

        //获取队列信息
        $consumer_tag = $msg->delivery_info['consumer_tag'];

        //处理对应消息的业务功能实现
        unset($arrData);
    }

    /**
     * 退出
     * @param  $channel [信道]
     * @param $connection [连接]
     */
    public function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
    }

}

好了,本文内容全部结束,感谢您的阅读,希望能帮助到你。