# Springboot 集成 rabbitmq 之 mandatory 和备份交换机

# mandatory

之前编写的消息队列代码中,通过重写 ConfirmCallback 中的 confirm 方法实现了消息送达的确认以及出错的处理,但是,该方法无法判断消息投递到不存在的队列中导致失败的问题。 mandatory 是 channel.basicPublish 方法中的参数,当 mandatory 设置为 true 时,交换机无法根据自身的类型和路由键找到一个符合条件的队列,那么就会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接丢弃。

# springboot 中使用 mandatory

如果不使用 amqp 框架,那么可以调用 channel.addReturnListener 来添加 ReturnListener 监听器实现。
在 springboot 框架中,可以在 rabbitmqTemplate 中设置 setReturnsCallback() 方法,并且把 rabbitmqTemplate 的 Mandatory 设置为 true. 具体示例如下:

//RabbitConfig.java
    @Bean
    public RabbitTemplate createRabbitMq() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("消息发送出错:" + returnedMessage.getReplyText()));
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info(correlationData.getId() + "发送成功");
            } else {
                log.info("消息发送失败,{}",cause);
            }
        });
        return rabbitTemplate;
    }

这里在 setReturnCallback 方法中,添加了 log,打印了出错原因;此外还可以打印发送的消息,这样可以更精准的定位问题。
这样,如果消息被发送到不存在的 routingKey,就会触发 ReturnCallBack 方法,打印出对应问题。

2022-10-01 22:36:06.271  INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig              : 0发送成功
2022-10-01 22:36:06.290  INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig              : 消息发送出错:(Body:'发送消息1' MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.290  INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig              : 消息发送出错:(Body:'发送消息2' MessageProperties [headers={spring_returned_message_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.290  INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig              : 1发送成功
2022-10-01 22:36:06.291  INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig              : 2发送成功
2022-10-01 22:36:06.313  INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig              : 消息发送出错:(Body:'发送消息5' MessageProperties [headers={spring_returned_message_correlation=5}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.313  INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig              : 消息发送出错:(Body:'发送消息4' MessageProperties [headers={spring_returned_message_correlation=4}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.313  INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig              : 5发送成功

# 备份交换机

上述的逻辑中提到了,将 Mandatory 参数设置为 True,并且设置了对应的 ReturnBack 回调方法,如果不想使代码变得繁琐,不编写 ReturnBack 方法,没有设置 Mandatory 参数,并且,又想要消息不丢失,那么可以使用备份交换机,这样,未被路由的信息,就会存储在备用交换机中,可以以后再去处理;

# springboot 配置备份交换机

# 声明备用交换机,队列,和绑定关系

注意,备用交换机必须是 Fanout 类型

//RabbitConfig.java
public Queue backupQueue() {
        return new Queue("backup", true,false,false);
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("backup", true, false);
    }
    @Bean
    public Binding backupBinding(){
        return BindingBuilder.bind(backupQueue()).to(fanoutExchange());
    }

上述代码配置了对应的交换机和队列,并且进行了绑定;

# 给交换机添加备用交换机

这里有个很大的坑,就是已经创建的交换机是不能修改的,所以需要删除原来的交换机或者创建一个新的交换机
所以,这里新创建了一个交换机

@Bean
    public TopicExchange createExchange() {
        // 这三个参数,分别是 交换机名称,是否持久化,以及是否自动删除
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", "backup");
        TopicExchange topicExchange = new TopicExchange("config_exchange2", true, false,arguments);
        return topicExchange;
    }

这个交换机也需要绑定队列等等,略。
此时,给这个交换机发送消息到不存在的路由键,则会将这些消息转到对应的备份交换机及队列中。

可以看到,如果交换机配置了备份交换机,会有 AE 的标签。

# 总结

介绍了消息在发送给不存在交换机时可能造成的丢失问题,以及如何检测该问题。个人感觉配置备份交换机是一个比较稳妥的方式,但是备用交换机的创建还是有一些坑存在的。
1、备用交换机是 Fanout 模式
2、已经创建的交换机,不能修改添加备用交换机,会报 PRECONDITION_FAILED - inequivalent arg 的错误。