高级篇

思维导图

https://mcn804sjz49b.feishu.cn/docx/OzUEdkye9oIVNRxWuMIc34xOnjg?fromScene=spaceOverview&openbrd=1&doc_app_id=501&blockId=NE1sdsvQmop3xKxyTUHc4yOBnfg&blockType=whiteboard&blockToken=Litvwsw8bhLda7byVCcc0H5SnBc#NE1sdsvQmop3xKxyTUHc4yOBnfg

前言

在支付成功后利用RabbitMQ通知交易服务,更新业务订单状态为已支付。

大家思考一下,如果这里MQ通知失败,支付服务中支付流水显示支付成功,而交易服务中的订单状态却显示未支付,数据出现了不一致。

此时前端发送请求查询支付状态时,肯定是查询交易服务状态,会发现业务订单未支付,而用户自己知道已经支付成功,这就导致用户体验不一致。

因此,这里我们必须尽可能确保MQ消息的可靠性,即:消息应该至少被消费者处理1次

那么问题来了:

  • 我们该如何确保MQ消息的可靠性
  • 如果真的发送失败,有没有其它的兜底方案?

1.发送者的可靠性

如何确保消息从「生产者 ➜ MQ ➜ 消费者」整个链路上不丢失消息,这是 RabbitMQ 业务改造中非常核心的一环

首先,我们一起分析一下消息丢失的可能性有哪些。

消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:

img

消息从生产者到消费者的每一步都可能导致消息丢失:

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

img

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

解决思路:确保三端可靠性

位置 对策
生产者 开启重试 + 消息确认机制
RabbitMQ 开启消息持久化 + 高可用部署(镜像队列)
消费者 ack 机制 + 异常处理

这一章我们先来看如何确保生产者一定能把消息发送到MQ。

1.1.生产者重试机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

修改publisher模块的application.yaml文件,添加下面的内容:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

注意:

  • 这是同步阻塞式重试,期间线程会被阻塞,适合非高并发场景。
  • 建议高性能系统用异步方式手动重试(比如失败加入重试队列)。

我们利用命令停掉RabbitMQ服务:

1
docker stop mq

然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!

img

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

1.2.生产者确认机制

(Publisher Confirm确保消息到达服务器,Publisher Return处理路由失败的情况。)

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

具体如图所示:

img

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

img

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

img

默认两种机制都是关闭状态,需要通过配置文件来开启。

img

1.3.实现生产者确认

RabbitMQ 中实现 生产者消息确认机制(Publisher Confirm),确保消息可靠地发送到 MQ,避免因为路由失败、交换机不存在等问题导致消息“悄无声息地丢失”。

1.3.1.开启生产者确认

在publisher模块的application.yaml中添加配置:

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制

img

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

1.3.2.定义ReturnCallback:消息无法路由时触发

img

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

img

img

img

内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}

关于@PostConstruct

img

img

1.3.3.定义ConfirmCallback:确认消息是否被 MQ 接收(ack/nack)

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。

confirm针对消息,每条消息都要定义一次,return机制在整个mq只需定义一次。

img

具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

img

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

img

img

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

img

img

img

img

img

注意事项:

  • ack: MQ 接收了消息
  • nack: MQ 拒收,可能是交换机错误等
  • SettableListenableFuture 提供异步回调机制

执行结果如下:

img

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

img

注意

开启生产者确认比较消耗MQ性能,一般不建议开启。

  • 建议在消息可靠性很重要时推荐使用(如支付、订单类)
  • Confirm 回调会影响性能(每条消息都要等 MQ 回执)
  • MQ内部故障,这种需要处理,但概率往往较低。一般情况只开启 ConfirmCallback 即可,监听 nack

img

2.MQ的可靠性

img

img

2.1.数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

img

我们以控制台界面为例来说明。

2.1.1.交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

img

设置为Durable就是持久化模式,Transient就是临时模式。

表示交换机会被保存到磁盘中,RabbitMQ 重启后仍然存在

2.1.2.队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

队列重启后依旧存在

img

除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。

2.1.3.消息持久化

Spring AMQP 中:设置消息属性 deliveryMode = 2

否则即便交换机和队列持久化,消息本身依旧会丢失

img

mq发送消息,默认是持久化的,如果要发送非持久化的消息,要自定义构建器

