博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
AMQP RabbitMQ 打回消息后延迟处理
阅读量:4109 次
发布时间:2019-05-25

本文共 2859 字,大约阅读时间需要 9 分钟。

AMQP RabbitMQ 打回消息后延迟处理

代码例子是spring boot下的,但是原理都相同。

1. 使用DLX实现延迟消息

注意,要使用DLX实现延迟队列的前提是,接收的队列无任何消费者监听,因为一旦有消费者监听,除非手动打回消息,消息才会进入死信队列(因为我们要实现延迟队列,要么sleep后手动打回,要么就不要给这个队列设置消费者),如果只是单纯不处理(不确认也不打回)只会是Unacked的状态,并不会进入死信队列,也就无法完成延迟转发的效果了,Unacked的状态下只有到这个队列的消费者退出(监听器断开连接)才会进入死信队列

例子中没有宏,没有多余代码,没有复杂交换机和路由键,尽量从简。希望大家能看得懂。

创建一个普通直连交换机和队列

  • 创建一个普通的交换机和队列,注意不要给这个队列添加任何消费者
    很多文章把死信队列搞混,误人子弟,事实上这个队列就是普通队列,不是死信队列,DLX的直译就是死信交换机。在RabbitMQ管理页看见某个队列有DLX标志,并不代表它是死信队列,只是这个队列设置了x-dead-letter-exchange变量。
    我们这个队列只是绑定了这个变量,让消息过期后发送到这个交换机上。
    死信队列是和DLX+DLK绑定的队列,才叫死信队列。
    设置变量、路由键绑定,这2个概念区分开。
@Configuration@EnableRabbitpublic class RabbitmqConfig {
//直连交换机 @Bean Exchange directQueueExchange(){
return ExchangeBuilder.directExchange("directExchange").build(); } //创建一个普通队列 //3个参数,绑定死信交换机和死信队列,还有延迟时间 @Bean Queue normalQueue(){
HashMap
map=new HashMap<>(); //如果消息过期,消息会去哪个交换机 map.put("x-dead-letter-exchange","directExchange"); //消息过期去哪个路由键,为了大家不绕晕,我这里就用delayKey了 map.put("x-dead-letter-routing-key","delayKey"); //消息过期时长 map.put("x-message-ttl",3000); return QueueBuilder.durable("normalQueue").withArguments(map).build(); } //进行绑定 @Bean Binding queueBinding(){
return BindingBuilder.bind(normalQueue()).to(directQueueExchange()).with("normalQueue").noargs(); }}

创建死信队列

  • 上面的代码,消息进入normalQueue后,如果normalQueue没有任何消费者,消息过期后就会根据绑定好的DLX和DLK寻找队列,然后将消息发送到队列,我们下面就需要创建这个接收死信的队列
//为了方便,直接RabbitListener,监听+创建一步到位@Componentpublic class DelayExchangeQueue{
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "delayQueue"), exchange = @Exchange(name = "directExchange") ,key = "delayKey"),ackMode = "MANUAL" ) public void delayConsume(Message msg,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel){
System.out.println(msg); try {
channel.basicAck(deliveryTag, false); } catch (IOException e) {
e.printStackTrace(); } }}

测试

MessageProperties messageProperties = new MessageProperties();        messageProperties.getHeaders().put("desc", "信息描述..");        messageProperties.getHeaders().put("type", "自定义消息类型..");        Message message = new Message("Hoola".getBytes(),messageProperties);        rabbitTemplate.send("directExchange","normalQueue",message);

因为normalQueue没有任何消费者,所以消息进入normalQueue之后,3秒过期,被转发到死信队列(delayQueue),我们就能实现延迟消息了。

2. 消息打回延迟处理

上面讲解了死信队列如何设置和绑定,那么现在讲解一下消息打回如何实现延迟,这里还是用死信队列实现,没有用任何插件。

当ackMode(其他语言设置是noack) 为MANUAL的情况下,我们就需要手动确认消息,当我们触发Nack或Reject 时,他就会将消息打回,如果当前队列没有设置DLX和DLK,那么默认是重回自己本身的队列,如果设置了,那么就会根据DLX和DLK寻找队列。

上面已经有了一个延迟队列

那么我们想实现打回延迟,只需要将当前队列的DLX和DLK绑定到刚才的normalQueue(延迟队列)
现在我们将消息打回(Nack或Reject),消息就会进入normalQueue
normalQueue没有消费者,过期后就进入了最后的死信队列。
顺便 说一下 如果想自定义延迟时间
msg.getMessageProperties().setExpiration(x),可以设置单独消息的过期时间

转载地址:http://kxlsi.baihongyu.com/

你可能感兴趣的文章
【C#】如何实现一个迭代器
查看>>
【C#】利用Conditional属性完成编译忽略
查看>>
DirectX11 光照演示示例Demo
查看>>
VUe+webpack构建单页router应用(一)
查看>>
Node.js-模块和包
查看>>
(python版)《剑指Offer》JZ01:二维数组中的查找
查看>>
管理用户状态——Cookie与Session
查看>>
Spring MVC中使用Thymeleaf模板引擎
查看>>
PHP 7 的五大新特性
查看>>
深入了解php底层机制
查看>>
PHP中的stdClass 【转】
查看>>
XHProf-php轻量级的性能分析工具
查看>>
OpenCV gpu模块样例注释:video_reader.cpp
查看>>
【增强学习在无人驾驶中的应用】
查看>>
OpenCV meanshift目标跟踪总结
查看>>
就在昨天,全球 42 亿 IPv4 地址宣告耗尽!
查看>>
听说玩这些游戏能提升编程能力?
查看>>
如果你还不了解 RTC,那我强烈建议你看看这个!
查看>>
沙雕程序员在无聊的时候,都搞出了哪些好玩的小玩意...
查看>>
Mysql复制表以及复制数据库
查看>>