RocketMQ组件介绍与基本使用

https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明#组件版本关系

1 组件介绍

1.1 Message Queue

RocketMQ支持多master的架构。思考一个问题:当有多个master的时候,发往Topic的多条消息会在多个master的Broker上存储。那么,发往某一个Topic的多条消息,是不是在所有的Broker上存储完全相同的内容?

肯定不是的。如果所有的master存储相同的内容,而slave又跟master存储相同的内容:第一个,浪费了存储空间。第二个,无法通过增加机器数量线性的提升Broker的性能,也就是只能垂直扩展,通过升级硬件的方式提升性能,无法实现横向(水平)扩展。那么在分布式的环境中,RocketMQ的性能肯定会受到非常大的限制。一句话,不符合分片的思想。

在kafka中设计了一个partiton,一个topic可以拆分成多个partition,这些partition可以分布在不同的Broker上,这样就实现了数据的分片。也决定了kafka可以实现横向扩展。

RocketMQ中设计了一个叫做Message Queue的逻辑概念,左右跟partition类似。

首先,创建Topic的时候会指定队列的数量,一个叫writeQueueNums(写队列数量),一个readQueueNums(读队列数量)。写队列的数量决定了有几个Message Queue,读队列的数量决定了有一个线程来消费这些Message Queue(只是用来负载)。服务端创建一个Topic默认8个队列:

1
private int defaultTopicQueueNums = 8;

topic不存在,生产者发送消息时创建默认4个队列:

1
private volatile int defaultTopicNums = 4;

1.2 Producer and Consumer

1.2.1 Producer

生产者,用于生产消息,会定时从NameServer拉取路由信息(不用配置RocketMQ的服务地址),然后路由信息与指定的Broker建立TCP长连接,从而将消息发送到Broker中。发送逻辑一致的Producer可以组成一个Group。RocketMQ的生产者同样支持批量发送,不过List要自己传进去。Producer写数据只能操作master节点。

1.2.2 Consumer

消息的消费者,通过NameServer 集群获得Topic的路由信息,连接到对应的Broker上消费消息。消费逻辑一致的Consumer可以组成一个Group,这时候消息会在Consumer之间负载。由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。

注意:同一个consumer group内的消费者应该订阅同一个topic。或者反过来,消费不同topic的消费者不应该采用相同的consumer group名字。如果不一样,后面的消费者的订阅,会覆盖前面的订阅。

消费者有两种消费方式:一种是集群消费(消息轮询),一种是广播消费(全部收到相同副本)。从消费模型来说,RocketMQ支持pull和push两种模式。

  1. pull模式

    Pull模式是consumer 轮询从broker拉取消息。pull有两种实现范式:

    一种是普通轮询(Polling)。不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。普通轮询的缺点:因为大部分时候没有数据,这些无效的请求会大大的浪费服务器的资源。而且定时请求的间隔过长的时候,会导致消息延迟。

    另一种是长轮询,RocketMQ的pull用长轮询来实现。客户端发起Long Polling,如果此时服务端没有相关数据,会hold住请求,直到服务端有相关数据,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次Long Polling(所谓的hold住请求指的服务端暂时不回复结果,保存相关请求,不关闭请求连接,等相关数据准备好,写回客户端)。长轮询解决了轮询的问题,唯一的缺点是服务器在挂起的时候比较消耗内存。

  2. push模式

    push模式是Broker推送消息给consumer,RocketMQ的push模式实际上是基于pull模式实现的,只不过是在pull模式上封装了一层,所以RocketMQ push模式并不是真正意义上的“推模式”。在RokcetMQ中,PushConsumer会注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

1.3 Topic

Topic用于将消息按主题做划分,比如订单消息、物流消息。注意,跟kafka不同的是,在RocketMQ中,topic是一个逻辑概念,消息不是按topic划分存储的。

Producer将消息发往指定的topic,Consumer订阅这个topic就可以收到相应的消息。跟kafka一样,如果topic不存在,会自动创建.

1
2
//BrokerConfig
private boolean autoCreateTopicEnable = true;

