RabbitMQ实践应用

1 订单延迟关闭

1.1 业务场景

假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能怎么做?

思路:发一条跟订单相关的消息,30分钟后被消费,在消费者的代码中查询订单数据,如果支付状态是未付款,就关闭订单。

RabbitMQ本身不支持延迟投递,总的来说有2种实现方案:

​ 1、先存储到数据库,用定时任务扫描

​ 2、利用RabbitMQ的死信队列(Dead Letter Queue)实现

定时任务比较容易实现,比如每隔一分钟扫描一次,查出30分钟之前未付款的订单,把状态改成关闭。但是如果瞬间要处理的数据量过大,比如10万条,把这些数据查询到内存中逐条处理,也会给服务器带来很大的压力,影响正常业务的运行。

第二点,死信队列

1.2 Message TTL (Time To Live)

1.2.1 队列的属性

首先队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定的消息将会被丢弃。这个属性叫:x-message-ttl

1
2
3
4
5
6
public Queue queue(){
Map<String, Object> map = new HashMap<>();
// 队列中的消息未被消费11秒后过期
map.put("x-message-ttl", 11000);
return new Queue("TTL_Queue", true, false, false, map);
}

但是这种方式不是那么灵活。所以RabbitMQ的消息也有单独的过期事件属性。

1.2.2 消息的属性

在发送消息的时候通过MessageProperties指定消息属性。

1
2
3
4
5
MessageProperties messageProperties = new MessageProperties();
// 消息过期属性,单位ms
messageProperties.setExpiration("4000");
Message message = new Message("这条消息4秒后过期".getBytes(), messageProperties);
rabbitTemplate.send("TTL_EXCHANGE")

如果同时指定了message TTL和Queue TTL,则小的那个时间生效。

有了过期时间还不够,这个消息不能直接丢弃,不然就没办法消费了。最好是丢到一个容器中,这样就可以实现延迟消费了。这里来了解一下死信的概念。

1.3 死信

消息过期后,如果没有任何配置,是会直接丢弃的。可以通过配置让这样的消息变成死信(Dead Letter),在别的地方存储。

1.3.1 死信去哪里

队列在创建的时候可以指定一个死信交换机DLX(Dead Letter Exchange)。死信交换机绑定的队列被称为死信队列DLQ(Dead Letter Queue),DLX实际上也是普通的交换机,DLQ也是普通的队列。

也就是说,如果消息过期了,队列指定了DLX,就会发送到DLX。如果DLX指定DLQ,就会路由到DLQ。路由到DLQ后,就可以消费了。

1.3.2 死信队列的使用

下面通过一个例子来演示死信队列的使用。

​ 1、声明原交换机,原队列,相互绑定。指定原队列的死信交换机。

​ 2、声明死信交换机,死信队列,并且通过“#”绑定,代表无条件路由

​ 3、最终消费者监听死信队列,在这里实现检查订单状态逻辑。

​ 4、生产者发送消息测试,设置消息10秒过期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
![5](E:\Project\blog-hexo\source\image\消息队列\RabbitMQ\5.png)//指定队列的死信交换机
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE");
//arguments.put("x-expires", "9000"); //设置队列的TTL
//arguments.put("x-max-length", 4); //如果设置了队列的最大长度,超过长度时,先入队的消息会被发送到DLX

//声明队列(默认交换机AMQP default, Direct)
//String queue, boolean durable, boolean exclusive, boolean antoDelete, Map<String, Object> arguments
channel.queueDeclare("ORI_USE_QUEUE",false,false,false,arguments);
//声明死信交换机
channel.exchangeDeclare("DEAD_LETTER_EXCHANGE","topic",false,false,false,null);
//声明死信队列
channel.queueDeclare("DEAD_LETTER_QUEUE",false,false,false,null);
//绑定
channel.queueBind("DEAD_LETTER_QUEUE","DEAD_LETTER_EXCHANGE","#");
System.out.println("wating for message");

1.3.3 消息流转

总结一下,利用消息的过期时间,过期后投递到DLX,路由到DLQ,监听DLQ,实现了延迟队列。

消息的流转流程:生产者——原交换机——原队列(超过TTL之后)——死信交换机——死信队列——最终消费者。

1.4 延迟队列的其他实现

使用死信队列实现延时消息的缺点:

​ (1)如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。

