作为 rabbitMQ 的生产者,发送消息到 MQ 的过程中,是通过 routingkey 发送给交换机,由交换机进行路由,把信息发送的最终的队列中。而 rabbitMQ 消费的时候,是要明确指明消费的队列的。
# 消费模式
rabbitMQ 的消费模式分为两种,推模式和拉模式。推模式使用的是 Basic.Consume 进行消费,而拉模式通过调用 Basic.Get 进行消费。推模式用于持续的获取消息,在推模式中,RabbitMQ 会不断的推送消息给消费者,不过推送的数量可以通过 Basic.Qos 进行限制。拉模式可以单条的获取信息。
# 消费端的确认和拒绝
为了保证消息可以从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制。消费者在订阅队列时,可以指定 autoAck 参数。
- 如果 autoAck 参数为 true,RabbitMQ 会自动把发送出去的消息置为确认,并且从内存(磁盘)中删除,不关消费者是否真正进行了正确消费。
- 如果 autoAck 参数设置为 false,RabbitMQ 会等待消费者显式地回复确认信号后才从内存或者磁盘中删除。
采用消息确认机制后,只要设置 autoAck 为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程断掉导致消息丢失的问题。RabbitMQ 会一直等到消费者显式调用 Basic.ack 命令为止。
# Springboot 使用 amqp 进行消费
在 springboot 中,对消息进行消费有两种方式。
- 轮询:使用 rabbitMQTemplate.receive () 等相关方法进行消费,每次消费一条。
- 注册侦听器:这个方式也是更为灵活,更为常用的。通过
@Bean
设置监听器端点和@RabbitListener
注解的方式实现。
# 代码演示
@Test | |
public void receive() throws UnsupportedEncodingException { | |
Message receive = rabbitTemplate.receive("queue-msg”);// 指定队列名称 | |
System.out.println(new String(receive.getBody(),"utf-8")); | |
Object o = rabbitTemplate.receiveAndConvert("queue-msg”);// 可以支持类型转换 | |
System.out.println(o); | |
} |
这是最简单的消费方式,但是可以看到,虽然简单,功能也很少。
那么更常用的是使用配置监听器的方式
@Component | |
public class RabbitMQReceiver { | |
@RabbitListener(queues = "test.topic",ackMode = "MANUAL") | |
@RabbitHandler() | |
public void receive(String msg,Channel channel, Message message) throws IOException, InterruptedException { | |
System.out.println("接收到消息:RabbitMQReceiver"+message); | |
Thread.sleep(2000); | |
channel.basicQos(2); | |
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); | |
} | |
@RabbitListener(queues = "test.topic",ackMode = "MANUAL") | |
@RabbitHandler() | |
public void receive2(String msg,Channel channel, Message message) throws IOException, InterruptedException { | |
System.out.println("wqerdf:RabbitMQReceiver"+message); | |
Thread.sleep(2000); | |
channel.basicQos(2); | |
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); | |
} | |
} | |
/** | |
这里通过 RabbitListener 注解指定了队列名称,看到 queues 应该已经想到了,可以指定多个; ackMode 指定了消息需要人为的确认 | |
需要注意的是,如果多个消费者方法,每个方法都要有一个 RabbitListener | |
*/ |