rabbitmq 消费 MessageConsume

<?php


namespace app\controller\mq;


use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\facade\Log;


class MessageConsume
{

    const exchange = 'router';
    const queue = 'liusongs';
    const consumerTag = 'consumer';

    function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
        Log::write('强制结束了!');
    }

    function process_message($message)
    {
        if ($message->body !== 'quit') {
            $obj = json_decode($message->body);
            //$obj 就是你的 data数据
            //逻辑主要判定
            if (!isset($obj->id)) {
                echo 'error data\n';
                Log::write("error data:" . $message->body);
            } else {
                Log::write("data:" . json_encode($message,JSON_UNESCAPED_UNICODE));
            }
        }
        //手动确认消息 处理
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }


    /**
     * 启动
     * http://tp6.com/mq.message_consume/start
     *
     */
    public function start()
    {
        Log::write("开始执行");
        $HOST   = '127.0.0.1';
        $PORT   = '5672';
        $USER   = 'liusongs';
        $PASS   = 'liusongs';
        $VHOST  = 'vhost_lss';
        $connection = new AMQPStreamConnection($HOST, $PORT, $USER, $PASS,$VHOST);
        /**
         * Channel一些Api解释
           `basicNack` 不确认消息
           `basicReject` 拒绝消息
           `RecoverOk` 是否恢复消息到队列
           `exchangeDeclare` 声明交换机
           `queueDeclare` 声明队列
           `queueBind` 绑定队列
           `queueUnbind` 解绑队列
           `exchangeBind` 绑定交换机
           `exchangeUnbind` 解绑交换机
           `basicQos` 消息流量
           `basicAck` 消息确认
           `basicConsume` 消息消费
           `basicPublish` 发布消息
           `basicGet` 主动拉取队列中的一条消息
           `basicCancel` 取消消费者对队列的订阅关系
         */
        $channel = $connection->channel();

        /**
         * 声明队列
         * queue: 队列名称
         * passive 只判断不创建(一般用于判断该交换机是否存在),如果你希望查询交换机是否存在.而又不想在查询时创建这个交换机.设置此为true即可如果交换机不存在,则会抛出一个错误的异常.如果存在则返回NULL
         * durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
         * exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的
         * autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
         * $nowait false 如果为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度。
         * $arguments array() 其他一些结构化参数。$arguments = new AMQPTable([
                'x-message-ttl' => 10000,  // 延迟时间 (毫秒)创建queue时设置该参数可指定消息在该queue中待多久,可根据x-dead-letter-routing-key和x-dead-letter-exchange生成可延迟的死信队列。
                'x-expires' => 26000,  // 队列存活时间  如果一个队列开始没有设置存活时间,后面又设置是无效的。
                'x-dead-letter-exchange' => 'exchange_direct_ttl3',  // 延迟结束后指向交换机(死信收容交换机)
                'x-dead-letter-queue' => 'queue_ttl3',  // 延迟结束后指向队列(死信收容队列),
                //'x-dead-letter-routing-key' => 'queue_ttl3',  // 设置routing-key
                //'x-max-priority'=>'10' //声明优先级队列.表示队列应该支持的最大优先级。建议使用1到10之间.该参数会造成额外的CPU消耗。
                ]
         * $ticket null
         */
        $channel->queue_declare(self::queue, false, true, false, false);

        /**
         * 试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过
         * $exchange   交换器名称。
         * $type   交换器类型,常见的如fanout、direct、topic、headers四种。
         * $passive false 只判断不创建(一般用于判断该交换机是否存在),如果你希望查询交换机是否存在.而又不想在查询时创建这个交换机.设置此为true即可如果交换机不存在,则会抛出一个错误的异常.如果存在则返回NULL。
         * $durable false 设置是否持久化。设置true表示持久化,反之非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
         * $auto_delete true 设置是否自动删除。设置true表示自动删除。自动删除的前提:至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此解绑。不要错误的理解:“当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器”。
         * $internal false 设置是否是内置的。设置true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这个方式。
         * $nowait false 如果为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度。
         * $arguments array() 其他一些结构化参数。
         * $ticket null
         */
        $channel->exchange_declare(self::exchange, 'direct', false, true, false);

        //绑定。绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。
        $channel->queue_bind(self::queue, self::exchange);


        //我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),
        //直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)
        $channel->basic_qos(null, 1, null);

        //开启消费
        /**
         * $queue 队列名称
         * $consumer_tag 消费者标签,用来区分多个消费者
         * $no_local AMQP的标准
         * $no_ack 收到消息后,是否不需要回复确认即被认为被消费;设置为true,表示自动应答;设置为false表示手动应答。
         * $exclusive 设置是否排他。排他消费者,即这个队列只能由一个消费者消费,适用于任务不允许进行并发处理的情况。
         * $nowait  如果为true则表示不等待服务器回执信息,函数将返回NULL,但若排他开启的话,则必须需要等待结果的,如果两个一起开就会报错。
         * $callback  callback函数,
         * $ticket
         * $arguments 一些额外配置
         */
        $channel->basic_consume(self::queue, self::consumerTag, false, false, false, false, array($this, 'process_message'));

        //注册一个 callback ,它会在脚本执行完成或者 exit() 后被调用。 这里 关闭连接
        register_shutdown_function(array($this, 'shutdown'), $channel, $connection);

        //正在消费时,则等待
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        Log::write("执行结束");
    }
}

 

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注