img

img

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。(因为每次进行数据库操作时,需要建立数据库连接,假设建立连接是几百毫秒,在建立长连接之后再进行多次数据库操作,每次就只需要大概几十毫秒,所以批量持久化就比较节省性能)

一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

img

2.2.LazyQueue惰性队列

img

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为称为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。

img

使用惰性队列的性能更好的原因是接收到消息直接存入磁盘,避免了非持久化的情况下,内存满时需要向磁盘中Page out,此时mq不能接收消息。同时对磁盘的io进行了优化,使其效率更高

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载),动态监测消费者处理消息的速度,如果处理的比较慢,那么每次只需要从磁盘加载就可以。如果处理的的快,超过了磁盘加载的速度,那么就提前缓存部分消息到内存中。(最多2048条)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

2.2.1.控制台配置Lazy模式

注:即使是 Lazy Queue,如果消息未标记为持久化,那么这些消息在 RabbitMQ 服务器重启后仍然会丢失。

在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:

img

2.2.2.代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

1
2
3
4
5
6
7
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}

这里是通过QueueBuilderlazy()函数配置Lazy模式,底层源码如下:

img

当然,我们也可以基于注解来声明队列并设置为Lazy模式:

1
2
3
4
5
6
7
8
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

2.2.3.更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。

可以基于命令行设置policy:

1
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

当然,也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:

img

img

3.消费者的可靠性

img

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障

这里的网络故障的处理,一般是rabbitMq本身的设计来兜底的。一般会加入心跳机制,如果不跳了的话,那么就将unacked的消息直接变成ready,给其他的消费者消费。不过确实还有其他的解决方法的

  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

img

3.1.消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。

img

即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

img

Spring AMQP 的确认模式

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto

    :自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回

    1
    ack

    . 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常(RuntimeException),会自动返回nack
    • 如果是消息处理或校验异常(比如消息转格式失败),自动返回reject;

img

img

通过下面的配置可以修改SpringAMQP的ACK处理方式:

通过消费者端进行配置。可以修改SpringAMQP的ACK处理方式:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:

1
2
3
4
5
6
7
8
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
}// Spring 自动返回 nack,消息会重投
log.info("消息处理完成");
}

img

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

我们再次把确认机制修改为auto:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

img

放行以后,由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除:

img

我们将异常改为RuntimeException类型:

1
2
3
4
5
6
7
8
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new RuntimeException("故意的");
}
log.info("消息处理完成");
}

在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

img

放行以后,由于抛出的是业务异常,所以Spring返回nack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除:

img

当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

img

手动模式(manual)

手动处理回执:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "simple.queue")
public void listenManual(String msg, Channel channel, Message message) throws IOException {
try {
// 业务处理...
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}

优点是可控性强,缺点是编码复杂、易出错。

总结

模式 控制方式 是否推荐 说明
none 无确认 ❌ 不推荐 高风险,可能导致消息丢失
manual 手动确认 ✅ 复杂场景推荐 灵活可控
auto 自动确认 ✅ 推荐默认 配合异常类型做智能处理

3.2.失败重试机制

上述引入了auto之后,对返回的三种状态有了基本的管理的,当返回ack的时候,消息直接删除,当返回reject和nack的时候,重新requeue(默认配置defaultRequeuerejected = true),这是 Spring 对 AMQP 协议的高级封装,不是 RabbitMQ 原生的行为。不过每次失败重新requeue也不是个办法,有没有办法,当消息达到消费者端,占用一个消费者线程的时候,增加几次重试的机会呢??这个就是所谓的失败重试机制,准格尔是Spring提供的retry机制。同时需要注意的是,这里所谓的重试是本地重试的,也就是在jvm中,占用了消费者线程对消费方法进行了不断重试。下面的stateless表示的是每次重试都不保存上下文的。

img

修改consumer服务的application.yml文件,添加内容:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
参数 含义
initial-interval 第一次重试等待时间
multiplier 每次重试的等待时间 = 上次 × multiplier
max-attempts 最大重试次数
stateless: true 推荐使用无状态,避免事务冲突等问题

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

重试流程对比

阶段 默认行为(未配置 retry) 配置 retry 后的行为
消费失败 消息立即 requeue,MQ 再次投递 在本地线程中 retry,不入队
重试失败 持续 requeue,可能形成死循环 最终返回 reject,消息被丢弃
是否占用 MQ 是,占用 MQ、堆积消息 否,仅本地尝试处理

本地重试机制让 MQ 本身更轻松,消费者自我调节。

3.3.失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。(开启retry之后,都失败reject后,MessageRecover的默认实现使得消息不requeue,从而被丢弃)这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

img

定制失败补偿策略(MessageRecoverer)

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
实现类 含义 行为
RejectAndDontRequeueRecoverer 默认 失败后拒绝并丢弃
ImmediateRequeueMessageRecoverer 重入队 失败后返回 nack,消息再次投递
RepublishMessageRecoverer ✅推荐 投递到新交换机 失败后将消息转发到专门的“异常队列”

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

自定义异常队列方案(推荐)

1)在consumer服务中定义处理失败消息的交换机和队列