Topic跟生产者和消费者都是多对多的关系,一个生产者可以发送消息到多个topic,一个消费者可以订阅多个topic。

1.4 Tag

1.5 Broker

RocketMQ的服务,或者说一个进程,叫做Broker,Broker的作用是存储和转发消息。RocketMQ单机大约能承受10万QPS的请求。

为了提升Broker的可用性(防止单点故障),以及提升服务器的性能(实现负载),通常会做集群的部署。跟kafka获取redis cluster一样,RocketMQ集群的每个Broker节点保存总数据的一部分,因此可以实现横向扩展。为了提高可靠性(防止数据丢失),每个Broker可以有自己的副本(slave)。

默认情况下,读写都发生在master上。在slaveReadEnable=true的情况下,slave也可以参与读负载。但是默认只有BrokerId=1的slave才会参与读负载,而且是在master消费慢的情况下,由whichBrokerWhenConsumeSlowly这个参数决定。

1
private long whichBrokerWhenConsumeSlowly = 1;

1.6 NameServer

当不同的消息存储在不同的Broker上,生产者和消费者对于Broker的选取,或者说路由选择是一个非常关键的问题。所以,跟分布式的服务调用的场景需要一个注册中心一样,在RocketMQ中需要有一个角色来管理Broker的信息。kafka是用Zookeeper管理的,RocketMQ是用NameServer。

可以把NameServer理解是RocketMQ的路由中心,每一个NameServer节点都保存着全量的路由信息,为了保证高可用,nameServer自身也可以做集群的部署,它的作用有点像Eureka或者Redis的Sentinel。也就是说,Broker会在NameServer上注册自己,Producer和Consumer用NameServer来发现Broker。

1.7 RocketMQ Cluster

2 基本使用

2.1 简单案例

(1)引入rocketmq-client包

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>

(2)创建Consumer并启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Consumer01 {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("simple_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printLn("%s Receive New Message: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printLn("Consumer Started.%n");
}
}

(3)创建Producer并启动

1
2
3
4
5
6
7
8
9
10
11
12
13
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("simple_consumer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for(int i=0;i<3;i++){
Message message = new Message("simple_topic", "TAGA", ("Hello ROCKETMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.printLn(sendResult);
}
producer.shutdown();
}
}

2.2 生产者发送消息方式

2.2.1 同步

producer发送消息给broker时,只有当broker服务器刷盘成功,返回sendResult.OK才算成 功,例如上述简单案例中的producer#send(msg)方法