​ (2)如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递(比如第一条消息过期TTL是30min,第二条消息TTL是10min。10分钟后,及时第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递)

​ (3)可能存在一定的时间误差

在RabbitMQ3.5.7及以后的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现演示队列功能(Linux和windows都可用)。同时插件依赖Erlang/OPT18.0及以上。

​ 插件源码地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

​ 插件下载地址:https://bintray.com/rabbitmq/community-plugins.rabbitmq_delayed_message_exchange

使用方法:

  1. 进入目录
1
2
whereis rabbitmq
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins
  1. 下载插件
1
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

如果下载的文件名带问号,则需要改名,如图:

1
mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez
  1. 启用插件
1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 停用插件
1
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
  1. 插件使用

通过声明一个x-delayed-message类型的Exchange来使用delayed-messageing特性。x-delayed-message是插件提供的类型,并不是rabbitmq本身的(区别于direct,topic,fanout,headers)。

1
2
3
4
5
6
@Bean("delayExchange")
public TopicExchange exchange(){
Map<String, Object> arges = new HashMap<>();
arges.put("x-delay-type", "direct");
return new TopicExchange("DELAY_EXCHANGE",true,false,arges);
}

生产者:消息属性中指定x-delay参数

1
2
3
4
5
MessageProperties messageProperties = new MessageProperties();
// 延迟的间隔时间,目标时刻减去当前时刻
messageProperties.setHeader("x-delay", delayTime.getTime() - now.getTime());
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTempliate.send("DELAY_EXCHANGE", "#", message);

补充:除了消息过期,还有什么情况消息会变成死信?

​ 1)消息被消费者拒绝并且未设置重回队列:(NACK || Reject)&&requeue == false

​ 2)队列达到最大长度,超过了Max length(消息数)或者Max length bytes(字节数),最先入队的消息会被发送到DLX。

RabbitMQ的消息是存在磁盘上的,如果内存节点,会同时存在磁盘和内存中。当RabbitMQ生产MQ消息的速度远大于消费消息的速度时,会产生大量的消息堆积,占用系统资源,导致机器性能下降。流量控制可以从几方面来控制,一个是服务端,一个是消费端。

2 服务端流控

https://www.rabbitmq.com/configure.html

https://www.rabbitmq.com/flow-control.html

https://www.rabbitmq.com/memory.html

https://www.rabbitmq.com/disk-alarms.html

2.1 队列长度

队列有两个控制长度的属性:

x-max-length:队列中最大存储最大消息数,超过这个数量,对头的消息会被丢弃。

x-max-length-bytes:队列存储的最大消息容量(单位bytes),超过这个容量,对头的消息会被丢弃。

需要注意的是,设置队列长度只在消息堆积的情况下有意义,而且会删除先入队的消息,不能真正的实现服务端限流

2.2 内存控制

https://www.rabbitmq.com/configure.html

RabbitMQ会在启动时检测机器的物理内存数值。默认当MQ占用40%以上内存时,MQ会主动抛出一个内存警告并阻塞所有连接(Connections)。可以通过修改rabbitmq.config文件来调整内存阈值,默认值是0.4,如下所示:windows默认配置文件:advanced.config

1
[{rabbit, [{vm_memory_high_watermark, 0.4}]}]

也可以用命令动态设置,如果设置成0,则所有的消息都不能发布。

1
rabbitmqctl set_vm_memory_high_watermark 0.3

2.3 磁盘控制

https://www.rabbitmq.com/configure.html

另一种方式是通过磁盘来控制消息的发布。当磁盘剩余可用空间低于指定的值时(默认50MB),触发流控措施。例如:指定为磁盘的30%或者2GB

1
2
disk_free_limie.ralative=3.0
disk_free_limit.absolute=2GB

还有一种情况,虽然Broker消息存储的过来,但是push模型下(consume,有消息就消费),消费者消费不过来了,这个时候也要对流量进行控制。

3 消费端流控

https://www.rabbitmq.com/consumer-perfetch.html

默认情况下,如果不进行配置,RabbitMQ会尽可能快速地把队列中地消息发送给消费者。因为消费者会在本地缓存消息,如果消息数量过多,可能导致OOM或者影响其他进程地正常运行。

在消费者处理消息地能力有限,例如消费者数量太少,或者单条消息地处理时间过长的情况下,如果希望在一定数量的消息消费完之前,不再推送消息过来,就要用到消费端的流量限制措施。

