本文共 2859 字,大约阅读时间需要 9 分钟。
代码例子是spring boot下的,但是原理都相同。
注意,要使用
例子中没有宏,没有多余代码,没有复杂交换机和路由键,尽量从简。希望大家能看得懂。DLX
实现延迟队列
的前提是,接收的队列无任何消费者监听
,因为一旦有消费者监听
,除非手动打回
消息,消息才会进入死信队列
(因为我们要实现延迟队列
,要么sleep后手动打回
,要么就不要给这个队列设置消费者
),如果只是单纯不处理(不确认也不打回)
只会是Unacked
的状态,并不会进入死信队列,也就无法完成延迟转发的效果了,Unacked
的状态下只有到这个队列的消费者退出(监听器断开连接)
才会进入死信队列
- 创建一个普通的交换机和队列,注意不要给这个队列添加任何消费者 很多文章把死信队列搞混,误人子弟,事实上这个队列就是普通队列,不是死信队列,DLX的直译就是死信交换机。在RabbitMQ管理页看见某个队列有DLX标志,并不代表它是死信队列,只是这个队列设置了x-dead-letter-exchange变量。 我们这个队列只是绑定了这个变量,让消息过期后发送到这个交换机上。 死信队列是和DLX+DLK绑定的队列,才叫死信队列。 设置变量、路由键绑定,这2个概念区分开。
@Configuration@EnableRabbitpublic class RabbitmqConfig { //直连交换机 @Bean Exchange directQueueExchange(){ return ExchangeBuilder.directExchange("directExchange").build(); } //创建一个普通队列 //3个参数,绑定死信交换机和死信队列,还有延迟时间 @Bean Queue normalQueue(){ HashMapmap=new HashMap<>(); //如果消息过期,消息会去哪个交换机 map.put("x-dead-letter-exchange","directExchange"); //消息过期去哪个路由键,为了大家不绕晕,我这里就用delayKey了 map.put("x-dead-letter-routing-key","delayKey"); //消息过期时长 map.put("x-message-ttl",3000); return QueueBuilder.durable("normalQueue").withArguments(map).build(); } //进行绑定 @Bean Binding queueBinding(){ return BindingBuilder.bind(normalQueue()).to(directQueueExchange()).with("normalQueue").noargs(); }}
- 上面的代码,消息进入normalQueue后,如果normalQueue没有任何消费者,消息过期后就会根据绑定好的DLX和DLK寻找队列,然后将消息发送到队列,我们下面就需要创建这个接收死信的队列
//为了方便,直接RabbitListener,监听+创建一步到位@Componentpublic class DelayExchangeQueue{ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "delayQueue"), exchange = @Exchange(name = "directExchange") ,key = "delayKey"),ackMode = "MANUAL" ) public void delayConsume(Message msg,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel){ System.out.println(msg); try { channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); } }}
MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定义消息类型.."); Message message = new Message("Hoola".getBytes(),messageProperties); rabbitTemplate.send("directExchange","normalQueue",message);
因为normalQueue没有任何消费者,所以消息进入normalQueue之后,3秒过期,被转发到死信队列(delayQueue),我们就能实现延迟消息了。
上面讲解了死信队列如何设置和绑定,那么现在讲解一下消息打回如何实现延迟,这里还是用死信队列实现,没有用任何插件。
当ackMode(其他语言设置是noack) 为MANUAL的情况下,我们就需要手动确认消息,当我们触发Nack或Reject 时,他就会将消息打回,如果当前队列没有设置DLX和DLK,那么默认是重回自己本身的队列,如果设置了,那么就会根据DLX和DLK寻找队列。
上面已经有了一个延迟队列
那么我们想实现打回延迟,只需要将当前队列的DLX和DLK绑定到刚才的normalQueue(延迟队列) 现在我们将消息打回(Nack或Reject),消息就会进入normalQueue normalQueue没有消费者,过期后就进入了最后的死信队列。 顺便 说一下 如果想自定义延迟时间 msg.getMessageProperties().setExpiration(x),可以设置单独消息的过期时间
转载地址:http://kxlsi.baihongyu.com/