2.2.2 异步

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ的异步发送,需要实现异步发送回调接口(SendCallback)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//Consumer
public class Consumer02 {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("async_topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

//ASyncProducer
public class ASyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async_producer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 6; i++) {
Message message = new Message("async_topic","tag_sync","OrderID188", ("Hello ASync " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
}
}

2.2.3 单向

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//Consumer
public class Consumer03 {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneway_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("oneway_topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

//OnewayProducer
public class OnewayProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("oneway_producer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("oneway_topic" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
Thread.sleep(5000);
producer.shutdown();
}
}

2.2.4 三种发送模式对比

发送方式 TPS 可靠性 返回值 应用场景
Sync 重要消息,比如:短信、邮件等
ASync 较快 较高 对并发要求和响应时间比较高的场景,同时需要回调函数
Oneway 最快 不高 对于可靠性要求不高的场景,也不需要返回值,比如日志收集

2.3 消费消息模式

2.3.1 集群模式(负载均衡模式)

Clustering:也是默认的模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//Consumer
public class Consumer04 {
public static void main(Strin[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("consumer_model_topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

//Producer
public class Producer01 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_producer01");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i=0;i<10;i++) {
Message message = new Message("consumer_model_topic", "TAGA", ("consumer model "+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
producer.shutdown();
}
}

观察producer返回结果,可以发现,10条消息被发送到了consumer_model_topic下的4个队列上。

启动两个Consumer04实例,观察两个Consumer实例的输出,可以发现其中一个consumer获取到了2个队列[2和3]上的5条消 息,另外一个consumer获取到了2个队列[0和1]上的5条消息

2.3.2 广播模式

将Consumer的consumer.setMessageModel(MessageModel.CLUSTERING)修改成 consumer.setMessageModel(MessageModel.BROADCASTING)

(1)将Consumer04的消费模式更改为BROADCASTING

(2)重启Consumer04的两个实例

(3)通过Producer01再次发送10条消息,观察Consumer04中的console打印,可以发现,每个Consumer04实例都会消费所有的消息

2.3.3 集群模式与广播模式思考

(1)负载均衡模式下,某个topic下有4个队列,consumer有5个实例

(2)负载均衡模式下,某个topic下有4个队列,consumer有2个实例

(3)负载均衡模式下,某个topic下有4个队列,consumer有3个实例

(4)广播模式下,某个topic下有4个队列,consumer有多少个实例

2.4 消息类型

2.4.1 普通消息-Normal Message

普通消息

2.4.2 延迟消息-Delay Message

msg通过producer发送到broker之后,不会立即被consumer消息,而是要等待到指定的时间 之后才能被消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//Consumer
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("schedule_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("schedule_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

//Producer
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("schedule_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
int totalMessagesToSend = 8;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("schedule_topic", ("Hello scheduled message " + i).getBytes());
// String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
message.setDelayTimeLevel(4);
// Send the message
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
Thread.sleep(5000);
producer.shutdown();
}
}

延迟等级对应时间:https://rocketmq.apache.org/docs/4.x/producer/04message3/

2.4.3 批量消息-Batch Message

https://rocketmq.apache.org/docs/4.x/producer/05message4

Producer发送单个消息的最大限制是4M,可以通过maxMessageSize进行设置,同时broker对于单个消息的最大限制也是4M,在MessageStoreConfig中的maxMessageSize属性可以看到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//Consumer
public class BatchConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("batch_topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

//Producer
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("batch_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
List<Message> messages = new ArrayList<>();
messages.add(new Message("batch_topic", "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message("batch_topic", "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message("batch_topic", "TagA", "OrderID003", "Hello world 2".getBytes()));
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
}
}

2.4.4 过滤消息-Filter Message

https://rocketmq.apache.org/docs/featureBehavior/07messagefilter

可以根据TAG和SQL语句进行过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//Consumer
public class FilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("filter_topic","TagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody());
System.out.println(msg);
System.out.println(body+" "+msg.getTags());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

//Producer
public class FilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
String tag = i % 2 == 0 ? "TagA" : "TagB";
Message message = new Message("filter_topic", tag, ("Hello Filter Message " + i).getBytes());
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
}
}

2.4.5 顺序消息-Ordered Message

https://rocketmq.apache.org/docs/featureBehavior/03fifomessage

(1)什么是顺序消息

消息生产的顺序要与消息消费的顺序一致。

(2)目前消息的生产和消费是怎样的

目前的情况下,producer会使用Round Robin的方式把消息发送到不同的queue中,consumer消费消息的时候是通过多线程从各个queue上获取消息的,显然不能保证有序性。

(3)全局有序

全局有序:在某个topic下,所有的消息都要保证消费有序

此时topic中的queue只能有一个,这样producer往一个queue上发送消息,并且consumer只能有一 个,并且采用单线程的方式消息。 但在高并发的场景下,性能比较低,所以一般情况下用得比较少。

(4)局部有序

局部有序:某个topic下有多个queue,但是每个queue中的消息被消费时都是有序的

此时需要保证只有一个producer,并且选择指定的队列进行发送消息,同时consumer中使用单线程进行消费 这时候在高并发的场景下,性能比较高,所以这种局部有序用得比较多。

(5)局部有序的应用场景

比如在下单场景中,用户下了一个订单,订单的orderid为order001,接下来会向指定的queue发送消息,并且是按照顺序发送的,比如:订单支付消息、订单发货消息、订单物流消息等,然后consumer消费的时候,需要按照这种顺序进行消息,所以需要保证单线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//局部有序demo
//Consumer
public class OrderlyConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("orderly_topic","*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context){
for (MessageExt msg : msgs) {
try {
// 业务逻辑的处理
Thread.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+ " "+"queueId:"+msg.getQueueId()+ " body: "+new String(msg.getBody()));
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

//Producer
public class OrderlyProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderly_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] flows = {"订单支付", "订单发货", "订单物流","订单完成"};
for (int i = 0; i < 4; i++) {
String flow = flows[i % 4];
Message message = new Message("orderly_topic", "TagA", flow.getBytes());
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueId = (Integer) arg;
System.out.println("queueId: " + queueId);
// 也就是选择了某一个queue
return mqs.get(queueId);
}
}, 1); // 选择queueId为1的queue
System.out.println(sendResult);
}
}
}

2.4.6 事务消息-Transactional Message

https://rocketmq.apache.org/docs/featureBehavior/04transactionmessage

RocketMQ提供类似XA或Open XA的分布式事务功能,通过消息队列RocketMQ事务消息,能达到分布式事务的最终一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
//Transaction Consumer
public class TransactionConsumer{
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQConsumer("transaction_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("transaction_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, COnsumeConcurrentlyContext context) {
for(MessageExt msg : msgs){
System.out.println(msg.getTags()+" : "+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started..")
}
}

//Transaction Producer
public class TransactionProducer{
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("trans_producer_group");
//执行本地事务需要用到的监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//用于回查本地事务状态的线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr("localhost:9876");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for(int i = 0 ; i < 10 ; i++){
try {
Message msg = new Message("transaction_topic", tags[i % tags.length], "KEY"+i, ("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.pringln("%s%n", sendResult);
Thread.sleep(10);
} catch (Exception e){
e.printStackTrace();
}
}
}
}
//用于执行本地事务
class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("本地事务执行。。。");

if(msg.getTags().equals("TagA")){
//如果msg的tag值为TagA,则提交一个COMMIT_MESSAGE状态
return LocalTransactionState.COMMIT_MESSAGE;
} else if(msg.getTags().equals("TagB")) {
//如果msg的tag值为TagB,则提交一个ROLLBACK_MESSAGE状态
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
//如果msg的tag为其他,则提交一个LocalTransactionState.UNKNOW,表示需要rocketmq主动向本地事务进行回查
return LocalTransactionState.UNKNOW;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//检查本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
}

3 RocketMQ刷盘机制

所谓的刷盘机制就是将broker application内存中的数据,持久化到commitlog文件中。 可以采用同步刷盘或者异步刷盘的方式。

broker.conf文件中的配置项为:flushDiskType =SYNC_FLUSH / ASYNC_FLUSH

3.1 同步刷盘

所谓的同步刷盘如图所示,也就是说当producer发送消息给broker时,消息只要持久化到磁盘中才算成功。

3.2 异步刷盘

所谓的异步刷盘,就是producer将消息发送给broker之后,broker虽然也会进行刷盘操作,但是对于producer而言不关心,直接返回success。

4 RocketMQ零拷贝

RocketMQ中使用的是mmap的方式,当然也有其他实现方式,比如sendfile。mmap适合小数据量读写,sendFile 适合大文件传输。

5 SpringBoot集成

https://docs.spring.io/spring-boot/docs/2.7.6/reference/html/messaging.html#messaging

(1)引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

(2)写配置

1
2
3
4
5
6
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: spring-boot-producer-group
consumer:
group: spring-boot-consumer-group

(3)生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {

@Resource
private RocketMQTemplate rocketMQTemplate;

@RequestMapping("/produce")
public String produce(){
this.rocketMQTemplate.convertAndSend("springboot-topic", "Hello Spring Boot Rocketmq");
return "produce success";
}
}

(4)消费者

1
2
3
4
5
6
7
8
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}",topic = "springboot-topic")
public class SpringBootConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("Spring Boot 获取到消息内容: "+msg);
}
}

6 Spring Cloud集成(Spring Cloud Stream)

https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en

https://github.com/alibaba/spring-cloud-alibaba/blob/2.2.x/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md

引入依赖

1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

6.1 SC Stream3.X之前的用法

6.1.1 默认用法

(1)写配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server:
port: 9999
spring:
cloud:
stream:
rocketmq:
binder:
# 指定rocketmq nameserver的地址
name-server: 127.0.0.1:9876
# 如果没有group,则启动会报错
group: stream-group
bindings:
# 定义name为output的binding,用于发送消息给rocketmq
output:
# 发送给rocketmq上的哪个topic
destination: springcloud-stream-topic
# 定义name为input的binding,用于从rocketmq上获取消息
input:
# 从rocketmq上的哪个topic获取消息
destination: springcloud-stream-topic

(2)添加注解,在入口类添加@Enable驱动

1
2
3
4
5
6
7
8
@SpringApplication
//向spring ioc容器中注入这两个接口实现类,用于代码中进行依赖注入。这个地方会显示过期,跟SpringCloud Stream版本有关
@EnableBinding({Source.class, Sink.class})
public class SpringCloudStreamDemoApplication{
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamDemoApplication.class, args);
}
}