可以基于Consumer或者channel设置prefetch count的值,含义为Consumer端的最大的unacked messages数目。当超过这个数值的消息未被确认,RabbitMQ会停止投递新的消息给该消费者。

1
2
channel.basicQos(2); //如果超过2条消息没有发送ACK,当前消费者不再接受队列消息
channel.basicConsume(QUEUE_NAME, false, consumer);

启动两个消费者,其中一个Consumer2消费很慢,qos设置为2,最多一次给它发两条消息,其他的消息都被Consumer1接收了。这个叫能者多劳。

4 消息可靠性投递

一个经典的面试题:在使用MQ实现异步通信的过程中,有消息丢了怎么办?或者MQ消息重复了怎么办?

​ 这个就是RabbitMQ的可靠性投递。当然,RabbitMQ在设计的时候其实就考虑了这一点,提供了很多保证消息可靠性投递的机制。这个可以说是RabbitMQ比较突出的一个特性。

​ 可靠性只是问题的一个方面,发送消息的效率同样是需要考虑的问题,而这两个因素是无法兼得的,如果在发送消息的每一个环节都采取相关措施来保证可靠性,势必会对消息的收发效率造成影响。在一些业务实时一致性要求不是很高的场合,可以牺牲一些可靠性来换取效率。

在使用RabbitMQ收发消息的时候,有几个主要环节:

​ 1)消息从生产者发送到Broker,生产者把消息发到Broker之后,怎么知道自己的消息有没有被Broker成功接收?如果Broker不给应答,生产者不断地发送,那有可能是一厢情愿,消息全部进了黑洞。

​ 2)消息从Exchange路由到Queue,Exchange是一个绑定列表,它的职责是分发消息。如果它没有办法履行它地指责怎么办?也就是说,找不到队列或者找不到正确的队列,怎么处理?

​ 3)消息在Queue中存储,队列有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,那么消息要一直存储在队列中。怎么保证消息在队列中稳定的存储呢?

​ 4)消费者订阅Queue并消费消息,队列的特性是什么?FIFO。队列中的消息是一条一条的投递的,也就是说,只有上一条消息被消费者接收以后,才能把这一条消息从数据库删掉,继续投递下一条消息。Broker怎么知道消费者已经接受了消息呢?

4.1 消息发送到RabbitMQ服务器

第一个环节是生产者发送消息到Broker。可能因为网络连接或者Broker的问题(比如硬盘故障,硬盘写满了)导致消息发送失败,生产者不能确定Broker有没有正确的接收。如果去设计,肯定要给生产者发送消息的接口一个应答,生产者才可以明确知道消息有没有发送成功。在RabbitMQ中提供了两种机制服务端确认机制,也就是在生产者发送消息给RabbitMQ的服务端的时候,服务端会通过某种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。

​ 第一种是Transaction(事务)模式,第二种Confirm(确认)模式。

4.1.1 Transaction(事务)模式

在创建channel的时候,可以把信道设置成事务模式,然后就可以发布消息给RabbitMQ了。如果channel.txCommit();的方法调用成功,就说明事务提交成功,则消息一定到达了RabbitMQ中。

1
2
3
4
5
6
7
8
9
10
try{
channel.txSelect();
// 发送消息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.txCommit();
System.out.println("消息发送成功");
} catch(Exception e) {
channel.txRollback();
System.out.println("消息已经回滚");
}

如果在事务提交之前由于RabbitMQ异常奔溃或者其他原因抛出异常,这个时候便可以将其进行捕获,进而通过执行channel.txRoolback()方法来实现事务回滚。

在事务模式中,只有收到了服务端的Commit-OK的指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息,它会榨干RabbitMQ服务器的性能。所以不建议在生产环境中使用。 SpringBoot中的设置: rabbitTemplate.setChannelTransacted(true);

4.1.2 Confirm(确认)模式

确认模式有三种

  1. 普通确认模式

在生产者这边通过调用channel.confirmSelect()方法将信道设置为Confirm模式,然后发送消息。一旦消息被投递到交换机后(跟是否路由到队列没有关系),RabbitMQ就会发送一个确认(Basic.Ack)个生产者,也就是调用channel.waitForConfirms()返回true,这样生产者就知道消息被服务端接受了。如果网络错误,会抛出连接异常。如果交换机不存在,会抛出404错误。

