架构分层-漏洞
| 架构分层 | 技术实现 (V4.3) | 本层目标 (已达成) | 存在的漏洞 |
|---|---|---|---|
| 接入层 | Spring Cloud Gateway (本地, :80) |
1. 智能路由 (从Nacos动态发现服务) 2. 统一鉴权 (JWT全局过滤器) 3. 自动负载均衡 (验证通过) |
漏洞A: 限流策略薄弱 我们的Nginx限流(V3.1)已被移除。Gateway 层暂未配置任何专业的限流(如 Resilience4j 或 Redis 限流),无法抵御恶意攻击。 |
| 流量削峰层 | RabbitMQ (Docker) |
1. 异步解耦 (api 与 order 分离) 2. 消费者并发 (通过 SimpleRabbit...Factory 配置) |
漏洞B: 消息可靠性黑洞 1. 没有死信队列 (DLQ):任何处理失败(业务或系统异常)的消息,最终都会被丢弃,导致订单永久丢失。 2. 没有用户反馈:用户只知道“排队中”,但不知道排了多久、排在哪里。 |
| 业务逻辑层 | seckill-api, order-service (本地) Nacos (Docker) Resilience4j (熔断器) |
1. 服务拆分 (职责分离) 2. 服务治理 (Nacos 注册/发现) 3. 服务容错 (熔断器 dbWrite 已验证) |
漏洞C: 缓存策略缺失 1. 缓存穿透:黑客可用不存在的 productId 刷爆我们的 RedisPreheatService 和数据库。 2. 缓存击穿/雪崩: RedisPreheatService 在启动时全量加载,但没有“懒加载”和“分布式锁”机制,如果缓存失效,数据库会被打垮。 3. 无本地缓存:所有资格校验都直达 Redis,网络开销巨大。 |
| 数据层 | Redis (单点, Docker) MySQL (单点, 本地) |
1. Redis 原子扣库存 (Lua 脚本) 2. MySQL 最终一致性 (唯一索引) |
漏洞D: 性能的“天花板” 1. Redis 锁竞争:所有请求都在争抢同一个商品库存 Key ( seckill:stock:1),这是系统真正的性能瓶颈。 2. DB 单点:所有订单都写入单库单表,这是未来的扩展性瓶颈。 |
Redis库存分片
- 在V2.0中引入了 Redis+Lua 脚本,将“校验+扣库存”变为原子操作。但是由于Redis是单线程的,所以Lua脚本会锁住Redis。当10万个请求同时抢购同一个商品时,它们在seckill-api中是并发的,但是到了Redis层,它们被迫排成一队,一个一个串行执行Lua脚本。系统的性能瓶颈就在这个Lua脚本的执行效率上。如果不解决这个问题,那系统将无法水平扩展,seckill-api增加的每一台服务器,都会全部卡死在同一个资源的锁竞争上。
- 采用Redis库存分片
- 通过将1000个库存分装到10个不同的key中,将对一个key的全局写锁,分散成了对10个不同key的局部写锁。
- 当10万个请求同时涌入时,它不再是排成1队,而是被随机地分流到10个不同的队列中。这使得Redis的单线程CPU可以在不同key的操作上快速切换,系统的并发吞吐能力理论上提升了10倍。
- 采用布隆
配置代码
- 修改 resources/scripts/seckill.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46-- V5.x "库存分片" 版 Lua 脚本
--
-- KEYS[1]: 商品库存Key前缀 (e.g., "seckill:stock:")
-- KEYS[2]: 用户集合Key前缀 (e.g., "seckill:users:")
--
-- ARGV[1]: 用户ID (e.g., "123")
-- ARGV[2]: 商品ID (e.g., "1")
-- ARGV[3]: 随机分片桶序号 (e.g., "7")
--
-- 1. 构造所有需要的 Key
local productId = ARGV[2]
local userId = ARGV[1]
local bucketIndex = ARGV[3]
local totalStockKey = KEYS[1] .. productId .. ":total"
local stockBucketKey = KEYS[1] .. productId .. ":bucket_" .. bucketIndex
local userSetKey = KEYS[2] .. productId
-- 2. 检查总库存 (快速失败)
-- (这里假设 key 存在,由懒加载保证)
local totalStock = tonumber(redis.call("GET", totalStockKey))
if (not totalStock or totalStock <= 0) then
return 2 -- 2 代表【真的】已售罄
end
-- 3. 检查用户是否已购买
if (redis.call("SISMEMBER", userSetKey, userId) == 1) then
return 1 -- 1 代表重复购买
end
-- 4. 【核心】使用 DECR 原子地扣减,并检查结果
local stock = tonumber(redis.call("DECR", stockBucketKey))
if (stock < 0) then
-- 这个桶被扣成了负数 (超卖了),说明这个桶没库存了
-- 需要把库存“还回去”,并返回一个新错误码
redis.call("INCR", stockBucketKey) -- 把库存加回来 (变为 0)
return 3 -- 3 代表【这个桶空了,请重试】
end
-- 5. 扣减总库存
redis.call("DECR", totalStockKey)
redis.call("SADD", userSetKey, userId)
return 0 -- 0 代表成功
缓存策略-防穿透、击穿、雪崩
- 在V4.x版本中,seckill-api启动时,使用 RedisPreheatService(CommandLineRunner)全量预热库存。
- 这会导致:
- 启动即崩溃:如果秒杀商品有100万个,RedisPreheatService 会在启动时循环100万次,这个过程可能长达几十分钟,导致 Spring Boot 应用启动超时,在 K8s 等生产环境中会被“健康检查”反复杀死。
- 缓存穿透:如果使用不存在的productId来攻击,导致请求穿透到Redis,甚至Lua脚本,空耗资源。
- 缓存击穿:如果热点key恰好过期,在过期的瞬间,成千上万的请求会同时未命中缓存,然后去查询数据库,引发数据库雪崩。
- 缓存雪崩:如果所有的商品 Key 都在同一时间(比如凌晨0点)过期,也会导致“数据库雪崩”。
- 所以,重构缓存策略:
- 防穿透:用BloomFilter拦截不存在的key;用缓存空值防止少量漏网之鱼(BloomFilter的误判率)反复穿透。
- 防击穿: 用Redisson分布式锁,确保在热点key失效的瞬间,只有一个线程能够回源数据库,其他线程原地等待。
- 防雪崩:在懒加载写入缓存时,为每个key设置一个随机化的过期时间,打散过期时间,防止集体阵亡。
- 三级缓存:
- L1(Caffeine):JVM堆内存,最快(纳秒级),用于缓存极热的,不常变的元数据(如商品信息)。
- L2(Redis):分布式缓存,快(毫秒级),用于缓存高并发读写的交易数据(如库存)。
- L3(MySQL):磁盘,慢(几十毫秒级),作为最终数据来源。
代码配置
- 在 pom.xml中增加相关依赖
- 在 application.properties中配置本地缓存caffeine
1
2
3
4
5
6
7
8
9
10
11
12
13
14# ================== V5.2 Cache Configuration ==================
# 【核心】1. 告诉 Spring Boot 我们选择 Caffeine 作为 @Cacheable 的实现
spring.cache.type=caffeine
# 【核心】2. 注册一个名叫 'product' 的缓存空间
spring.cache.cache-names=product
# 【核心】3. 配置这个缓存空间的规格(可选,但推荐)
# 最多存储 500 个商品, 写入 10 分钟后自动过期
spring.cache.caffeine.spec=maximumSize=500,expireAfterWrite=10m
# ================== V5.2 Redisson Configuration ==================
# (Redisson 会自动从 spring.redis.* 中读取配置,无需额外配置) - 修改 service/RedisPreheatService.java
- 不再预热库存,只负责预热布隆过滤器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34/**
* V5.2: 这个服务不再负责预热 Redis 库存(改为懒加载)
* 它唯一的职责是预热“布隆过滤器”
*/
public class RedisPreheatService implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(RedisPreheatService.class);
private ProductRepository productRepository;
private BloomFilter<Long> productBloomFilter; // 注入 CacheConfig 中创建的 Bean
public void run(String... args) throws Exception {
log.info("=========================================");
log.info("V5.2 缓存预热任务开始...");
List<Product> products = productRepository.findAll();
if (products.isEmpty()) {
log.warn("【布隆过滤器】没有找到商品,过滤器未预热。");
return;
}
for (Product product : products) {
productBloomFilter.put(product.getId()); // 将每一个合法的 productId 放入过滤器
}
log.info("【布隆过滤器】预热完毕。共加载 {} 个商品ID。", products.size());
log.info("=========================================");
}
}
- 不再预热库存,只负责预热布隆过滤器。
- 新建 config/CacheConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 【关键】启用 Spring 的 @Cacheable 功能
public class CacheConfig {
private static final Logger log = LoggerFactory.getLogger(CacheConfig.class);
private ProductRepository productRepository; // 注入 DB 仓储
/**
* 【V5.2 - 防穿透】
* 创建一个布隆过滤器 Bean,并在应用启动时填充它
*/
public BloomFilter<Long> productBloomFilter() {
// 1. 创建过滤器
// 预期插入 10000 个商品ID, 期望误判率 0.01
BloomFilter<Long> bloomFilter = BloomFilter.create(Funnels.longFunnel(), 10000, 0.01);
// 2. 启动时填充数据
log.info("【布隆过滤器】开始加载商品ID...");
List<Product> products = productRepository.findAll();
for (Product product : products) {
bloomFilter.put(product.getId());
}
log.info("【布隆过滤器】预热完毕,共加载 {} 个商品ID。", products.size());
return bloomFilter;
}
} - 修改 config/RedisConfig.java
- 配置 正确的 RedisTemplate 序列化器
- 加载 配置Redis分片后的Lua脚本
- 配置 MQ消息转化器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class RedisConfig {
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
StringRedisSerializer stringSerializer = new StringRedisSerializer();
// 【V5.2 核心】
// 使用 GenericToStringSerializer 来序列化 Value
// 它会智能地调用 .toString(),将 Long 1000 存为纯字符串 "1000"
// 这样 Lua 的 tonumber("1000") 就能正确解析
GenericToStringSerializer<Object> toStringSerializer = new GenericToStringSerializer<>(Object.class);
// Key 和 HashKey 使用 String 序列化
template.setKeySerializer(stringSerializer);
template.setHashKeySerializer(stringSerializer);
// Value 和 HashValue 使用“转String”序列化
template.setValueSerializer(toStringSerializer);
template.setHashValueSerializer(toStringSerializer);
template.afterPropertiesSet();
return template;
}
/**
* 加载“库存分片”版 Lua 脚本
* @return
*/
// 命名为 "seckillScriptV5"
public DefaultRedisScript<Long> seckillScriptV5() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/seckill.lua")));
redisScript.setResultType(Long.class);
return redisScript;
}
// RabbitMQ 消息转换器 (保持不变)
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- 修改 controller/SeckillController.java
- 负责第一道防线:布隆过滤器拦截
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SeckillController {
private static final Logger log = LoggerFactory.getLogger(SeckillController.class);
private SeckillService seckillService;
private BloomFilter<Long> productBloomFilter; // 【V5.2 新增】注入布隆过滤器
public ResponseEntity<String> submitSeckillOrder( Long productId,
Long userId) {
// 【V5.2 核心改动:防穿透】
if (!productBloomFilter.mightContain(productId)) {
log.warn("【防穿透】布隆过滤器拦截到不存在的商品ID: {}", productId);
return ResponseEntity.badRequest().body("商品不存在或活动未开始");
}
// 通过布隆过滤器后,才进入核心秒杀逻辑
String result = seckillService.submitSeckillOrder(productId, userId);
// 根据结果返回不同的状态码
if (result.contains("排队")) {
return ResponseEntity.accepted().body(result); // 202 Accepted
} else if (result.contains("成功")) {
return ResponseEntity.ok(result); // 理论上不会走到这里,因为是异步
} else {
return ResponseEntity.badRequest().body(result); // 400 Bad Request
}
}
// ... (你其他的接口,比如 /test-feign) ...
}
- 负责第一道防线:布隆过滤器拦截
- 关键:重构 service/SeckillService.java
- 融合分片、懒加载、三级缓存的核心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
public class SeckillService {
private static final Logger log = LoggerFactory.getLogger(SeckillService.class);
private ExecutorService seckillExecutorService;
private RedisTemplate<String, Object> redisTemplate;
private RabbitTemplate rabbitTemplate;
private ProductRepository productRepository;
private RedissonClient redissonClient;
private SeckillService self;
private DefaultRedisScript<Long> seckillScript;
private static final Random random = new Random();
// --- Key 常量 ---
public static final String STOCK_KEY_PREFIX = "seckill:stock:";
public static final String USER_SET_KEY_PREFIX = "seckill:users:";
public static final String LOCK_KEY_PREFIX = "lock:product:";
public static final int BUCKET_COUNT = 10;
public static final String TOTAL_STOCK_KEY_SUFFIX = ":total";
public static final String BUCKET_KEY_SUFFIX = ":bucket_";
public static final int BUCKET_RETRY_COUNT = 3; // 桶重试次数
/**
* Controller 调用的入口方法
*/
public String submitSeckillOrder(Long productId, Long userId) {
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
try {
log.info("[{}] 收到任务,开始执行 V5.2 缓存检查...", threadName);
// 1. 确保缓存就绪(懒加载 + 防击穿)
boolean cacheReady = ensureStockCacheIsReady(productId, threadName);
if (!cacheReady) {
log.warn("[{}] 商品 {} 缓存未就绪或加载锁超时,拒绝本次秒杀请求。", threadName, productId);
return;
}
// 2. 执行秒杀(库存分片 + 重试)
executeSeckill(productId, userId, threadName);
} catch (Exception e) {
log.error("[{}] 后台秒杀任务执行失败!productId={}, userId={}", threadName, productId, userId, e);
}
};
try {
seckillExecutorService.submit(task);
} catch (RejectedExecutionException e) {
log.warn("线程池繁忙,任务被拒绝。 productId={}, userId={}", productId, userId);
return "当前活动过于火爆,请稍后重试!";
}
return "请求已接收,正在排队处理中...";
}
/**
* 【V5.2】检查并加载库存缓存(防击穿、防雪崩)
*/
private boolean ensureStockCacheIsReady(Long productId, String threadName) {
String totalStockKey = STOCK_KEY_PREFIX + productId + TOTAL_STOCK_KEY_SUFFIX;
// 1. 检查 Redis 缓存
log.info("[{}] 检查 L2 (Redis) 缓存: {}", threadName, totalStockKey);
Object stockInRedis = redisTemplate.opsForValue().get(totalStockKey);
if (stockInRedis != null) {
log.info("[{}] L2 (Redis) 缓存命中!缓存就绪。", threadName);
return true;
}
// 2. 缓存未命中,获取分布式锁
String lockKey = LOCK_KEY_PREFIX + productId;
log.warn("[{}] L2 (Redis) 缓存未命中。准备获取 Redisson 分布式锁: {}", threadName, lockKey);
RLock lock = redissonClient.getLock(lockKey);
try {
if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {
log.info("[{}] 成功获取 Redisson 锁: {}", threadName, lockKey);
try {
// 3. DCL 再次检查 Redis
log.info("[{}] (持锁) DCL 检查 L2 (Redis) 缓存: {}", threadName, totalStockKey);
stockInRedis = redisTemplate.opsForValue().get(totalStockKey);
if (stockInRedis != null) {
log.info("[{}] (持锁) DCL 检查命中!缓存已被加载。", threadName);
return true;
}
// 4. L1 (Caffeine) 缓存回源
log.info("[{}] (持锁) L2 缓存确认不存在,尝试查询 L1 (Caffeine) 缓存...", threadName);
Product product = self.getProductFromCache(productId); // 调用 @Cacheable 方法
if (product == null) {
log.warn("[{}] (持锁) 数据库中未找到商品(ID:{}),缓存“空值”到 Redis 以防穿透。", threadName, productId);
redisTemplate.opsForValue().set(totalStockKey, -1, 5, TimeUnit.MINUTES);
return false;
}
log.info("[{}] 【数据库回源】成功从 DB/Caffeine 获取到 Product: {}", threadName, product.toString());
// 5. 加载缓存 & 防雪崩
long totalStock = product.getStock();
if (totalStock <= 0) {
log.warn("[{}] (持锁) 数据库中库存为 0,缓存“-1”值。", threadName);
redisTemplate.opsForValue().set(totalStockKey, -1, 5, TimeUnit.MINUTES);
return false;
}
log.info("[{}] (持锁) 成功从数据库获取商品,开始加载 L2 (Redis) 缓存...", threadName);
long stockPerBucket = totalStock / BUCKET_COUNT;
long remainder = totalStock % BUCKET_COUNT;
long ttl = 30 * 60 + random.nextInt(30 * 60); // 30-60分钟的随机过期时间
for (int i = 1; i <= BUCKET_COUNT; i++) {
long currentBucketStock = stockPerBucket;
if (i == BUCKET_COUNT) currentBucketStock += remainder;
redisTemplate.opsForValue().set(
STOCK_KEY_PREFIX + productId + BUCKET_KEY_SUFFIX + i,
currentBucketStock,
ttl, TimeUnit.SECONDS
);
}
redisTemplate.opsForValue().set(totalStockKey, totalStock, ttl, TimeUnit.SECONDS);
log.info("[{}] (持锁) 【防击穿】商品 {} 缓存已成功懒加载至 Redis, TTL: {}s", threadName, productId, ttl);
return true;
} finally {
log.info("[{}] 释放 Redisson 锁: {}", threadName, lockKey);
lock.unlock();
}
} else {
log.warn("[{}] 获取 Redisson 锁超时: {}", threadName, lockKey);
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("[{}] Redisson 锁等待被中断", threadName, e);
return false;
}
}
/**
* 【V5.2】L1 本地缓存层
*/
public Product getProductFromCache(Long productId) {
log.info("【三级缓存】L1 (Caffeine) 缓存未命中,穿透到 L2 (DB) 查询商品: {}", productId);
return productRepository.findById(productId).orElse(null);
}
/**
* 【V5.1】库存分片秒杀逻辑
*/
private void executeSeckill(Long productId, Long userId, String threadName) {
log.info("[{}] 缓存就绪,开始执行秒杀 Lua 脚本", threadName);
List<String> keys = Arrays.asList(
STOCK_KEY_PREFIX,
USER_SET_KEY_PREFIX
);
Long result = -1L; // 默认为失败
// 【V5.1 核心】增加桶重试
for (int i = 0; i < BUCKET_RETRY_COUNT; i++) {
int bucketIndex = random.nextInt(BUCKET_COUNT) + 1;
result = (Long) redisTemplate.execute(
seckillScript,
keys,
String.valueOf(userId),
String.valueOf(productId),
String.valueOf(bucketIndex)
);
if (result == null) {
log.error("[{}] Lua 脚本执行异常,终止重试。", threadName);
break;
}
if (result == 0) {
log.info("[{}] 用户 {} 在 桶: {} 中秒杀成功,发送订单消息...", threadName, userId, bucketIndex);
Map<String, Long> orderMessage = new HashMap<>();
orderMessage.put("productId", productId);
orderMessage.put("userId", userId);
// 【V5.3 核心】发送到交换机,而不是队列
rabbitTemplate.convertAndSend("seckill.exchange", "seckill.routing.key", orderMessage);
return;
}
if (result == 1) {
log.warn("[{}] 用户 {} 秒杀失败:重复下单", threadName, userId);
return;
}
if (result == 2) {
log.warn("[{}] 用户 {} 秒杀失败:总库存已售罄", threadName, userId);
return;
}
// result == 3, 桶已空,继续循环
log.warn("[{}] 用户 {} 秒杀失败:桶 {} 已空,正在尝试下一个桶...", threadName, userId, bucketIndex);
}
if (result == 3) {
log.warn("[{}] 用户 {} 尝试 {} 次后,仍未抢到(所有桶都为空),判定为库存不足。", threadName, userId, BUCKET_RETRY_COUNT);
}
}
}
- 融合分片、懒加载、三级缓存的核心
数据一致性-DLQ
- 在V4.x版本,任何处理失败的消息都会被消息队列直接丢弃,导致订单信息会因为一个简单地异常永久丢失,后果十分严重。
- 利用 RabbitMQ 的死信队列机制,为所有死亡的消息提供一个太平间,等待人工或补偿程序来处理,实现数据的最终可靠性。
代码配置
- 修改 config/RabbitMQConfig.java
- 声明所有交换机、队列,并建立“死信”绑定关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class RabbitMQConfig {
// --- 常量定义 ---
public static final String MAIN_EXCHANGE = "seckill.exchange";
public static final String MAIN_QUEUE = "seckill.order.queue";
public static final String MAIN_ROUTING_KEY = "seckill.routing.key";
public static final String DLX_EXCHANGE = "dlx.exchange";
public static final String DLQ_QUEUE = "dlq.seckill.order.queue";
public static final String DLQ_ROUTING_KEY = "dlq.routing.key";
// --- 消息转换器 (不变) ---
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// --- 消费者并发工厂 (不变) ---
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// 并发配置已从 application.properties 自动加载
return factory;
}
// --- 【V5.3】声明主交换机 ---
public DirectExchange mainExchange() {
return new DirectExchange(MAIN_EXCHANGE);
}
// --- 【V5.3 核心】声明主队列,并“绑定”死信交换机 ---
public Queue mainQueue() {
return QueueBuilder.durable(MAIN_QUEUE) // 持久化
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 声明死信交换机
.withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY) // 声明死信的路由Key
.build();
}
// --- 【V5.3】将主队列绑定到主交换机 ---
public Binding mainBinding() {
return BindingBuilder.bind(mainQueue()).to(mainExchange()).with(MAIN_ROUTING_KEY);
}
// --- 【V5.3】声明死信交换机 ---
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE);
}
// --- 【V5.3】声明死信队列 ---
public Queue dlqQueue() {
return new Queue(DLQ_QUEUE); // 死信队列也是持久化的
}
// --- 【V5.3】将死信队列绑定到死信交换机 ---
public Binding dlxBinding() {
return BindingBuilder.bind(dlqQueue()).to(dlxExchange()).with(DLQ_ROUTING_KEY);
}
}
- 声明所有交换机、队列,并建立“死信”绑定关系
- 修改 consumer/OrderConsumerService.java
- 移除 try-catch,让所有异常都抛出,以便被DLQ捕获。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class OrderConsumerService {
private static final Logger log = LoggerFactory.getLogger(OrderConsumerService.class);
private SeckillOrderRepository orderRepository;
private ProductRepository productRepository;
private OrderConsumerService self; // 用于 AOP 自调用
// 临时的商品价格
private final Map<Long, BigDecimal> productPriceMap = Map.of(1L, new BigDecimal("1.00"));
/**
* 【V5.3 核心】第一层:消费者入口 & 熔断层
* 移除了所有 try-catch 块。
*/
public void receiveOrderMessage(Map<String, Long> orderMessage) {
log.info("从RabbitMQ接收到订单消息: {}", orderMessage);
// 直接调用带事务的内部方法
// 任何异常(业务的、系统的)都会从这里抛出
// 抛出后,@CircuitBreaker 会捕获它们
// 最终,Spring AMQP 也会捕获它们,并根据策略(requeue=false)将消息NACK并送入DLQ
self.createOrderInDb(orderMessage);
}
/**
* 【V5.3 核心】第二层:事务与业务逻辑层
* 负责抛出所有异常
*/
public void createOrderInDb(Map<String, Long> orderMessage) {
Long userId = orderMessage.get("userId");
Long productId = orderMessage.get("productId");
// 1. 业务异常:检查重复下单
if (orderRepository.findByUserIdAndProductId(userId, productId) != null) {
throw new SeckillBusinessException("您已秒杀过此商品");
}
// 2. 业务异常:扣减 MySQL 库存
int result = productRepository.deductStock(productId);
if (result == 0) {
throw new SeckillBusinessException("MySQL库存扣减失败或已售罄");
}
// 3. 业务异常:商品不存在
Product product = productRepository.findById(productId)
.orElseThrow(() -> new SeckillBusinessException("商品不存在: " + productId));
// 4. 创建订单
SeckillOrder order = new SeckillOrder();
order.setUserId(userId);
order.setProductId(productId);
order.setOrderPrice(product.getPrice());
orderRepository.save(order);
log.info("数据库订单创建成功,事务即将提交。");
}
/**
* 【V5.3 核心】降级方法
* 必须也抛出异常,才能触发 DLQ
*/
public void fallbackForCreateOrder(Map<String, Long> orderMessage, Throwable t) {
log.error("数据库写入熔断器已打开!执行降级逻辑。 订单: {}, 异常: {}", orderMessage, t.getMessage());
// 抛出异常,触发 DLQ
throw new RuntimeException("熔断器已打开,执行降级", t);
}
}
- 移除 try-catch,让所有异常都抛出,以便被DLQ捕获。
测试过程
- 一个不存在的productId


- 第一个请求(缓存冷启动)


- 正常用户的请求(“热”启动)

- 系统故障(MySQL故障)