(3)生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("/producer")
public class ProducerController{

@Resource
private Source source;

@RequestMapping("/stream-produce")
public String streamProduce(){
this.source.output().send(MessageBuilder.withPayload("stream msg...").build());
return "stream produce successfully.";
}
}

(4)消费者

1
2
3
4
5
6
7
@Service
public class StreamConsumer{
@StreamListener(Sink.INPUT)
public void receiveMsg(String msg){
System.out.println("receive msg: " + msg);
}
}

6.1.2 自定义用法

(1)模仿官方的Source接口,自定义TextSource接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//官方
public interface Source {

String OUTPUT = "output";

@Output(Source.OUTPUT)
MessageChannel output();
}
//自定义
public interface TestSource {

String OUTPUT = "test-output";

@Output(TestSource.OUTPUT)
MessageChannel output();
}

(2)模仿官方的Sink接口,自定义TestSource接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//官方
public interface Sink {
String INPUT = "input";

@Input(Sink.INPUT)
SubscribableChannel input();
}
//自定义
public interface TestSink {
String INPUT = "test-input";

@Input(TestSink.INPUT)
SubscribableChannel input();
}

(3)写配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
server:
port: 9999
spring:
cloud:
stream:
rocketmq:
binder:
# 指定rocketmq nameserver的地址
name-server: 127.0.0.1:9876
# 如果没有group,则启动会报错
group: stream-group
bindings:
# 定义name为output的binding,用于发送消息给rocketmq
output:
# 发送给rocketmq上的哪个topic
destination: springcloud-stream-topic
# 定义name为input的binding,用于从rocketmq上获取消息
input:
# 从rocketmq上的哪个topic获取消息
destination: springcloud-stream-topic