定义新的异常处理交换机、队列:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,配置重试失败后的处理策略:

1
2
3
4
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

重发,重发三次都失败后会来这里调用RepublishMessageRecoverer

完整代码如下:

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}

1.@ConditionalOnProperty(name = “spring.rabbitmq.listener.simple.retry.enabled”, havingValue = “true”):只有当配置文件中 spring.rabbitmq.listener.simple.retry.enabled=true 时,才会生效被注解的组件 name: 要检查的配置属性名;havingValue: 属性必须匹配的值

2.@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, “error.direct”, “error”);
}
}

思考一下为什么定义了一下这个bean就可以覆盖默认实现了呢?是因为默认实现上使用了@ConditionalOnMissingBean,只要你没有一个返回为MessageRecover的bean方法,那么其就会自动配置默认实现,如果有的话,那么就不会将默认实现放到ioc容器中,以你自己的为主

img

总结

场景 建议方案
避免重复 requeue 启用 retry.enabled: true,减少 MQ 压力
避免失败丢消息 配置 RepublishMessageRecoverer
保证消息可靠性 配合异常队列做人工或定时任务补偿
保证 MQ 健康 拒绝无限重试,合理配置最大次数

3.4.业务幂等性

消息被重复消费,如果消费者和mq之间的网络连接断开,消费者的ack未能成功发送到mq,那么等到连接好了之后,mq又会重新发送消息,此时消息重复被消费。如果这个消息是用于扣减库存的,那么就会出现问题。

前面我们讨论的都是消费者消费且有能力返回响应的情况。而这里的业务幂等性主要解决的是的,消费者消费但没有能力响应的情况,mq的策略会将message重新ready投递给其他的消费者,而如果这个时候消费者已经消费好了,就会导致重复消费,而重复消费需要依赖业务幂等性兜底

img

何为幂等性?

幂等性:无论你调用一个接口多少次,结果都是一样的。

img

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

img

所以,我们要尽可能避免业务被重复执行。

然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。

举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障,消费者消费了消息之后,传递回执结果给mq过程中因为网络故障导致mq没有收到ack,mq误以为消费者宕机了,于是发生重复投递,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

3.4.1方法一:唯一消息 ID + 数据记录

img

这个思路非常简单:

  1. 生产者在发送消息时附带一个唯一 messageId
  2. 消费者处理完业务逻辑后,把这个 ID 记录到数据库中
  3. 下次收到相同 messageId,先去数据库判断是否已消费过,已消费就跳过

SpringAMQP 怎么做?

开启 Jackson 消息转换器自动生成 ID:

img

以Jackson的消息转换器为例:

1
2
3
4
5
6
7
8
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}

接收消息时,获取 messageId 并校验:

1
2
3
4
5
6
7
String messageId = message.getMessageProperties().getMessageId();
if(messageLogService.isConsumed(messageId)) {
// 已处理,跳过
return;
}
// 正常业务逻辑执行后记录消息
messageLogService.markConsumed(messageId);

jjmc.setCreateMessageIds(true);源码如下:

img

3.4.2.方法二:业务状态判断(更推荐)

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

更轻量、不需要额外表,只依赖业务表中已有状态字段。

例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

示例:更新订单状态为“已支付”

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查询订单
Order old = getById(orderId);
// 2.判断订单状态
if (old == null || old.getStatus() != 1) {
// 已支付或已取消,跳过处理
return;
}
// 3.执行更新:订单未支付时才更新为已支付
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}

