作为 rabbitMQ 的生产者,发送消息到 MQ 的过程中,是通过 routingkey 发送给交换机,由交换机进行路由,把信息发送的最终的队列中。而 rabbitMQ 消费的时候,是要明确指明消费的队列的。

# 消费模式

rabbitMQ 的消费模式分为两种,推模式和拉模式。推模式使用的是 Basic.Consume 进行消费,而拉模式通过调用 Basic.Get 进行消费。推模式用于持续的获取消息,在推模式中,RabbitMQ 会不断的推送消息给消费者,不过推送的数量可以通过 Basic.Qos 进行限制。拉模式可以单条的获取信息。

# 消费端的确认和拒绝

为了保证消息可以从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制。消费者在订阅队列时,可以指定 autoAck 参数。

  • 如果 autoAck 参数为 true,RabbitMQ 会自动把发送出去的消息置为确认,并且从内存(磁盘)中删除,不关消费者是否真正进行了正确消费。
  • 如果 autoAck 参数设置为 false,RabbitMQ 会等待消费者显式地回复确认信号后才从内存或者磁盘中删除。

采用消息确认机制后,只要设置 autoAck 为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程断掉导致消息丢失的问题。RabbitMQ 会一直等到消费者显式调用 Basic.ack 命令为止。

# Springboot 使用 amqp 进行消费

在 springboot 中,对消息进行消费有两种方式。

  • 轮询:使用 rabbitMQTemplate.receive () 等相关方法进行消费,每次消费一条。
  • 注册侦听器:这个方式也是更为灵活,更为常用的。通过 @Bean 设置监听器端点和 @RabbitListener 注解的方式实现。

# 代码演示

@Test
    public void receive() throws UnsupportedEncodingException {
        Message receive = rabbitTemplate.receive("queue-msg”);// 指定队列名称
        System.out.println(new String(receive.getBody(),"utf-8"));
        Object o = rabbitTemplate.receiveAndConvert("queue-msg”);// 可以支持类型转换
        System.out.println(o);
    }

这是最简单的消费方式,但是可以看到,虽然简单,功能也很少。
那么更常用的是使用配置监听器的方式

@Component
public class RabbitMQReceiver {
    @RabbitListener(queues = "test.topic",ackMode = "MANUAL")
    @RabbitHandler()
    public void receive(String msg,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("接收到消息:RabbitMQReceiver"+message);
        Thread.sleep(2000);
        channel.basicQos(2);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
    @RabbitListener(queues = "test.topic",ackMode = "MANUAL")
    @RabbitHandler()
    public void receive2(String msg,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("wqerdf:RabbitMQReceiver"+message);
        Thread.sleep(2000);
        channel.basicQos(2);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
/**
这里通过 RabbitListener 注解指定了队列名称,看到 queues 应该已经想到了,可以指定多个; ackMode 指定了消息需要人为的确认
需要注意的是,如果多个消费者方法,每个方法都要有一个 RabbitListener
*/