# 自定义output和input
# 定义name为test-output的binding,用于发送消息给rocketmq
test-output:
# 发送给rocketmq上的哪个topic
destination: springcloud-stream-topic
# 定义name为test-input的binding,用于从rocketmq上获取消息
test-input:
# 从rocketmq上的哪个topic获取消息
destination: springcloud-stream-topic

(4)写注解:在入口类上添加TestSource和TestSink

1
2
3
4
5
6
7
@SpringBootApplication
@EnableBinding({Source.class, Sink.class,TestSource.class,TestSink.class})
public class SpringCloudStreamDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamDemoApplication.class, args);
}
}

(5)写代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//生产者
@Resource
private TestSource testSource;
@RequestMapping("/test-stream-produce")
public String testStreamProduce(){
this.testSource.output().send(MessageBuilder.withPayload("test stream msg...").build());
return "test stream produce successfully.";
}

//消费者
@Service
public class TestStreamConsumer {
@StreamListener(TestSink.INPUT)
public void receiveMsg(String msg) {
System.out.println("test receive msg: " + msg);
}
}

6.2 SC Stream 3.X之后的用法

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producing_and_consuming_messages


RocketMQ组件介绍与基本使用
http://www.zivjie.cn/2023/06/17/消息队列/RocketMQ/组件介绍与基本使用/
作者
Francis
发布于
2023年6月17日
许可协议