实现可靠的异步下单:引入RabbitMQ

  • 当前V2.0架构的痛点:用于异步下单的BlockingQueue是一个纯粹的Java内存队列
    • 存在数据丢失风险:如果Spring Boot应用因为任何原因(意外的bug/服务器重启/部署更新)而崩溃,所有还存放在这个内存队列中、尚未被处理的订单信息将永久丢失。
    • 存在无法扩展问问题:这个内存队列只存在于单个应用实例中。无法通过增加服务器来提高订单处理速度。
  • V2.1:引入RabbitMQ实现可靠的异步下单

    BlockingQueue: 是JUC包下的一个接口,代表一个线程安全的队列,核心特性是阻塞(当生产者尝试从已满的队列中put队列/当消费者尝试从已空的队列中take元素),极简极快(没有网络开销和外部依赖,纯内存操作),但是无持久化,数据易丢失、无共享能力,无法扩展。
    Kafka: 现在演变成一个成熟的、分布式的流处理平台。是一个分布式提交日志。拥有极致的吞吐量和极强的水平扩展能力,数据可回溯能力和流处理生态。但是延迟相对较高,功能相对单一,不支持复杂的路由,运维复杂。
    RabbitMQ: 一个成熟、稳定、功能极其丰富的消息中间件,是AMQP最著名的实现者。可靠性高、路由灵活、功能全面。但是存在性能瓶颈(为了保证消息投递的可靠性和灵活性)和语言依赖。
    RocketMQ: 是阿里巴巴开源的消息中间件,为了应对极端高并发的电商和金融场景。在模型上借鉴了kafka,但又针对业务场景做了很多优化。具有极高的吞吐量和稳定性,有丰富的业务特性,运维友好。

  • RabbitMQ能解决什么问题?

环境准备与集成

  1. 使用Docker运行RabbitMQ:
  • 在终端中运行以下命令,启动一个RabbitMQ服务器,并且会有一个方便的管理后台界面。
    1
    docker run -d --name seckill-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
  • -p 5672:5672 :这是RabbitMQ服务本身通信的端口
  • -p 15672:15672 : 这回事管理后台的Web界面端口
  1. 访问管理后台:
  1. 项目集成
  • 添加Maven依赖,在pom.xml文件中添加,保存并让Maven重新加载依赖:
    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 配置连接,在application.properties文件中添加:
    1
    2
    3
    4
    5
    # ================= RabbitMQ Configuration =================
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest

改写代码

  1. 创建RabbitMQ配置类,定义好要使用的队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitMQConfig {
    @Bean
    public Queue seckillOrderQueue() {
    // durable(true) 表示队列是持久化的,即使RabbitMQ重启也不会丢失
    return new Queue("seckill.order.queue", true);
    }
    }
  2. 修改SeckillService.java文件(生产者),改造processSeckill方法,使之不在将订单信息put到内存队列,而是发送给RabbitMQ。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // ...
    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    @Service
    public class SeckillService {

    // 【移除】不再需要内存队列了
    // private BlockingQueue<SeckillOrder> orderQueue = ...

    @Autowired
    private RabbitTemplate rabbitTemplate; // 注入 RabbitMQ 的操作模板

    private void executeSeckill(Long productId, Long userId) {
    // ... 执行 Lua 脚本 ...
    Long result = redisTemplate.execute(seckillScript, keys, userId.toString());

    if (result == 0) {
    log.info("用户 {} 在Redis中秒杀成功,准备发送订单消息...", userId);

    // 【核心改动】
    // 1. 生成订单对象
    SeckillOrder order = new SeckillOrder();
    order.setUserId(userId);
    order.setProductId(productId);
    // ...

    // 2. 将订单对象作为消息,发送到指定的队列
    rabbitTemplate.convertAndSend("seckill.order.queue", order);

    } else {
    // ... 其他失败逻辑 ...
    }
    }
    }

    RabbitTemplate是Spring AMQP提供的一个专门用来发送消息的高级工具类。当我调用rabbitTemplate.convertAndSend("queue", order)时,它做了:

    • 从连接池获取一个到RabbitMQ的connection,创建一个通信通道,调用我们配置的转换器将SeckillOrder这个Java对象转换成Json字符,再把这个JSON字符串打包成一个符合AMQP规范的Message对象,发送给RabbitMQ Broker,处理连接和信道的关闭和复用,如果发送失败会进行重试或抛出统一的Spring异常。
  3. 修改OrderConsumerService.java(消费者),使之能监听RabbitMQ的队列,而不是从内存队列中取数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    @Service
    public class OrderConsumerService {

    // 【移除】不再需要 @PostConstruct 和手动创建的 new Thread()

    @Autowired
    private SeckillOrderRepository orderRepository;

    @Autowired
    private ProductRepository productRepository;

    /**
    * 【核心改动】
    * 使用 @RabbitListener 注解来监听指定的队列
    * Spring AMQP 会自动为我们处理消息的接收、反序列化等工作
    * @param order 从队列中接收到的订单对象
    */
    @RabbitListener(queues = "seckill.order.queue")
    @Transactional // 数据库操作依然需要事务保护
    public void createOrderInDb(SeckillOrder order) {
    try {
    log.info("从RabbitMQ接收到订单消息,准备创建订单: {}", order);

    orderRepository.save(order);

    int result = productRepository.deductStock(order.getProductId());
    if (result == 0) {
    throw new RuntimeException("MySQL库存扣减失败,订单回滚: " + order);
    }

    log.info("数据库订单创建成功");
    } catch (Exception e) {
    // 如果发生异常,Spring AMQP 默认会进行重试,
    // 最终如果还是失败,消息会进入“死信队列”(需要额外配置)
    // 这里我们先简单地打印错误日志
    log.error("消费订单消息时发生异常: {}", order, e);
    // 抛出异常,以便Spring AMQP知道处理失败
    throw e;
    }
    }
    }

    @RabbitListener是一个能将普通方法变成一个功能强大的消息消费者,当打上这个@RabbitListener(queues = "seckill.order.queue")注解后,Spring AMQP就在后台启动了一个永不停止的监听程序。它负责:

    • 连接到RabbitMQ并订阅seckill.order.queue 这个队列,不断检查队列中是否有新消息,当有新消息时,安全的把他取下来,调用转换器反序列化,将收到的JSON字符串转换回SeckillOrder对象。将转换好的对象作为参数调用写的 createOrderInDb 方法。如果方法成功执行没有抛出异常,它会自动告诉 RabbitMQ:“这个消息我已经处理好了,你可以把它删掉了。” 如果方法抛出异常,它会根据配置告诉 RabbitMQ:“处理失败了”,消息可能会被重试或放入“死信队列”。
      只需要要写一个处理业务的普通方法,加上一个注解就拥有了一个可靠的后台消费者,无需手动编写任何循环或连接管理代码。