img

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作(上面代码块里的步骤2和步骤3),因此在极小概率下可能存在线程安全问题。

线程安全优化:将判断与更新合并为一个 SQL 语句

我们可以合并上述操作为这样:

1
2
3
4
5
6
7
8
9
10
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

注意看,上述代码等同于这样的SQL语句:

1
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

只有订单状态为 1(未支付)时才会执行更新,保障幂等性。

对比总结

方案 特点 适用场景
消息唯一 ID 精确可控,但需维护记录表 高精度日志、重要任务
业务状态判断 ✅推荐 无需新表,基于业务逻辑 状态清晰的业务(如订单、库存)

不管消息投递多少次,业务结果都应该保持一致。幂等性不是 MQ 自带的,而是业务端自己兜底保障的逻辑。

3.5.兜底方案

img

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?

img

有没有其它兜底方案,能够确保订单的支付状态一致呢?

其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

img

下单后立刻发送一个延迟消息给mq。

img

img

流程如下:

img

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

img

不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。

那么问题来了,我们到底该在什么时间主动查询支付状态呢?

这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

img

定时任务大家之前学习过(@Scheduled),具体的实现这里就不再赘述了。

保证方式 说明
主通知机制 支付成功后发送 MQ 消息通知交易服务
异常兜底方案 MQ 消息失败时,交易服务通过定时任务主动查询支付状态

这种“主动 + 被动”的结合方式,可以最大限度保障 订单状态一致性和支付可靠性

至此,消息可靠性的问题已经解决了。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

4.延迟消息

img

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件
方式 说明
① 死信交换机(DLX)+ TTL 利用消息或队列的过期时间,结合死信机制实现延迟
② 延迟插件(rabbitmq_delayed_message_exchange 原生支持延迟,不依赖过期和死信

4.1.死信交换机和延迟消息

4.1.1.死信交换机

img

什么是死信?

死信队列(DLQ)和死信交换机(DLX)中的“死信”其实指的就是无法被正常处理或传递的消息。信:就是消息,信息。

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机**(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

4.1.2.延迟消息

img

前面两种作用场景可以看做是把死信交换机(利用死信交换机的机制实现延迟消息)当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用(消费者消费retry多次失败之后投递到指定交换机)类似。

而最后一种场景,大家设想一下这样的场景:

img

img

如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:

img

假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的有效期为5000毫秒:

img

注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。(这里的routingkey不是为前面的fanout交换机设计的,而是为了后续死信交换机可以将消息投递到对应的queue中,需要和queue的bindingkey一致,所以才需要routingke)

消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:

img

死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue

img

由于direct.queue1hmall.direct绑定的key是blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:

img

img

也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息

img

img

img

4.1.3.总结

1.routingKey 的作用

上图流程中虽然 ttl.fanout 是广播型不需要 routingKey,但死信交换机是直连型的,因此:

  • 消息最初的 routingKey 仍然被携带
  • 确保死信 routingKey 与目标队列绑定的 key 一致(如 blue

2.TTL 的不准确性

RabbitMQ 的过期时间处理是 惰性检查

只有当消息到达队首,才检查 TTL 是否过期

所以:

  • 如果前面有很多未过期消息,后面即使 TTL 到期也不会马上转为死信
  • 延迟处理时间不一定精确(可用插件方案解决)

补充:电商支付场景中 MQ 延迟队列Spring Task(定时任务) 两种方案的 本质差异、实现方式和适用场景

一、实现原理对比

特性 MQ 延迟队列 Spring Task(定时任务)
实现方式 延迟发送消息(如 RabbitMQ + TTL + 死信队列) 定时轮询数据库
驱动机制 事件驱动(消息到期后推送) 定时驱动(周期性主动查询)
粒度控制 每条消息可独立设置延迟时间(精确到毫秒) 全量轮询,按批处理,粒度粗
消息处理 一条消息对应一个订单 每次任务处理多个订单

二、示例流程理解

img

img

三、使用场景与适配建议

维度 MQ 延迟队列 Spring Task
实时性要求 高(秒级精度) 低(分钟级)
系统规模 中大型电商系统 中小型或单体服务
延迟任务量 大(每个订单单独触发) 小(集中查询处理)
系统耦合 解耦良好(基于 MQ) 紧耦合(依赖本地数据库)
容错能力 需额外保障 MQ 可用性 可作为兜底方案
实现复杂度 高(配置 TTL、DLX、消费者逻辑) 低(只需写定时任务逻辑)

四、延迟队列 vs 定时任务的配合使用(最佳实践)

实际生产中建议二者组合使用

  • 主方案:使用 MQ 延迟队列实现实时取消订单
  • 兜底方案:使用定时任务扫描订单表,防止消息丢失或消费者异常

📌 举个例子:

  • 用户下单,发送一条 30 分钟延迟消息到 MQ;
  • 正常情况下,30分钟后消费者执行取消逻辑;
  • 万一延迟消息未投递或消费失败(比如宕机);
  • 定时任务每 5 分钟兜底检查,保证最终一致性。

img

4.2.DelayExchange插件

完整的步骤在这:https://developer.aliyun.com/article/1482289

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。

img

4.2.1.下载

插件下载地址:

GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

4.2.2.安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

1
docker volume inspect mq-plugins

结果如下:

img

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在名为 mq 的 RabbitMQ Docker 容器中,启用 rabbitmq_delayed_message_exchange 插件,使其支持延迟消息功能。

运行结果如下

img

img

4.2.3.声明延迟交换机

RabbitMQ 延迟插件启用后,需要告诉 RabbitMQ “我有一个可以延迟投递的交换机”。
在 Spring Boot 里有两种方式:

1.基于注解方式:

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
  • delayed = "true" 表示这是一个 延迟交换机(必须加这个才会生效)。
  • delay.queue 是延迟队列的名字。
  • delay.direct 是延迟交换机的名字,绑定的路由键是 delay
  • 当有消息路由到这个交换机时,插件会先延迟,再投递给 delay.queue

2.基于@Bean的方式(推荐):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}

@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}

@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
  • 使用 Java Config 更清晰,而且不依赖 Spring AMQP 的注解魔法。
  • 生产和消费分离,不会因为监听类加载而自动创建队列、交换机。

4.2.4.发送延迟消息

发送的时候 必须 指定 x-delay 消息头(延迟时间,毫秒)。
在 Spring Boot 中可以用 MessagePostProcessor 来加这个属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}