1
2
3
4
5
6
7
//开启发送方确认模式
channel.confirmSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
//普通confirm模式,发送一条,确认一条
if(channel.waitForConfirms()){
System.out.println("消息发送成功");
}
  1. 批量确认模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
try {
channel.confirmSelect();
for(int i=0;i<5;i++){
channel.basicPublish("",QUEUE_NAME,null,(msg+"-"+i).getBytes());
}
//批量确认结果,ACK如果是Multiple=True,代表ACK中的Delivery-Tag之前的消息都被确认了
//比如5条消息可能只收到1个ACK,也可能收到2个(抓包才能看得到)
//知道所有信息都发布,只要有一个未被Broker确认就会IOException
channel.waitForConfirmsOrDie();
System.out.println("消息发送完毕,批量确认成功");
} catch(Exception e){
//发生异常,可能需要对所有消息进行重发
e.printStackTrace();
}

只要channel.waitForConfirmsOrDie();方法没有抛出异常,就代表消息都被服务端接受了。批量确认的方式比单条确认的方式效率要高,但是也有两个问题:第一个就是批量的数量的确定。对于不同的业务,到底发送多少条消息确认一次?数量太少,效率提升不上去。数量多的话,又会带来另一个问题,比如发1000条消息才确认一次,如果前面999条消息都被服务端接收了,如果第1000条消息被拒绝了,那么前面所有的消息都要重发。

异步确认模式需要添加一个ConfirmListen,并且用一个SortedSet来维护一个批次中没有被确认的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final SortedSet<Long> confirmSet = Collections.sunchronizedSortedSet(new TreeSet<Long>());
//这里不会打印所有响应的ACK,ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
//异步监听确认和未确认的消息
//如果要重复运行,先停掉之前的生产者,清空队列
channel.addConfirmListener(new ConfirmListener(){

public void handleNack(long deliveryTag, boolean multiple) throws Exception{
System.out.println("Broker 未确认消息,标识:" + deliveryTag);
if(multiple){
//headSet表示后面参数之前的所有元素全部删除
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
confirmSet.remove(deliveryTag);
}
//todo 这里添加重发的方法
}

public void handleAck(long deliveryTag, boolean multiple) throws Exception{
//如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认
System.out.println("Broker 已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple);
if(multiple){
//headSet表示后面参数之前的所有元素全部删除
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
confirmSet.remove(deliveryTag);
}
System.out.println("未确认的消息:" + confirmSet);
}
});

//开启发送放确认模式
channel.confirmSelect();
for(int i=0;i<10;i++){
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,(msg+"-"+i).getBytes());
confirmSet.add(nextSeqNo);
}
System.out.println("所有消息:" + confirmSet);

SpringBoot:Confirm模式是在Channel上开启的,RabbitTemplate对Channel进行了封装。

1
2
3
4
5
6
7
8
9
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause){
if(!ack){
System.out.println("发送消息失败");
throw new RuntimeException("发送异常" + cause);
}
}
});

4.2 消息从交换机到队列

第二个环节就是消息从交换机路由到队列。可能因为routingkey错误,或者队列不存在(但是生产环境基本上不会出现这两种问题)。

有两种方式处理无法路由的消息,一种是让服务端重发给生产者,一种是让交换机路由到另一个备份的交换机。

  1. 消息回发
1
2
3
4
5
channel.addReturnListener(new ReturnListener(){
public void handleReturn(int replyCode, String repluText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println(".....");
}
})

SpringBoot消息回发的方式:使用mandatory参数和ReturnListener(在Spring AMQP中是ReturnCallback)

1
2
3
4
5
6
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
public void returnedMessage(Message message, int replyCode, String repluText, String exchange, String routingKey) {
System.out.println(".....");
}
})
  1. 消息路由到备份交换机的方式。

在创建交换机的时候,从属性中指定备份交换机。

1
2
3
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "ALTERNATE_EXCHANGE"); //指定交换机的备份交换机
channel.exchangeDeclare("TEST_EXCHANGE","topic",false,false,false,arguments);

(注意区别,队列可以指定死信交换机;交换机可以指定备份交换机)

4.3 消息在队列中存储