JMeter压测设计与结果分析

  • 运行环境:Spring Boot应用(V2.1)、Redis、RabbitMQ、MYSQL服务都已正常启动。
  • 观测工具:
    • JMeter:用于发起并发请求。
    • RabbitMQ管理后台:用于实时监控队列状态。
    • 应用控制台日志:观察日志输出。
    • 数据库客户端:最终验证数据一致性。

功能与性能回归测试

  1. 准备工作:测试前确保清空数据库、重置Redis(重新运行预热RedisPreheatService,确保Redis中的库存也是100,且已购用户Set为空),清空队列。
  2. JMeter配置:
  • 线程组: 配置一个高并发的“写请求”线程组。
  • 线程数: 500
  • Ramp-up Period: 1
  • 循环次数: 1
  1. 结果:
  • 数据库信息:库存数没有减少,订单没有新建
  • 日志信息:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    2025-10-06 19:21:21.866 ERROR 16988 --- [hread-859862295] c.e.s.demos.web.Service.SeckillService   : 后台秒杀任务执行失败!productId=1, userId=117
    java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.example.seckillsystem.demos.web.SeckillOrder
    at org.springframework.amqp.support.converter.SimpleMessageConverter.createMessage(SimpleMessageConverter.java:164) ~[spring-amqp-2.4.7.jar:2.4.7]
    at org.springframework.amqp.support.converter.AbstractMessageConverter.createMessage(AbstractMessageConverter.java:88) ~[spring-amqp-2.4.7.jar:2.4.7]
    at org.springframework.amqp.support.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:70) ~[spring-amqp-2.4.7.jar:2.4.7]
    at org.springframework.amqp.support.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:58) ~[spring-amqp-2.4.7.jar:2.4.7]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertMessageIfNecessary(RabbitTemplate.java:1831) ~[spring-rabbit-2.4.7.jar:2.4.7]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1137) ~[spring-rabbit-2.4.7.jar:2.4.7]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1119) ~[spring-rabbit-2.4.7.jar:2.4.7]
    at com.example.seckillsystem.demos.web.Service.SeckillService.processSeckill(SeckillService.java:116) ~[classes/:na]
    at com.example.seckillsystem.demos.web.Service.SeckillService.lambda$submitSeckillOrder$0(SeckillService.java:67) ~[classes/:na]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
  1. 分析:
  • 从日志信息中可以分析出,真正的错误原因是:非法参数异常,Spring AMQP的默认消息转换器只会打包和发送字符串、字节数组或者实现了Serializable接口的对象。但是现在给到它的是一个SeckillOrder对象,它不知道如何处理。
    1. AMQP(高级消息队列协议):是一套国际标准和行业规范,规定了消息应该如何打包、路由键怎么写、通过什么流程来投递,以及邮局之间应该如何交流。
    2. RabbitMQ(产品): 是一个具体的消息队列中间件软件,遵循AMQP这套准则的“物流公司”,工作是接收、存储、路由和投递消息。
    3. Spring AMQP(框架):是 Spring 为开发者提供的一套便利的工具集,它将操作 AMQP 兼容的消息队列(主要是 RabbitMQ)的复杂过程,简化成了几个简单的 Spring Bean 和注解。
  • 当调用rabbitTemplate.convertAndSend("queue", order)时,Spring AMQP需要将这个Java对象转化成可以在网络上传输的格式(序列化),但是默认的转化器能力非常有限,不知道如何处理自定义的SeckillOrder类。
  1. 解决:配置JSON序列化
  • 修改RabbitMQConfig.java文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Configuration
    public class RabbitMQConfig {

    @Bean
    public Queue seckillOrderQueue() {
    return new Queue("seckill.order.queue", true);
    }

    /**
    * 【新增】
    * 定义一个消息转换器 Bean,用于将消息序列化为 JSON 格式。
    * Spring Boot 在检测到这个 Bean 后,会自动用它来替换掉默认的 SimpleMessageConverter。
    */
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
    }
    }
  • 这个配置向Spring容器中注册了一个jackson2JsonMessageConverter实例,Spring AMQP 在初始化 RabbitTemplate 和 @RabbitListener 时,会自动检测到并使用这个更强大的转换器。它会负责将 SeckillOrder 对象自动序列化成 JSON 字符串再发送,并在消费端自动将 JSON 字符串反序列化回 SeckillOrder 对象。
  • 修复后结果:日志信息正常,数据库信息正确。