5000 表示延迟 5 秒。

消息会先被放在交换机的延迟存储中,到时间后再投递到 delay.queue

消息头和消息体是分开的:

  • 消息体会被序列化(例如 JSON → byte[])
  • 消息头(如 x-delay)由协议传输,RabbitMQ 原生识别。

一个消息对象是分为消息头和消息体的,消息体走json序列化成byte[],而消息头的话,底层的协议自己会识别传输。所以消息体的话,你用Sting类型或者是其他的类型都是可以的,都可以json序列化,这里使用MessagePostProcessor纯属是为了在消息发送给rabbitMq客户端传输之前改消息头

注意:

img

4.3.超时订单问题

接下来,我们就在交易服务中利用延迟消息实现订单超时取消功能。其大概思路如下:

img

查询支付状态有两次:

1.查询本地订单状态,如果已经正常通知了,支付和交易服务的通知正常着,订单状态已经修改为已支付了,此时直接结束即可。

2.如果本地查询到的订单状态不是已支付,那么有可能是没能通知到,此时需要去向支付服务查询支付流水状态,如果是已支付,则修改,如果不是,那么就说明超时了,则取消订单

假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。

4.3.1.定义常量

无论是消息发送还是接收都是在交易服务完成,因此我们在trade-service中定义一个常量类,用于记录交换机、队列、RoutingKey等常量:

img

内容如下:

1
2
3
4
5
6
7
packagecom.hmall.trade.constants;

publicinterfaceMQConstants {
String DELAY_EXCHANGE_NAME= "trade.delay.direct";// 延迟交换机
String DELAY_ORDER_QUEUE_NAME= "trade.delay.order.queue";// 延迟队列
String DELAY_ORDER_KEY= "delay.order.query";// 路由键
}

4.3.2.配置MQ

trade-service模块的pom.xml中引入amqp的依赖:

1
2
3
4
5
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

trade-serviceapplication.yaml中添加MQ的配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101
port: 5672
virtual-host: /hmall
username: hmall
password: 123

