# springboot 集成 rabbitMQ2

上篇文章,用 springboot 的默认配置,完成了 rabbitMQ 消息的发送,但是缺点也很明显,就是需要事先在管理页面创建好对应的 Exchange 和 Queue 以及 Binding 关系。但是其实 rabbitMQ 可以在发送的时候进行检查,如不存在,则创建。这个就需要结合 @Configration 和 @ @Bean 进行配置

# rabbitMQ 自定义配置

# 通过 config 文件进行配置

依然按照之前的配置,使用 topic 交换机,交换机名字叫做 config_exchange,并且使用名叫 test.topic 的队列,使用 *.b.* 进行 binding。则相关代码如下:

@Configuration
public class RabbitMQConfig {
    // 参数可以放在环境变量中
    @Bean
    public Queue createQueue() {
        Queue msgQueue = new Queue("test.topic", true);
        return msgQueue;
    }
    @Bean
    public TopicExchange createExchange() {
        // 这三个参数,分别是 交换机名称,是否持久化,以及是否自动删除
        TopicExchange topicExchange = new TopicExchange("config_exchange", true, false);
        return topicExchange;
    }
    
    @Bean
    public Binding createBinding() {
        return BindingBuilder.bind(createQueue()).to(createExchange()).with("*.b.*");
    }
}

# 生产者代码编写

上述代码完成了对 rabbitmq 基本信息的声明,然后就可以向消息队列发送消息了,如果消息队列中不存在,则会创建对应的交换机、queue,并通过路由键进行绑定。

@Test
    public void testMQ() {
        rabbitTemplate.setExchange("config_exchange");
        rabbitTemplate.setRoutingKey("a.b.c");
        for (int i = 0; i < 10000; i++) {
            rabbitTemplate.convertAndSend("发送消息" + i);
        }
    }

可以看到交换机,队列以及绑定关系都正确,并且消息成功送达了。


# 消息送达确认

虽然消息发送成功了,但是我们并不能看到相关的日志和信息表示发送的消息已经送达到 rabbitMQ 服务器。那么如何开启消息送达确认呢?在 rabbitMQTemplate 中,有 setConfirmCallback() ,通过这个可以指定一个回调函数,在消息送达后,执行回调函数,我们可以在回调函数中打印日志,或者做其他操作。

# 配置代码:

@Bean
    public RabbitTemplate createRabbitMq() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info(correlationData.getId() + "发送成功");
            } else {
                log.info("消息发送失败");
            }
        });
        return rabbitTemplate;
    }

这里我们人为创建并且返回了一个 rabbitTemplate,并且指定了 ConfirmCallbback 回调函数。这三个参数分别是 correlationData(在发送消息时随消息指定,这样,回调中就可以知道是哪条消息),ack 表示送达状态,如果是 true,则表示送达成功,否则,则送达失败,同时错误原因在第三个参数 cause 中。

# 生产者代码

@Test
    public void testMQ() {
        for (int i = 0; i < 10000; i++) {
            rabbitTemplate.convertAndSend("config_exchange", "a.b.c", "发送消息" + i, new CorrelationData(String.valueOf(i)));
        }
    }

这里我们简单的随着消息新建了一个 CorrelationData,并且传入了当前消息的 id;只所以用四个参数的发送方法,是因为只有这种方法可以指定对应的交换机,路由键以及 CorrelationData。(可以点进去简单看下源码)。
最终的效果如下:

2022-09-22 22:22:41.530  INFO 71421 --- [nectionFactory4] c.t.s.config.RabbitMQConfig              : 5119发送成功
2022-09-22 22:22:41.530  INFO 71421 --- [ectionFactory18] c.t.s.config.RabbitMQConfig              : 5125发送成功
2022-09-22 22:22:41.530  INFO 71421 --- [nectionFactory8] c.t.s.config.RabbitMQConfig              : 5129发送成功
2022-09-22 22:22:41.530  INFO 71421 --- [ectionFactory20] c.t.s.config.RabbitMQConfig              : 5117发送成功
2022-09-22 22:22:41.530  INFO 71421 --- [nectionFactory9] c.t.s.config.RabbitMQConfig              : 5120发送成功
2022-09-22 22:22:41.530  INFO 71421 --- [nectionFactory1] c.t.s.config.RabbitMQConfig              : 5123发送成功
2022-09-22 22:22:41.530  INFO 71421 --- [nectionFactory7] c.t.s.config.RabbitMQConfig              : 5116发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [nectionFactory9] c.t.s.config.RabbitMQConfig              : 5135发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [ectionFactory20] c.t.s.config.RabbitMQConfig              : 5136发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [ectionFactory18] c.t.s.config.RabbitMQConfig              : 5138发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [nectionFactory1] c.t.s.config.RabbitMQConfig              : 5140发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [nectionFactory9] c.t.s.config.RabbitMQConfig              : 5132发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [ectionFactory20] c.t.s.config.RabbitMQConfig              : 5130发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [nectionFactory8] c.t.s.config.RabbitMQConfig              : 5131发送成功
2022-09-22 22:22:41.534  INFO 71421 --- [nectionFactory4] c.t.s.config.RabbitMQConfig              : 5134发送成功

总结:通过编写 config 文件,实现了在代码中配置交换机,队列,已经对应的绑定关系,并且实现了生产者的送达确认。其实实现方式不仅有这一种方式,还可以通过设置 ConnectFactory 的方式进行配置。