学学八股

RabbitMQ

  1. 核心组件
  • producer :发送消息的应用程序
  • connection & channel :connection是物理TCP连接,channel是在connection内部建立的逻辑连接。大多数操作都会在channel上进行,这避免了频繁创建和销毁TCP连接的开销。
  • Exchange :接受来自生产者的消息,并根据路由键和自身的类型,将消息路由到一个或多个队列。
  • Queue : 存储消息的缓冲区
  • Binding :连接Exchange和Queue的规则。
  • Consumer :从队列中获取并处理消息的应用程序
  1. Exchange的类型-体现RabbitMQ的灵活路由能力
  • Direct Exchange: 直连交换机;将消息精确的路由到Routing Key和Binding Key 完全匹配的队列。适用于定点投递的场景。
  • Fanout Exchange: 扇形/广播交换机:忽略路由键,将接收到的消息广播到所有与它绑定的队列中。
  • Topic Exchange: 主题交换机,允许Binding Key和Routing Key之间进行模式匹配。适用于非常灵活的订阅/发布模型。
  • Headers Exchange: 头交换机,根据消息头中的headers进行匹配,性能较差。
  1. 如何保证消息不丢失?
  • 生产者–>RabbitMQ Broker:采用发送方确认机制,当生产者发送消息后,可以异步等待Broker的确认回执。如果Broker没有成功接收,生产者可以进行重发或记录日志。
  • RabbitMQ Broker自身:消息持久化、队列持久化、交换机持久化,保证即使RabbitMQ服务器重启,消息和队列结构也不会丢失。
  • RabbitMQ Broker –> 消费者:消费方确认机制,消费者处理完消息后,手动向Broker发送ACK,如果消费者在处理途中崩溃,Broker会因为没有收到ACK而将该消息重新投递给其他消费者。
  1. 什么是死信队列?
  • 本质上是一个普通的交换机。当一个队列中的消息因为某些原因变成“死信”时,可以被自动的重新投递到这个指定的交换机上,进而路由到死信队列。
  • 消息变成死信:消息被消费者拒绝/消息在队列中的存活时间超过了TTL/队列的长度超过了最大限制。
  • 应用场景:处理失败的消息、实现延迟队列。
  1. RabbitMQ的高可用方案
  • 普通集群:多个节点组成一个集群,共享元数据。但队列中的消息内容默认只存在于一个节点上。如果该节点宕机,消息会丢失。
  • 镜像队列:在集群的基础上,将一个队列设置为镜像模式。这样,度一列中的所有消息都会被完整的复制到集群的多个节点上。任何一个节点宕机,客户端都可以无缝切换到其他节点继续消费,保证了消息的高可用。
  1. 如何处理消息积压问题?
  • 定位原因
    • 消费者变慢:检查消费者应用的日志、CPU、内存、数据库连接等,查看是否是下游服务拖慢了消费速度。
    • 生产者流量激增:检查上游服务的流量是否异常。
  • 解决方案
    • 紧急扩容:如果是消费者的处理能力不足,增加消费者实例。
    • 优化消费逻辑:检查代码,看是否有可以优化的慢查询、不合理的用户逻辑
    • 临时方案:将积压的消息临时转存到另一个“临时队列”中,让主队列恢复正常,再慢慢处理临时队列中的数据。
  1. 顺序消息如何实现?
  • RabbitMQ自身不保证全局的消息顺序,如果要实现严格的顺序,需满足:
    • 单一生产者、单一队列、单一消费者。
    • 为了保证顺序,牺牲了消费端的并发处理能力,是一个性能和一致性的权衡。

本站由 Xylumina 使用 Stellar 1.30.0 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

本"页面"访问 次 | 总访问 次 | 总访客