<?php
namespace app\controller\mq;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\facade\Log;
class MessageConsume
{
const exchange = 'router';
const queue = 'liusongs';
const consumerTag = 'consumer';
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
Log::write('强制结束了!');
}
function process_message($message)
{
if ($message->body !== 'quit') {
$obj = json_decode($message->body);
//$obj 就是你的 data数据
//逻辑主要判定
if (!isset($obj->id)) {
echo 'error data\n';
Log::write("error data:" . $message->body);
} else {
Log::write("data:" . json_encode($message,JSON_UNESCAPED_UNICODE));
}
}
//手动确认消息 处理
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
/**
* 启动
* http://tp6.com/mq.message_consume/start
*
*/
public function start()
{
Log::write("开始执行");
$HOST = '127.0.0.1';
$PORT = '5672';
$USER = 'liusongs';
$PASS = 'liusongs';
$VHOST = 'vhost_lss';
$connection = new AMQPStreamConnection($HOST, $PORT, $USER, $PASS,$VHOST);
/**
* Channel一些Api解释
`basicNack` 不确认消息
`basicReject` 拒绝消息
`RecoverOk` 是否恢复消息到队列
`exchangeDeclare` 声明交换机
`queueDeclare` 声明队列
`queueBind` 绑定队列
`queueUnbind` 解绑队列
`exchangeBind` 绑定交换机
`exchangeUnbind` 解绑交换机
`basicQos` 消息流量
`basicAck` 消息确认
`basicConsume` 消息消费
`basicPublish` 发布消息
`basicGet` 主动拉取队列中的一条消息
`basicCancel` 取消消费者对队列的订阅关系
*/
$channel = $connection->channel();
/**
* 声明队列
* queue: 队列名称
* passive 只判断不创建(一般用于判断该交换机是否存在),如果你希望查询交换机是否存在.而又不想在查询时创建这个交换机.设置此为true即可如果交换机不存在,则会抛出一个错误的异常.如果存在则返回NULL
* durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
* exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的
* autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
* $nowait false 如果为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度。
* $arguments array() 其他一些结构化参数。$arguments = new AMQPTable([
'x-message-ttl' => 10000, // 延迟时间 (毫秒)创建queue时设置该参数可指定消息在该queue中待多久,可根据x-dead-letter-routing-key和x-dead-letter-exchange生成可延迟的死信队列。
'x-expires' => 26000, // 队列存活时间 如果一个队列开始没有设置存活时间,后面又设置是无效的。
'x-dead-letter-exchange' => 'exchange_direct_ttl3', // 延迟结束后指向交换机(死信收容交换机)
'x-dead-letter-queue' => 'queue_ttl3', // 延迟结束后指向队列(死信收容队列),
//'x-dead-letter-routing-key' => 'queue_ttl3', // 设置routing-key
//'x-max-priority'=>'10' //声明优先级队列.表示队列应该支持的最大优先级。建议使用1到10之间.该参数会造成额外的CPU消耗。
]
* $ticket null
*/
$channel->queue_declare(self::queue, false, true, false, false);
/**
* 试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过
* $exchange 交换器名称。
* $type 交换器类型,常见的如fanout、direct、topic、headers四种。
* $passive false 只判断不创建(一般用于判断该交换机是否存在),如果你希望查询交换机是否存在.而又不想在查询时创建这个交换机.设置此为true即可如果交换机不存在,则会抛出一个错误的异常.如果存在则返回NULL。
* $durable false 设置是否持久化。设置true表示持久化,反之非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
* $auto_delete true 设置是否自动删除。设置true表示自动删除。自动删除的前提:至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此解绑。不要错误的理解:“当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器”。
* $internal false 设置是否是内置的。设置true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这个方式。
* $nowait false 如果为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度。
* $arguments array() 其他一些结构化参数。
* $ticket null
*/
$channel->exchange_declare(self::exchange, 'direct', false, true, false);
//绑定。绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。
$channel->queue_bind(self::queue, self::exchange);
//我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),
//直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)
$channel->basic_qos(null, 1, null);
//开启消费
/**
* $queue 队列名称
* $consumer_tag 消费者标签,用来区分多个消费者
* $no_local AMQP的标准
* $no_ack 收到消息后,是否不需要回复确认即被认为被消费;设置为true,表示自动应答;设置为false表示手动应答。
* $exclusive 设置是否排他。排他消费者,即这个队列只能由一个消费者消费,适用于任务不允许进行并发处理的情况。
* $nowait 如果为true则表示不等待服务器回执信息,函数将返回NULL,但若排他开启的话,则必须需要等待结果的,如果两个一起开就会报错。
* $callback callback函数,
* $ticket
* $arguments 一些额外配置
*/
$channel->basic_consume(self::queue, self::consumerTag, false, false, false, false, array($this, 'process_message'));
//注册一个 callback ,它会在脚本执行完成或者 exit() 后被调用。 这里 关闭连接
register_shutdown_function(array($this, 'shutdown'), $channel, $connection);
//正在消费时,则等待
while (count($channel->callbacks)) {
$channel->wait();
}
Log::write("执行结束");
}
}