【RabbitMQ】Spring Boot 整合 RabbitMQ

RabbitMQ 配置

  1. 导入 Maven 依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

该场景启动器将导入 RabbitMQ 的自动配置类 RabbitAutoConfiguration

image-20220115204414635

image-20220115210448861

其中,RabbitProperties 类绑定了配置文件中的 spring.rabbitmq 前缀:

image-20220115210648640
  1. 在配置文件中配置 RabbitMQ
1
2
3
4
5
6
7
8
9
10
11
12
spring:
rabbitmq:
host: yuyunzhao.cn
port: 5672
virtual-host: / # 设置虚拟主机
publisher-confirm-type: correlated # 是否启用【发布确认】:发布消息成功到交换器后会触发回调方法
publisher-returns: true # 启用【消息回退】:发送端消息抵达 Queue 失败时进行回调
template:
mandatory: true # 开启强制消息投递:发送端消息抵达 Queue 失败时进行回调,二者需要同时开启
listener:
simple:
acknowledge-mode: manual # 开启消费者手动应答。默认为自动应答

注意:rabbitmq 的属性需要配置在 spring.rabbitmq 下,否则会走默认的本地端口

  1. 在主启动类上添加 @EnableRabbit 注解:
1
2
3
4
5
6
7
@EnableRabbit
@SpringBootApplication
public class MallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(MallOrderApplication.class, args);
}
}
  1. 在配置类中注入 RabbitTemplate,定制化其消息转换器为 JSON 格式转换器,并设置确认回调失败返回回调方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Configuration
public class MyRabbitConfig {

private RabbitTemplate rabbitTemplate;

/**
* 消息转换器:使用 JSON 序列化方式将 POJO 以 JSON 形式保存到 RabbitMQ 中
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

/**
* 定制 RabbitTemplate,为其设置 JSON 消息转换器
*/
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}

/**
* 定制 RabbitTemplate
* 1. broker 收到消息就会回调
* 1.1 设置 spring.rabbitmq.publisher-confirm-type: correlated
* 1.2 设置确认回调
* 2. 消息无法正常抵达队列就会进行回调
* 2.1 设置 spring.rabbitmq.publisher-returns: true
* 设置 spring.rabbitmq.template.mandatory: true
* 2.2 设置确认回调 ReturnCallback
* 3. 消费端确认(保证每个消息都被正确消费,此时才可以从队列中删除这个消息)
*/
public void initRabbitTemplate() {
/**
* 1. 只要消息抵达 Broker 就 ack = true。并设置确认回调
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});

/**
* 2. 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}

绑定交换机和队列

  1. 可以使用 AmqpAdmin 对象创建交换机、创建队列并绑定交换机和队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Autowired
AmqpAdmin amqpAdmin;

/**
* 创建交换机
*/
@Test
public void contextLoads() {
DirectExchange directExchange = new DirectExchange("hello-java.exchange", true, false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:","hello-java.exchange");
}

/**
* 创建队列
*/
@Test
public void createQueue() {
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]:","创建成功");
}

/**
* 绑定队列
*/
@Test
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java.exchange",
"hello.java", null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功","hello-java-binding");
}
  1. 更推荐使用 Spring 自动注入的方式创建交换机、队列并绑定二者(注意这种方式注入的队列和交换机不会立即创建,而是在由消费者监听时才会创建):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
@Configuration
public class MyRabbitMQConfig {
/**
* 订单服务总交换机
* @return
*/
@Bean
public Exchange orderEventExchange() {
/**
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
*/
return new TopicExchange("order-event-exchange", true, false);
}

/**
* 延迟队列
* @return
*/
@Bean
public Queue orderDelayQueue() {
/**
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
// 死信交换机
arguments.put("x-dead-letter-exchange", "order-event-exchange");
// 死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 消息过期时间 1分钟
arguments.put("x-message-ttl", 60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}

/**
* 普通队列(死信队列,负责存放过期的消息)
* @return
*/
@Bean
public Queue orderReleaseQueue() {
return new Queue("order.release.order.queue", true, false, false);
}

/**
* 创建订单的binding
*
* @return
*/
@Bean
public Binding orderCreateBinding() {
/**
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
*/
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,
"order-event-exchange", "order.create.order", null);
}

@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}

@Bean
public Binding orderReleaseOrderBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}

/**
* 商品秒杀队列
*
* @return
*/
@Bean
public Queue orderSecKillOrrderQueue() {
Queue queue = new Queue("order.seckill.order.queue", true, false, false);
return queue;
}

@Bean
public Binding orderSecKillOrrderQueueBinding() {
Binding binding = new Binding(
"order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
null);

return binding;
}
}

注意:这种方式注入的队列和交换机不会立即创建,而是在由消费者监听时才会创建

消息生产者

RabbitTemplate 可用于发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Autowired
@Test
public void sendMessageTest() {
for(int i = 1; i <= 5; i++) {
if(i % 2 == 0) {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1l);
reasonEntity.setCreateTime(new java.util.Date());
reasonEntity.setName("哈哈");

String msg = "Hello World";
// 发送消息(将被转换成 JSON 格式)
rabbitTemplate.convertAndSend("hello-java.exchange", "hello.java", reasonEntity);
} else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java.exchange", "hello.java", orderEntity);
}
log.info("消息发送完成");
}
}

消息消费者

