# Springboot 集成 rabbitmq 之 mandatory 和备份交换机
# mandatory
之前编写的消息队列代码中,通过重写 ConfirmCallback 中的 confirm 方法实现了消息送达的确认以及出错的处理,但是,该方法无法判断消息投递到不存在的队列中导致失败的问题。 mandatory 是 channel.basicPublish 方法中的参数,当 mandatory 设置为 true 时,交换机无法根据自身的类型和路由键找到一个符合条件的队列,那么就会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接丢弃。
# springboot 中使用 mandatory
如果不使用 amqp 框架,那么可以调用 channel.addReturnListener 来添加 ReturnListener 监听器实现。
在 springboot 框架中,可以在 rabbitmqTemplate 中设置 setReturnsCallback()
方法,并且把 rabbitmqTemplate 的 Mandatory
设置为 true. 具体示例如下:
//RabbitConfig.java | |
@Bean | |
public RabbitTemplate createRabbitMq() { | |
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); | |
rabbitTemplate.setMandatory(true); | |
rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("消息发送出错:" + returnedMessage.getReplyText())); | |
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { | |
if (ack) { | |
log.info(correlationData.getId() + "发送成功"); | |
} else { | |
log.info("消息发送失败,{}",cause); | |
} | |
}); | |
return rabbitTemplate; | |
} |
这里在 setReturnCallback 方法中,添加了 log,打印了出错原因;此外还可以打印发送的消息,这样可以更精准的定位问题。
这样,如果消息被发送到不存在的 routingKey,就会触发 ReturnCallBack 方法,打印出对应问题。
2022-10-01 22:36:06.271 INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig : 0发送成功
2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息1' MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息2' MessageProperties [headers={spring_returned_message_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig : 1发送成功
2022-10-01 22:36:06.291 INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig : 2发送成功
2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息5' MessageProperties [headers={spring_returned_message_correlation=5}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息4' MessageProperties [headers={spring_returned_message_correlation=4}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig : 5发送成功
# 备份交换机
上述的逻辑中提到了,将 Mandatory 参数设置为 True,并且设置了对应的 ReturnBack 回调方法,如果不想使代码变得繁琐,不编写 ReturnBack 方法,没有设置 Mandatory 参数,并且,又想要消息不丢失,那么可以使用备份交换机,这样,未被路由的信息,就会存储在备用交换机中,可以以后再去处理;
# springboot 配置备份交换机
# 声明备用交换机,队列,和绑定关系
注意,备用交换机必须是 Fanout 类型
//RabbitConfig.java | |
public Queue backupQueue() { | |
return new Queue("backup", true,false,false); | |
} | |
@Bean | |
public FanoutExchange fanoutExchange() { | |
return new FanoutExchange("backup", true, false); | |
} | |
@Bean | |
public Binding backupBinding(){ | |
return BindingBuilder.bind(backupQueue()).to(fanoutExchange()); | |
} |
上述代码配置了对应的交换机和队列,并且进行了绑定;
# 给交换机添加备用交换机
这里有个很大的坑,就是已经创建的交换机是不能修改的,所以需要删除原来的交换机或者创建一个新的交换机
所以,这里新创建了一个交换机
@Bean | |
public TopicExchange createExchange() { | |
// 这三个参数,分别是 交换机名称,是否持久化,以及是否自动删除 | |
Map<String, Object> arguments = new HashMap<>(); | |
arguments.put("alternate-exchange", "backup"); | |
TopicExchange topicExchange = new TopicExchange("config_exchange2", true, false,arguments); | |
return topicExchange; | |
} |
这个交换机也需要绑定队列等等,略。
此时,给这个交换机发送消息到不存在的路由键,则会将这些消息转到对应的备份交换机及队列中。
可以看到,如果交换机配置了备份交换机,会有 AE 的标签。
# 总结
介绍了消息在发送给不存在交换机时可能造成的丢失问题,以及如何检测该问题。个人感觉配置备份交换机是一个比较稳妥的方式,但是备用交换机的创建还是有一些坑存在的。
1、备用交换机是 Fanout 模式
2、已经创建的交换机,不能修改添加备用交换机,会报 PRECONDITION_FAILED - inequivalent arg
的错误。