实现可靠的异步下单:引入RabbitMQ
- 当前V2.0架构的痛点:用于异步下单的BlockingQueue是一个纯粹的Java内存队列
- 存在数据丢失风险:如果Spring Boot应用因为任何原因(意外的bug/服务器重启/部署更新)而崩溃,所有还存放在这个内存队列中、尚未被处理的订单信息将永久丢失。
- 存在无法扩展问问题:这个内存队列只存在于单个应用实例中。无法通过增加服务器来提高订单处理速度。
- V2.1:引入RabbitMQ实现可靠的异步下单
BlockingQueue: 是JUC包下的一个接口,代表一个线程安全的队列,核心特性是阻塞(当生产者尝试从已满的队列中put队列/当消费者尝试从已空的队列中take元素),极简极快(没有网络开销和外部依赖,纯内存操作),但是无持久化,数据易丢失、无共享能力,无法扩展。
Kafka: 现在演变成一个成熟的、分布式的流处理平台。是一个分布式提交日志。拥有极致的吞吐量和极强的水平扩展能力,数据可回溯能力和流处理生态。但是延迟相对较高,功能相对单一,不支持复杂的路由,运维复杂。
RabbitMQ: 一个成熟、稳定、功能极其丰富的消息中间件,是AMQP最著名的实现者。可靠性高、路由灵活、功能全面。但是存在性能瓶颈(为了保证消息投递的可靠性和灵活性)和语言依赖。
RocketMQ: 是阿里巴巴开源的消息中间件,为了应对极端高并发的电商和金融场景。在模型上借鉴了kafka,但又针对业务场景做了很多优化。具有极高的吞吐量和稳定性,有丰富的业务特性,运维友好。 - RabbitMQ能解决什么问题?
环境准备与集成
- 使用Docker运行RabbitMQ:
- 在终端中运行以下命令,启动一个RabbitMQ服务器,并且会有一个方便的管理后台界面。
1
docker run -d --name seckill-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
- -p 5672:5672 :这是RabbitMQ服务本身通信的端口
- -p 15672:15672 : 这回事管理后台的Web界面端口
- 访问管理后台:
- 项目集成
- 添加Maven依赖,在pom.xml文件中添加,保存并让Maven重新加载依赖:
1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency> - 配置连接,在application.properties文件中添加:
1
2
3
4
5# ================= RabbitMQ Configuration =================
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
改写代码
创建RabbitMQ配置类,定义好要使用的队列
1
2
3
4
5
6
7
8
9
10
11
12import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class RabbitMQConfig {
public Queue seckillOrderQueue() {
// durable(true) 表示队列是持久化的,即使RabbitMQ重启也不会丢失
return new Queue("seckill.order.queue", true);
}
}修改SeckillService.java文件(生产者),改造processSeckill方法,使之不在将订单信息put到内存队列,而是发送给RabbitMQ。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34// ...
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class SeckillService {
// 【移除】不再需要内存队列了
// private BlockingQueue<SeckillOrder> orderQueue = ...
private RabbitTemplate rabbitTemplate; // 注入 RabbitMQ 的操作模板
private void executeSeckill(Long productId, Long userId) {
// ... 执行 Lua 脚本 ...
Long result = redisTemplate.execute(seckillScript, keys, userId.toString());
if (result == 0) {
log.info("用户 {} 在Redis中秒杀成功,准备发送订单消息...", userId);
// 【核心改动】
// 1. 生成订单对象
SeckillOrder order = new SeckillOrder();
order.setUserId(userId);
order.setProductId(productId);
// ...
// 2. 将订单对象作为消息,发送到指定的队列
rabbitTemplate.convertAndSend("seckill.order.queue", order);
} else {
// ... 其他失败逻辑 ...
}
}
}RabbitTemplate是Spring AMQP提供的一个专门用来发送消息的高级工具类。当我调用
rabbitTemplate.convertAndSend("queue", order)时,它做了:- 从连接池获取一个到RabbitMQ的connection,创建一个通信通道,调用我们配置的转换器将SeckillOrder这个Java对象转换成Json字符,再把这个JSON字符串打包成一个符合AMQP规范的Message对象,发送给RabbitMQ Broker,处理连接和信道的关闭和复用,如果发送失败会进行重试或抛出统一的Spring异常。
修改OrderConsumerService.java(消费者),使之能监听RabbitMQ的队列,而不是从内存队列中取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
public class OrderConsumerService {
// 【移除】不再需要 @PostConstruct 和手动创建的 new Thread()
private SeckillOrderRepository orderRepository;
private ProductRepository productRepository;
/**
* 【核心改动】
* 使用 @RabbitListener 注解来监听指定的队列
* Spring AMQP 会自动为我们处理消息的接收、反序列化等工作
* @param order 从队列中接收到的订单对象
*/
// 数据库操作依然需要事务保护
public void createOrderInDb(SeckillOrder order) {
try {
log.info("从RabbitMQ接收到订单消息,准备创建订单: {}", order);
orderRepository.save(order);
int result = productRepository.deductStock(order.getProductId());
if (result == 0) {
throw new RuntimeException("MySQL库存扣减失败,订单回滚: " + order);
}
log.info("数据库订单创建成功");
} catch (Exception e) {
// 如果发生异常,Spring AMQP 默认会进行重试,
// 最终如果还是失败,消息会进入“死信队列”(需要额外配置)
// 这里我们先简单地打印错误日志
log.error("消费订单消息时发生异常: {}", order, e);
// 抛出异常,以便Spring AMQP知道处理失败
throw e;
}
}
}@RabbitListener是一个能将普通方法变成一个功能强大的消息消费者,当打上这个
@RabbitListener(queues = "seckill.order.queue")注解后,Spring AMQP就在后台启动了一个永不停止的监听程序。它负责:- 连接到RabbitMQ并订阅seckill.order.queue 这个队列,不断检查队列中是否有新消息,当有新消息时,安全的把他取下来,调用转换器反序列化,将收到的JSON字符串转换回SeckillOrder对象。将转换好的对象作为参数调用写的 createOrderInDb 方法。如果方法成功执行没有抛出异常,它会自动告诉 RabbitMQ:“这个消息我已经处理好了,你可以把它删掉了。” 如果方法抛出异常,它会根据配置告诉 RabbitMQ:“处理失败了”,消息可能会被重试或放入“死信队列”。
只需要要写一个处理业务的普通方法,加上一个注解就拥有了一个可靠的后台消费者,无需手动编写任何循环或连接管理代码。
- 连接到RabbitMQ并订阅seckill.order.queue 这个队列,不断检查队列中是否有新消息,当有新消息时,安全的把他取下来,调用转换器反序列化,将收到的JSON字符串转换回SeckillOrder对象。将转换好的对象作为参数调用写的 createOrderInDb 方法。如果方法成功执行没有抛出异常,它会自动告诉 RabbitMQ:“这个消息我已经处理好了,你可以把它删掉了。” 如果方法抛出异常,它会根据配置告诉 RabbitMQ:“处理失败了”,消息可能会被重试或放入“死信队列”。
JMeter压测设计与结果分析
- 运行环境:Spring Boot应用(V2.1)、Redis、RabbitMQ、MYSQL服务都已正常启动。
- 观测工具:
- JMeter:用于发起并发请求。
- RabbitMQ管理后台:用于实时监控队列状态。
- 应用控制台日志:观察日志输出。
- 数据库客户端:最终验证数据一致性。
功能与性能回归测试
- 准备工作:测试前确保清空数据库、重置Redis(重新运行预热RedisPreheatService,确保Redis中的库存也是100,且已购用户Set为空),清空队列。
- JMeter配置:
- 线程组: 配置一个高并发的“写请求”线程组。
- 线程数: 500
- Ramp-up Period: 1
- 循环次数: 1
- 结果:
- 数据库信息:库存数没有减少,订单没有新建
- 日志信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
162025-10-06 19:21:21.866 ERROR 16988 --- [hread-859862295] c.e.s.demos.web.Service.SeckillService : 后台秒杀任务执行失败!productId=1, userId=117
java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.example.seckillsystem.demos.web.SeckillOrder
at org.springframework.amqp.support.converter.SimpleMessageConverter.createMessage(SimpleMessageConverter.java:164) ~[spring-amqp-2.4.7.jar:2.4.7]
at org.springframework.amqp.support.converter.AbstractMessageConverter.createMessage(AbstractMessageConverter.java:88) ~[spring-amqp-2.4.7.jar:2.4.7]
at org.springframework.amqp.support.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:70) ~[spring-amqp-2.4.7.jar:2.4.7]
at org.springframework.amqp.support.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:58) ~[spring-amqp-2.4.7.jar:2.4.7]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertMessageIfNecessary(RabbitTemplate.java:1831) ~[spring-rabbit-2.4.7.jar:2.4.7]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1137) ~[spring-rabbit-2.4.7.jar:2.4.7]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1119) ~[spring-rabbit-2.4.7.jar:2.4.7]
at com.example.seckillsystem.demos.web.Service.SeckillService.processSeckill(SeckillService.java:116) ~[classes/:na]
at com.example.seckillsystem.demos.web.Service.SeckillService.lambda$submitSeckillOrder$0(SeckillService.java:67) ~[classes/:na]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
- 分析:
- 从日志信息中可以分析出,真正的错误原因是:非法参数异常,Spring AMQP的默认消息转换器只会打包和发送字符串、字节数组或者实现了Serializable接口的对象。但是现在给到它的是一个SeckillOrder对象,它不知道如何处理。
- AMQP(高级消息队列协议):是一套国际标准和行业规范,规定了消息应该如何打包、路由键怎么写、通过什么流程来投递,以及邮局之间应该如何交流。
- RabbitMQ(产品): 是一个具体的消息队列中间件软件,遵循AMQP这套准则的“物流公司”,工作是接收、存储、路由和投递消息。
- Spring AMQP(框架):是 Spring 为开发者提供的一套便利的工具集,它将操作 AMQP 兼容的消息队列(主要是 RabbitMQ)的复杂过程,简化成了几个简单的 Spring Bean 和注解。
- 当调用
rabbitTemplate.convertAndSend("queue", order)时,Spring AMQP需要将这个Java对象转化成可以在网络上传输的格式(序列化),但是默认的转化器能力非常有限,不知道如何处理自定义的SeckillOrder类。
- 解决:配置JSON序列化
- 修改
RabbitMQConfig.java文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RabbitMQConfig {
public Queue seckillOrderQueue() {
return new Queue("seckill.order.queue", true);
}
/**
* 【新增】
* 定义一个消息转换器 Bean,用于将消息序列化为 JSON 格式。
* Spring Boot 在检测到这个 Bean 后,会自动用它来替换掉默认的 SimpleMessageConverter。
*/
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
} - 这个配置向Spring容器中注册了一个
jackson2JsonMessageConverter实例,Spring AMQP 在初始化 RabbitTemplate 和 @RabbitListener 时,会自动检测到并使用这个更强大的转换器。它会负责将 SeckillOrder 对象自动序列化成 JSON 字符串再发送,并在消费端自动将 JSON 字符串反序列化回 SeckillOrder 对象。 - 修复后结果:日志信息正常,数据库信息正确。
学学八股
RabbitMQ
- 核心组件
- producer :发送消息的应用程序
- connection & channel :connection是物理TCP连接,channel是在connection内部建立的逻辑连接。大多数操作都会在channel上进行,这避免了频繁创建和销毁TCP连接的开销。
- Exchange :接受来自生产者的消息,并根据路由键和自身的类型,将消息路由到一个或多个队列。
- Queue : 存储消息的缓冲区
- Binding :连接Exchange和Queue的规则。
- Consumer :从队列中获取并处理消息的应用程序
- Exchange的类型-体现RabbitMQ的灵活路由能力
- Direct Exchange: 直连交换机;将消息精确的路由到Routing Key和Binding Key 完全匹配的队列。适用于定点投递的场景。
- Fanout Exchange: 扇形/广播交换机:忽略路由键,将接收到的消息广播到所有与它绑定的队列中。
- Topic Exchange: 主题交换机,允许Binding Key和Routing Key之间进行模式匹配。适用于非常灵活的订阅/发布模型。
- Headers Exchange: 头交换机,根据消息头中的headers进行匹配,性能较差。
- 如何保证消息不丢失?
- 生产者–>RabbitMQ Broker:采用发送方确认机制,当生产者发送消息后,可以异步等待Broker的确认回执。如果Broker没有成功接收,生产者可以进行重发或记录日志。
- RabbitMQ Broker自身:消息持久化、队列持久化、交换机持久化,保证即使RabbitMQ服务器重启,消息和队列结构也不会丢失。
- RabbitMQ Broker –> 消费者:消费方确认机制,消费者处理完消息后,手动向Broker发送ACK,如果消费者在处理途中崩溃,Broker会因为没有收到ACK而将该消息重新投递给其他消费者。
- 什么是死信队列?
- 本质上是一个普通的交换机。当一个队列中的消息因为某些原因变成“死信”时,可以被自动的重新投递到这个指定的交换机上,进而路由到死信队列。
- 消息变成死信:消息被消费者拒绝/消息在队列中的存活时间超过了TTL/队列的长度超过了最大限制。
- 应用场景:处理失败的消息、实现延迟队列。
- RabbitMQ的高可用方案
- 普通集群:多个节点组成一个集群,共享元数据。但队列中的消息内容默认只存在于一个节点上。如果该节点宕机,消息会丢失。
- 镜像队列:在集群的基础上,将一个队列设置为镜像模式。这样,度一列中的所有消息都会被完整的复制到集群的多个节点上。任何一个节点宕机,客户端都可以无缝切换到其他节点继续消费,保证了消息的高可用。
- 如何处理消息积压问题?
- 定位原因
- 消费者变慢:检查消费者应用的日志、CPU、内存、数据库连接等,查看是否是下游服务拖慢了消费速度。
- 生产者流量激增:检查上游服务的流量是否异常。
- 解决方案
- 紧急扩容:如果是消费者的处理能力不足,增加消费者实例。
- 优化消费逻辑:检查代码,看是否有可以优化的慢查询、不合理的用户逻辑
- 临时方案:将积压的消息临时转存到另一个“临时队列”中,让主队列恢复正常,再慢慢处理临时队列中的数据。
- 顺序消息如何实现?
- RabbitMQ自身不保证全局的消息顺序,如果要实现严格的顺序,需满足:
- 单一生产者、单一队列、单一消费者。
- 为了保证顺序,牺牲了消费端的并发处理能力,是一个性能和一致性的权衡。