第三个环节是消息在队列存储,如果没有消费者的话,队列一直存在数据库中。如果RabbitMQ的服务或者硬件发生故障,比如系统宕机,重启,关闭等等,可能会导致内存中的消息丢失,所以要把消息本身和元数据(队列,交换机,绑定)都保存到磁盘。集群。

4.4 消息投递到消费者

服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给其他消费者。RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端。

​ 没有收到ACK地消息,消费者断开连接后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。

​ 消费者给Broker应答有两种方式,一种是自动ACK,一种是手动ACK。

​ 首先是自动ACK,这个也是默认的情况。也就是没有在消费者处编写ACK的代码,消费者会在收到消息地时候就自动发送ACK,而不是在方法执行完毕地时候发送ACK(并不关心你有没有正常收到消息)。如果想要等消息消费完毕或者方法执行完毕才发送ACK,需要先把自动ACK设置成手动ACK。把autoAck设置成false。

1
channel.basicConsume(QUEUE_NAME,false,consumer);

这个时候RabbitMQ会等待消费者显示地回复ACK后才从队列中移去消息。

1
channel.basicAck(envelope.getDeliveryTag(),true);

在SpringBoot中:application.properties

1
2
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

SimpleRabbitListenerContainer或者SimpleRabbitListenerContainerFactory

1
factory.setAcknowledgeMode(AcknowledgeMode,MANUAL);

注意这三个值的区别:NONE-自动ACK;MANUAL-手动ACK;AUTO-如果方法未抛出异常,则发送ack。如果方法抛出异常,并且不是AmqpRejectAndDontRequeueEsception则发送nack,并且重新入对。如果抛出异常时则发送nack不会重新入队。

1
2
3
4
5
6
public class SecondConsumer{
@RabbitHandler
public void process(String msgContent, Channel channel, Message message) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

如果消费者出了问题,确实是不能发送ACK告诉服务端成功消费了,也有拒绝消息的指令,而且还可以让消息重新入队给其他消费者消费。如果消息无法处理或者消费失败,也有两种拒绝的方式,Basic.Reject()拒绝单条,Basic.Nack()批量拒绝。

1
2
3
4
5
6
7
8
9
if(msg.contains("拒收")){
//拒绝消息
//requeue:是否重新入队列,true-是,false-直接丢弃,相当于告诉队列可以直接删除调
//TODO 如果只有这一个消费者,requeue为true的时候会造成消息重复消费
channel.basicReject(envelop.getDeliveryTag(),false);
} else if(msg.contains("异常")){
//批量拒绝
channel.basicNack(envelope.getDeliveryTag(),true,false);
}

如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无线循环重复消费的情况。可以投递到新的队列中,或者只打印异常的日志)。

​ 简单总结一下:从生产者到Broker,交换机到队列,队列本身,队列到消费者,都有相应的方法知道消费有没有正常流转,或者说当消息没有正常流转的时候采取相关措施。

​ 思考:服务端收到了ACK或者NACK,生产者会知道吗?即使消费者没有接收到消息,或者消费时出现异常,生产者也是完全不知情的。这个是符合解耦思想的。但是如果现在为了保证一致性,生产者必须知道消费者有没有成功消费,怎么办?所以,这个是生产者最终确定消费者有没有消费成功的两种方式:

​ 1)消费者收到消息,处理完毕后,调用生产者的API(是否破坏解耦?)

​ 2)消费者收到消息,处理完毕后,发送一条响应消息给生产者。

4.5 消费者回调

1)调用生产者API

​ 例如:提单系统给其他系统发送了保险消息后,其他系统必须在处理完消息后调用提单系统提供的API,来修改提单系统中这笔数据的状态。只要API没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。

2)发送响应消息给生产者

​ 例如:商业银行与人民银行二代支付通信(使用IBM MQ),无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。整个通信的流程设计的非常复杂,但是对于金融场景下的消息可靠性保证,是很有用的。

4.6 补偿机制

如果生产者的API就是没有被调用,也没有收到消费者的响应消息,怎么办?生产者和消费者之间应该约定一个超时时间,对于这个时间没有得到响应的消息,才确定为消费失败,比如5分钟。5分钟,对于临时性故障的处理,比如网络恢复,或者重启应用,重启数据库,应该够了。过了5分钟依然没有得到回复的消息,才判断为消费失败。确定失败以后怎么办呢?肯定要重发消息了。不过这里面有几个问题:

​ 1.谁来重发?

​ 假设这个消息是由业务人员操作产生的,对于异步的操作来说,他只要提交了请求就OK了,后面成不成功是不归他管的。所以肯定是后台的代码重发的,不可能让业务人员重新做一笔交易。先创建一个定时任务,比如30秒跑一次,找到业务表里面的这条业务状态是中间状态的记录,查询出来,构建为MQ消息,重新发送。也可以单独设计一张消息表,把本系统所以发送出去的消息全部异步地等级起来,找出状态是未回复地消息发送(注意:这种做法会消耗性能,消耗数据库存储空间)。

​ 2.隔多久重发一次?

​ 假如消费者一直没有回复,比如它重启要20分钟,你5分钟之内尝试重发,肯定还不能正常消费。所以重发肯定不止发一次,要尝试多次,但是又不能发的太频繁,给它一点恢复的时间。不可以设置为1分钟重发一次。也可以设置衰减机制,第一次隔一分钟,第二次隔两分钟。时间由定时任务的执行时间决定。

​ 3.一共重发几次?

​ 重发消息务必控制次数,比如设置3次。这个要在消息表中记录次数来实现,发一次就加一。

​ 4.重发什么内容?

​ 重发,是否发送一摸一样的内容?参考:ATM机上运行的系统叫C端(ATMC)银行的前置系统或者渠道系统叫P端,它接收ATMC的消息,再转发给卡系统或者核心系统。

​ 1)如果客户存款,没有收到核心系统的应答,怎么处理?

​ 因为不知道有没有记账成功,不能给客户吐钞,否则会造成银行短款。因为已经吞钞了,所以要保证成功。最多发送3次存款确认报文。

​ 2)如果客户取款,ATMC未得到核心系统的应答时,怎么处理?

​ 因为没有吐钞,所以要保证失败。最多发送3次存款冲正报文。

4.7 消息幂等性

如果消费者状态是正常的,每一条消息都可以正常处理。只是在响应或者调用API的时候出了问题,会不会出现消息的重复处理?例如:存款1000元,ATMC重发了3次存款交易,核心系统一共处理了4次,账户余额增加了4000元。所以,为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ服务端是没有这种控制的(同一批的消息有个递增的DeliveryTag),它并不知道对于你的业务来说什么才是重复的消息。所以这个只能在消费端控制。

​ 消息出现重复可能会有两个原因:

​ 1.生产者得问题,环节一重复发送消息,比如在开启了Confirm模式但未收到确认,消费者重复投递。

​ 2.由于消费者未发送ACK或者其他原因,消息重复消费。

​ 3.生产者代码或者网络问题

​ 对于重复发送的消息,可以对每一条消息生成一个唯一的业务员ID,通过日志或者消息落库来做重复控制。

4.8 最终一致

如果确实是消费者宕机了,或者代码出现了BUG导致无法正常消费,在尝试多次重发以后,消息最终也没有得到处理,怎么办?例如存款的场景,客户的钱已经被吞了,但是余额没有增加,这个时候银行出现了长款,要怎么处理?如果客户没有主动通知银行,他没有及时查询余额,这个问题是怎么发现的?银行最终怎么把这个账务做平?

​ 在金融系统中,都会有双方对账或者多方对账的操作,通常是在一天业务结束之后,第二天营业之前,金融系统中,多一分钱少一分钱都是非常严重的问题。

​ 会约定一个标准,比如ATM跟核心系统对账,肯定是以核心系统的账务为准。ATMC获取到的对账文件,然后解析,登记成数据,然后跟自己记录的流水比较,找出有问题的数据。对账之后,再手工平账。比如取款寄了账但是没吐钞的,做一笔冲正。存款吞了钞没记账的,要么把钱退给客户,要么补一笔钱。

4.9 消息的顺序性

消息的顺序性指的是消费者消费消息的顺序跟生产者生产消息的顺序是一致的。

​ 在RabbitMQ中,一个队列有多个消费者,由于不同的消费者消费消息的速度是不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用的队列)。除非负载均衡,不要用多个消费者消费消息,消费端捕获异常。


RabbitMQ实践应用
http://www.zivjie.cn/2023/06/10/消息队列/RabbitMQ/实践应用/
作者
Francis
发布于
2023年6月10日
许可协议