使用 @RabbitListener@RabbitHandler 注解将消费者与队列进行绑定:

  • @RabbitListener:标注在类或方法上。如果标注在类上,则该类所有方法都会进行监听;如果标注在方法上,只有该方法会进行监听
  • @RabbitHandler:标注在方法上。根据方法中形参的类型不同,会自动决定调用哪个方法

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 监听死信队列(关单队列) order.release.order.queue,将过期的订单关闭
*/
@Component
@RabbitListener(queues = {"order.release.order.queue"})
public class OrderCloseListener {

@Autowired
private OrderService orderService;

/**
* 从队列中拿到订单实体对象 OrderEntity,调用 OrderService 关闭该订单
* 1. 如果关闭订单成功,就手动回复成功 Ack,从队列中删除该消息;
* 2. 如果关闭订单失败,就回复失败 Reject,并且重新入队:requeue=true,等待其他消费者重新消费该消息
* @param orderEntity
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void listener(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
orderService.closeOrder(orderEntity);
channel.basicAck(deliveryTag,false);
} catch (Exception e){
channel.basicReject(deliveryTag, true);
}
}
}

发布确认与消息应答

为保证消息能够可靠到达不丢失,需要引入发布确认机制与消息应答机制:

  • 生产者发布确认 confirmCallback:生产者消息成功到交换机异步回调通知生产者消息发布成功
  • 生产者消息回退 returnCallback:生产者消息如果未能从交换机投递到消息队列就异步回调通知生产者消息投递失败
  • 消费者消息应答 ack:消费者如果从消息队列收到消息并确认无误后手动或自动通知 Broker 删除该消息

image-20220118194850488

生产者发布确认

  1. 在配置文件中开启发布确认功能:
1
2
rabbitmq:
publisher-confirm-type: correlated # 开启【发布确认】:发布消息成功到交换器后会触发回调方法,异步通知生产者发布成功
  • NONE 值是禁用发布确认模式,是默认值。
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法。
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirmswaitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

生产者消息回退

  1. 在配置文件中开启消息回退功能:
1
2
3
4
5
spring:
rabbitmq:
publisher-returns: true # 开启【消息回退】:发送端消息抵达 Queue 失败时进行回调
template:
mandatory: true # 开启强制消息投递,需要与上面同时开启
  1. 在配置类中注入 RabbitTemplate,设置其确认回调(发布确认)和失败返回回调(消息回退)方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration
public class MyRabbitConfig {

private RabbitTemplate rabbitTemplate;

/**
* 消息转换器:使用 JSON 序列化方式将 POJO 以 JSON 形式保存到 RabbitMQ 中
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

/**
* 定制 RabbitTemplate,为其设置 JSON 消息转换器
*/
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}

/**
* 定制 RabbitTemplate
* 1. 交换机收到消息就会回调:设置确认回调 ConfirmCallback
* 2. 消息无法正常抵达队列就会进行回调:设置确认回调 ReturnCallback
*/
public void initRabbitTemplate() {
/**
* 1. 只要消息抵达 Broker 就 ack = true。并设置确认回调
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});

/**
* 2. 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}

消费者消息应答

消费者的消息应答有两种类型:

  • 自动消息应答:在消费者收到消息后就被认为已经传送成功,并从队列中删除。这种方式容易造成消息丢失,因为消费者可能还没处理完该消息就宕机了,这时消息应该重新投递,如果被删掉了就造成消息丢失了
  • 手动消息应答分为:
    • Channel.basicAck(肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
    • Channel.basicNack(否定确认):可以批量否定确认
    • Channel.basicReject(否定确认):与 Channel.basicNack 相比少一个参数。若指定参数 requeue=true,则该消息重新入队;否则丢弃该消息
    • 如果消费者一直没有调用 ack/nack 方法,brocker 认为此消息正在被处理,不会投递给其他人也不会删除该消息。如果这时客户端断开,该消息不会被 broker 移除,而是会再次投递到队列中等待其他人消费(原理可能是 broker 会检测到其与消费者间的连接中断了,就重新投递)
  1. 在配置文件中开启消费者手动应答:
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 消费者手动应答
  1. 在通道 Channel 中回复确认收到消息
1
2
3
4
5
6
7
@RabbitHandler
public void recieveMessage(Message message.
OrderReturnEntity content,
Channel channel) throws InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}

延迟队列

实现延迟队列有两种方式:

  • 队列设置过期时间
  • 每个消息设置过期时间

队列设置过期时间:x-message-ttl:300000,整个队列的所有消息的过期时间都等于该值:

image-20220120100007189

每个消息设置过期时间:expiration:300000,只有该消息的过期时间等于该值:

image-20220120100046735

两种方式中应该选择给队列设置过期时间。这是因为 RabbitMQ 的惰性检查机制(懒检查机制):如果给每个消息设置过期时间,则 RabbitMQ 服务器每次只会先检查队首消息是否过期,如果没过期,就不检查后面的其他消息了。这就导致后进的消息如果过期时间更短反而不会被检查到,从而不能及时过期。

配置实战

在项目中使用延迟队列时,不为其设置任何消费者来监听,所以消息必定会过期,从而成为死信。过期后就会被路由到死信交换机,死信交换机会将死信消息路由到死信队列。设置消费者监听死信队列,就可以实现延迟一段时间后再消费这些消息。该过程的示意图:

image-20220120100309381

但以上方式需要创建多个交换机,其实没有必要,完全可以用一个交换机做整个服务的总交换机。例如在云商城项目中的消息队列配置示意图:

image-20220120101110172

整个订单服务只设置了一个总交换机,延迟队列和死信队列都与其绑定,通过不同的路由键实现路由到不同的队列中。

创建延迟队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 订单服务延迟队列 order.delay.queue。
* 每个订单创建成功后,都需要向延迟队列发送消息,等待30分钟后判断是否需要取消订单
* 没有消费者监听该队列
* @return
*/
@Bean
public Queue orderDelayQueue() {
/**
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
// 死信交换机
arguments.put("x-dead-letter-exchange", "order-event-exchange");
// 死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 消息过期时间 1分钟
arguments.put("x-message-ttl", 60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}