4.3.3.改造下单业务,发送延迟消息

思路:就是支付之后,正常情况下支付服务会将支付结果(成功或失败)通过mq告诉订单服务(这个服务里记录了订单的状态,已支付或未支付之类的)。但是如果出现了特殊情况mq没有能通知到订单服务,那就需要订单服务自己去查一下结果

接下来,我们改造下单业务,在下单完成后,发送延迟消息,查询支付状态。

修改trade-service模块的com.hmall.trade.service.impl.OrderServiceImpl类的createOrder方法,在下单成功的 createOrder() 方法里,加一段延迟消息发送的逻辑:

1
2
3
4
5
6
7
8
9
rabbitTemplate.convertAndSend(
MQConstants.DELAY_EXCHANGE_NAME, // 交换机
MQConstants.DELAY_ORDER_KEY, // 路由键
orderId, // 消息内容(订单 ID)
message -> {
message.getMessageProperties().setDelay(10 * 1000); // 延迟 10 秒(测试用)
return message;
}
);

这里延迟消息的时间应该是15分钟,不过我们为了测试方便,改成10秒。

4.3.4.编写查询支付状态接口

由于MQ消息处理时需要查询支付状态,因此我们要在pay-service模块定义一个这样的接口,并提供对应的FeignClient.

首先,在hm-api模块定义三个类:

img

说明:

  • PayOrderDTO(支付订单信息的数据结构):支付单的数据传输实体
  • PayClient(Feign 客户端):支付系统的Feign客户端
  • PayClientFallback(熔断时的降级逻辑):支付系统的fallback逻辑

PayOrderDTO代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.hmall.api.dto;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.time.LocalDateTime;

/**
* <p>
* 支付订单
* </p>
*/
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {
@ApiModelProperty("id")
private Long id;
@ApiModelProperty("业务订单号")
private Long bizOrderNo;
@ApiModelProperty("支付单号")
private Long payOrderNo;
@ApiModelProperty("支付用户id")
private Long bizUserId;
@ApiModelProperty("支付渠道编码")
private String payChannelCode;
@ApiModelProperty("支付金额,单位分")
private Integer amount;
@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")
private Integer payType;
@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")
private Integer status;
@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")
private String expandJson;
@ApiModelProperty("第三方返回业务码")
private String resultCode;
@ApiModelProperty("第三方返回提示信息")
private String resultMsg;
@ApiModelProperty("支付成功时间")
private LocalDateTime paySuccessTime;
@ApiModelProperty("支付超时时间")
private LocalDateTime payOverTime;
@ApiModelProperty("支付二维码链接")
private String qrCodeUrl;
@ApiModelProperty("创建时间")
private LocalDateTime createTime;
@ApiModelProperty("更新时间")
private LocalDateTime updateTime;
}

PayClient代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.hmall.api.client;

import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {
/**
* 根据交易订单id查询支付单
* @param id 业务订单id
* @return 支付单信息
*/
@GetMapping("/pay-orders/biz/{id}")
PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}

PayClientFallback代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.hmall.api.client.fallback;

import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;

@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {
@Override
public PayClient create(Throwable cause) {
return new PayClient() {
@Override
public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
return null;
}
};
}
}

最后,在pay-service模块的PayController中实现该接口:

1
2
3
4
5
6
@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

4.3.5.监听消息,查询支付状态

接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:

img

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
@RequiredArgsConstructor
public class OrderDelayMessageListener {

private final IOrderService orderService;
private final PayClient payClient;

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),
exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),
key = MQConstants.DELAY_ORDER_KEY))
public void listenOrderDelayMessage(Long orderId) {
// 1. 查订单
Order order = orderService.getById(orderId);
if (order == null || order.getStatus() != 1) {
return; // 订单不存在或已支付
}

// 2. 查支付状态
PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);

if (payOrder != null && payOrder.getStatus() == 3) {
// 已支付 → 更新订单状态
orderService.markOrderPaySuccess(orderId);
} else {
// 未支付 → 取消订单 + 恢复库存
orderService.cancelOrder(orderId);
}
}
}

注意:

cancelOrder() 要取消订单并加回库存。

markOrderPaySuccess() 要在订单服务里实现。

高级篇完

RocketMQ完结