【RabbitMQ】Spring Boot 整合 RabbitMQ
RabbitMQ 配置
- 导入 Maven 依赖
1 | <dependency> |
该场景启动器将导入 RabbitMQ 的自动配置类 RabbitAutoConfiguration
:
其中,RabbitProperties 类绑定了配置文件中的 spring.rabbitmq
前缀:
- 在配置文件中配置 RabbitMQ
1 | spring: |
注意:
rabbitmq
的属性需要配置在spring.rabbitmq
下,否则会走默认的本地端口
- 在主启动类上添加
@EnableRabbit
注解:
1 |
|
- 在配置类中注入
RabbitTemplate
,定制化其消息转换器为 JSON 格式转换器,并设置确认回调和失败返回回调方法:
1 |
|
绑定交换机和队列
- 可以使用
AmqpAdmin
对象创建交换机、创建队列并绑定交换机和队列。
1 |
|
- 更推荐使用 Spring 自动注入的方式创建交换机、队列并绑定二者(注意这种方式注入的队列和交换机不会立即创建,而是在由消费者监听时才会创建):
1 |
|
注意:这种方式注入的队列和交换机不会立即创建,而是在由消费者监听时才会创建
消息生产者
RabbitTemplate
可用于发送消息:
1 |
|
消息消费者
使用 @RabbitListener
或 @RabbitHandler
注解将消费者与队列进行绑定:
@RabbitListener
:标注在类或方法上。如果标注在类上,则该类所有方法都会进行监听;如果标注在方法上,只有该方法会进行监听@RabbitHandler
:标注在方法上。根据方法中形参的类型不同,会自动决定调用哪个方法
示例:
1 | /** |
发布确认与消息应答
为保证消息能够可靠到达不丢失,需要引入发布确认机制与消息应答机制:
- 生产者发布确认
confirmCallback
:生产者消息成功到交换机就异步回调通知生产者消息发布成功 - 生产者消息回退
returnCallback
:生产者消息如果未能从交换机投递到消息队列就异步回调通知生产者消息投递失败 - 消费者消息应答
ack
:消费者如果从消息队列收到消息并确认无误后手动或自动通知 Broker 删除该消息
生产者发布确认
- 在配置文件中开启发布确认功能:
1 | rabbitmq: |
NONE
值是禁用发布确认模式,是默认值。CORRELATED
值是发布消息成功到交换器后会触发回调方法。SIMPLE
值经测试有两种效果,其一效果和CORRELATED
值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate
调用waitForConfirms
或waitForConfirmsOrDie
方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie
方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
生产者消息回退
- 在配置文件中开启消息回退功能:
1 | spring: |
- 在配置类中注入
RabbitTemplate
,设置其确认回调(发布确认)和失败返回回调(消息回退)方法:
1 |
|
消费者消息应答
消费者的消息应答有两种类型:
- 自动消息应答:在消费者收到消息后就被认为已经传送成功,并从队列中删除。这种方式容易造成消息丢失,因为消费者可能还没处理完该消息就宕机了,这时消息应该重新投递,如果被删掉了就造成消息丢失了
- 手动消息应答分为:
Channel.basicAck
(肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。Channel.basicNack
(否定确认):可以批量否定确认Channel.basicReject
(否定确认):与Channel.basicNack
相比少一个参数。若指定参数requeue=true
,则该消息重新入队;否则丢弃该消息- 如果消费者一直没有调用
ack/nack
方法,brocker 认为此消息正在被处理,不会投递给其他人也不会删除该消息。如果这时客户端断开,该消息不会被 broker 移除,而是会再次投递到队列中等待其他人消费(原理可能是 broker 会检测到其与消费者间的连接中断了,就重新投递)
- 在配置文件中开启消费者手动应答:
1 | spring: |
- 在通道 Channel 中回复确认收到消息
1 |
|
延迟队列
实现延迟队列有两种方式:
- 给队列设置过期时间
- 给每个消息设置过期时间
给队列设置过期时间:x-message-ttl:300000
,整个队列的所有消息的过期时间都等于该值:
给每个消息设置过期时间:expiration:300000
,只有该消息的过期时间等于该值:
两种方式中应该选择给队列设置过期时间。这是因为 RabbitMQ 的惰性检查机制(懒检查机制):如果给每个消息设置过期时间,则 RabbitMQ 服务器每次只会先检查队首消息是否过期,如果没过期,就不检查后面的其他消息了。这就导致后进的消息如果过期时间更短反而不会被检查到,从而不能及时过期。
配置实战
在项目中使用延迟队列时,不为其设置任何消费者来监听,所以消息必定会过期,从而成为死信。过期后就会被路由到死信交换机,死信交换机会将死信消息路由到死信队列。设置消费者监听死信队列,就可以实现延迟一段时间后再消费这些消息。该过程的示意图:
但以上方式需要创建多个交换机,其实没有必要,完全可以用一个交换机做整个服务的总交换机。例如在云商城项目中的消息队列配置示意图:
整个订单服务只设置了一个总交换机,延迟队列和死信队列都与其绑定,通过不同的路由键实现路由到不同的队列中。
创建延迟队列:
1 | /** |