https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明#组件版本关系
1 Component Overview
1.1 Message Queue

RocketMQ supports a multi-master architecture. Consider a question: with multiple masters, messages sent to a topic are stored on the brokers of several masters. So do all brokers store exactly the same content for the messages sent to a given topic?
Certainly not. If every master stored the same content, and every slave mirrored its master, then first, storage space would be wasted; second, adding machines could not linearly increase broker throughput, so the cluster could only scale vertically by upgrading hardware instead of horizontally by adding nodes. In a distributed environment RocketMQ's performance would then be severely limited. In short, that would violate the idea of sharding.
Kafka solves this with partitions: a topic can be split into multiple partitions distributed across different brokers, which shards the data and is what allows Kafka to scale horizontally.
RocketMQ has a logical concept called the Message Queue that plays a role similar to a partition.
When a topic is created, two queue counts are specified: writeQueueNums (the number of write queues) and readQueueNums (the number of read queues). The write queue count determines how many Message Queues messages can be written to, and the read queue count determines how many queues are visible to consumers for load balancing. When a topic is created on the server side it gets 8 queues by default:
```java
private int defaultTopicQueueNums = 8;
```
If the topic does not exist, the producer creates it with 4 queues by default when it first sends a message:
```java
private volatile int defaultTopicQueueNums = 4;
```
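Queue counts can also be set explicitly when creating or updating a topic. As a small sketch (the topic name and counts are illustrative), the mqadmin tool accepts -w and -r for the write and read queue numbers:

```shell
# Create/update a topic with 8 write queues and 8 read queues (illustrative values)
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t my_topic -w 8 -r 8
```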
1.2 Producer and Consumer

1.2.1 Producer
A producer produces messages. It periodically pulls routing information from the NameServer (so the broker addresses do not need to be configured), then uses that routing information to establish long-lived TCP connections to the relevant brokers and send messages to them. Producers with the same sending logic can form a group. RocketMQ producers also support batch sending, although you assemble the List of messages yourself. Producers can only write to master nodes.
1.2.2 Consumer
A consumer consumes messages. It obtains a topic's routing information from the NameServer cluster and connects to the corresponding brokers to consume. Consumers with the same consumption logic can form a group, in which case messages are load-balanced across the consumers in the group. Since both master and slave can serve reads, a consumer establishes connections to both.
Note: consumers in the same consumer group should subscribe to the same topics. Put the other way around, consumers that consume different topics should not share a consumer group name. If the subscriptions differ, a later consumer's subscription overwrites the earlier one.
There are two consumption modes: clustering consumption (messages are distributed across consumers) and broadcasting consumption (every consumer receives a full copy). In terms of the consumption model, RocketMQ supports both pull and push.
Pull mode
In pull mode the consumer polls the broker for messages. There are two pull styles:
One is plain polling: regardless of whether the server has new data, the client requests data at a fixed interval; sometimes there is new data, sometimes nothing. The drawbacks are that most requests return nothing and waste server resources, and if the polling interval is long, messages are delayed.
The other is long polling, which is what RocketMQ's pull uses. The client issues a long-polling request; if the server has no data at that moment, it holds the request until data arrives or a timeout expires, then writes the response back, and the client immediately issues the next long-polling request (holding the request means the server does not reply or close the connection, but keeps the request around until the data is ready). Long polling fixes the problems of plain polling; its only drawback is that held requests consume server memory.
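A minimal pull-style consumer sketch, assuming the DefaultLitePullConsumer available since rocketmq-client 4.6 and reusing the simple_topic from section 2.1 (the group name is illustrative); poll() long-polls the broker as described above:

```java
public class LitePullConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("lite_pull_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("simple_topic", "*");
        consumer.start();
        try {
            while (true) {
                // poll() blocks (long-polls) until messages arrive or the poll timeout expires
                List<MessageExt> messages = consumer.poll();
                System.out.printf("%s pulled %d messages%n", Thread.currentThread().getName(), messages.size());
            }
        } finally {
            consumer.shutdown();
        }
    }
}
```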
Push mode
In push mode the broker pushes messages to the consumer. RocketMQ's push mode is actually built on top of pull mode with an extra layer of wrapping, so it is not a true push model. In RocketMQ, a PushConsumer registers a MessageListener; once messages are fetched, the listener's consumeMessage() is invoked to consume them, so from the user's perspective the messages appear to be pushed.
1.3 Topic

Topics divide messages by subject, for example order messages versus logistics messages. Note that unlike Kafka, a topic in RocketMQ is a logical concept; messages are not physically stored per topic.
A producer sends messages to a given topic, and a consumer subscribes to that topic to receive them. As in Kafka, if the topic does not exist it is created automatically:
```java
private boolean autoCreateTopicEnable = true;
```
A topic has a many-to-many relationship with both producers and consumers: one producer can send to multiple topics, and one consumer can subscribe to multiple topics.
1.4 Tag
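A tag is a second-level label attached to a message within a topic; consumers can use it to filter which messages they receive, as shown in the filter-message example in section 2.4.4.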

1.5 Broker

A RocketMQ server process is called a broker. Its job is to store and forward messages. A single broker can handle roughly 100,000 QPS.
To improve availability (avoid a single point of failure) and performance (spread load), brokers are usually deployed as a cluster. As with Kafka or Redis Cluster, each broker node in a RocketMQ cluster holds part of the total data, which enables horizontal scaling. To improve reliability (avoid data loss), each broker can have its own replicas (slaves).
By default, both reads and writes go to the master. When slaveReadEnable=true, slaves can also serve reads, but by default only the slave with brokerId=1 participates, and only when the master is falling behind on consumption, as controlled by the whichBrokerWhenConsumeSlowly parameter:
```java
private long whichBrokerWhenConsumeSlowly = 1;
```
1.6 NameServer

When different messages are stored on different brokers, how producers and consumers pick a broker, i.e. routing, becomes a key problem. Just as distributed service invocation needs a registry, RocketMQ needs a component that manages broker metadata. Kafka uses ZooKeeper for this; RocketMQ uses the NameServer.
Think of the NameServer as RocketMQ's routing center. Every NameServer node holds the full routing table, and for high availability NameServers can themselves be deployed as a cluster; the role is somewhat like Eureka or Redis Sentinel. Brokers register themselves with the NameServer, and producers and consumers use the NameServer to discover brokers.
1.7 RocketMQ Cluster

2 Basic Usage
2.1 Simple Example
(1) Add the rocketmq-client dependency
```xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>
```
(2) Create and start a consumer
```java
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("simple_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Message: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
```
(3) Create and start a producer
```java
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("simple_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message message = new Message("simple_topic", "TAGA",
                    ("Hello ROCKETMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}
```
2.2 Producer Sending Modes
2.2.1 Synchronous
With synchronous sending, the producer blocks until the broker returns a SendResult; the send counts as successful when the status is SEND_OK. The producer.send(msg) call in the simple example above is a synchronous send.

2.2.2 Asynchronous
Asynchronous sending means the sender does not wait for the server's response before sending the next message. For asynchronous sending with RocketMQ you implement the callback interface SendCallback.
```java
public class Consumer02 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("async_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    System.out.println(msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

public class ASyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async_producer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 6; i++) {
            Message message = new Message("async_topic", "tag_sync", "OrderID188",
                    ("Hello ASync " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        }
        // Give the asynchronous callbacks time to complete before the JVM exits
        Thread.sleep(5000);
        producer.shutdown();
    }
}
```
2.2.3 Oneway
The sender is only responsible for sending the message; it neither waits for a response from the server nor registers any callback, i.e. it sends the request without waiting for an acknowledgement.
```java
public class Consumer03 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneway_consumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("oneway_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    System.out.println(msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("oneway_producer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("oneway_topic", "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendOneway(msg);
        }
        Thread.sleep(5000);
        producer.shutdown();
    }
}
```
2.2.4 Comparison of the Three Sending Modes

| Sending mode | TPS | Reliability | Return value | Typical scenario |
| --- | --- | --- | --- | --- |
| Sync | Fast | High | Yes | Important messages such as SMS or email |
| Async | Fairly fast | Fairly high | Yes (via callback) | Scenarios with high concurrency and strict response-time requirements that need a callback |
| Oneway | Fastest | Low | No | Scenarios with low reliability requirements and no need for a result, e.g. log collection |
2.3 Consumption Modes
2.3.1 Clustering Mode (Load-Balancing Mode)
Clustering is the default mode.
```java
public class Consumer04 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("consumer_model_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    System.out.println(msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

public class Producer01 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("test_producer01");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("consumer_model_topic", "TAGA",
                    ("consumer model " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}
```
Looking at the producer's send results, you can see that the 10 messages are spread across the 4 queues of consumer_model_topic.
Start two Consumer04 instances and watch their output: one consumer receives the 5 messages from queues 2 and 3, and the other receives the 5 messages from queues 0 and 1.
2.3.2 Broadcasting Mode
Change consumer.setMessageModel(MessageModel.CLUSTERING) to consumer.setMessageModel(MessageModel.BROADCASTING).
(1) Change Consumer04's consumption mode to BROADCASTING
(2) Restart both Consumer04 instances
(3) Send 10 more messages with Producer01 and watch the Consumer04 console output: every Consumer04 instance consumes all of the messages
2.3.3 Thinking About Clustering vs. Broadcasting
(1) Clustering mode, a topic with 4 queues and 5 consumer instances: each queue is assigned to exactly one consumer, so one consumer instance is left idle.

(2) Clustering mode, 4 queues and 2 consumer instances: each consumer is assigned 2 queues.

(3) Clustering mode, 4 queues and 3 consumer instances: the queues cannot be divided evenly, so one consumer gets 2 queues and the other two get 1 queue each.

(4) Broadcasting mode, 4 queues and any number of consumer instances: every instance receives every message, regardless of how many instances there are.
2.4 Message Types
2.4.1 Normal Message
Ordinary messages with no special delivery semantics (as in the simple example in 2.1).
2.4.2 Delay Message
After the producer sends a message to the broker, it is not delivered to consumers immediately; it only becomes consumable after the specified delay has elapsed.
```java
public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("schedule_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("schedule_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("schedule_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        int totalMessagesToSend = 8;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("schedule_topic", ("Hello scheduled message " + i).getBytes());
            // Delay level 4 corresponds to 30s with the default messageDelayLevel configuration
            message.setDelayTimeLevel(4);
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        Thread.sleep(5000);
        producer.shutdown();
    }
}
```
Delay levels and their corresponding durations: https://rocketmq.apache.org/docs/4.x/producer/04message3/

2.4.3 Batch Message
https://rocketmq.apache.org/docs/4.x/producer/05message4
The maximum size of a single message a producer can send is 4 MB, configurable via maxMessageSize; the broker has the same 4 MB limit for a single message, visible in the maxMessageSize property of MessageStoreConfig.
```java
public class BatchConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("batch_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("batch_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        List<Message> messages = new ArrayList<>();
        messages.add(new Message("batch_topic", "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message("batch_topic", "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message("batch_topic", "TagA", "OrderID003", "Hello world 2".getBytes()));
        // The whole list is sent in a single request
        SendResult sendResult = producer.send(messages);
        System.out.println(sendResult);
        producer.shutdown();
    }
}
```
2.4.4 Filter Message
https://rocketmq.apache.org/docs/featureBehavior/07messagefilter
Messages can be filtered by tag or by SQL expression.
```java
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("filter_topic", "TagB");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String body = new String(msg.getBody());
                    System.out.println(msg);
                    System.out.println(body + " " + msg.getTags());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

public class FilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            String tag = i % 2 == 0 ? "TagA" : "TagB";
            Message message = new Message("filter_topic", tag, ("Hello Filter Message " + i).getBytes());
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}
```
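The example above filters by tag. As a sketch of SQL-based filtering (the group name and the user property "a" are illustrative, and the broker must be started with enablePropertyFilter=true), the producer attaches a user property and the consumer subscribes with MessageSelector.bySql:

```java
public class SqlFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql_filter_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        // Only messages whose user property "a" is between 0 and 3 are delivered
        consumer.subscribe("filter_topic", MessageSelector.bySql("a BETWEEN 0 AND 3"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()) + " a=" + msg.getUserProperty("a"));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
```

On the producer side the property would be set with message.putUserProperty("a", String.valueOf(i)) before sending.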
2.4.5 Ordered Message
https://rocketmq.apache.org/docs/featureBehavior/03fifomessage
(1) What is an ordered message
The order in which messages are consumed matches the order in which they were produced.
(2) How production and consumption currently behave
By default the producer sends messages to the different queues in round-robin fashion, and the consumer pulls from those queues with multiple threads, so ordering is clearly not guaranteed.
(3) Global order
Global order: all messages under a topic are consumed in order.
This requires the topic to have exactly one queue, so the producer sends every message to that single queue, and there can be only one consumer, consuming with a single thread. Under high concurrency the performance is poor, so global order is rarely used.
(4) Partial order
Partial order: a topic has multiple queues, but the messages within each queue are consumed in order.
This requires a single producer that sends related messages to a designated queue, with the consumer consuming each queue single-threaded. Performance stays high under concurrency, so partial order is used much more often.
(5) A scenario for partial order
For example, in an ordering flow a user places an order with orderId order001. The messages for that order, such as order paid, order shipped, and order logistics updates, are sent to the same designated queue in sequence, and the consumer must consume them in that same sequence, which is why single-threaded consumption per queue is required.
```java
public class OrderlyConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("orderly_topic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        Thread.sleep(new Random().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + " "
                                + "queueId:" + msg.getQueueId() + " body: " + new String(msg.getBody()));
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

public class OrderlyProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("orderly_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        String[] flows = {"order paid", "order shipped", "order in transit", "order completed"};
        for (int i = 0; i < 4; i++) {
            String flow = flows[i % 4];
            Message message = new Message("orderly_topic", "TagA", flow.getBytes());
            // The selector sends every message of this order to the same queue (queue 1),
            // which is what guarantees per-queue ordering
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer queueId = (Integer) arg;
                    System.out.println("queueId: " + queueId);
                    return mqs.get(queueId);
                }
            }, 1);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}
```
2.4.6 Transactional Message
https://rocketmq.apache.org/docs/featureBehavior/04transactionmessage
RocketMQ provides distributed transaction capabilities similar to XA / Open XA; with RocketMQ transactional messages you can achieve eventual consistency for distributed transactions.

```java
public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("transaction_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getTags() + " : " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer started..");
    }
}

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("trans_producer_group");
        TransactionListener transactionListener = new TransactionListenerImpl();
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr("localhost:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("transaction_topic", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("executing local transaction...");
        if (msg.getTags().equals("TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (msg.getTags().equals("TagB")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
```
3 RocketMQ Flush-to-Disk Mechanism
Flushing means persisting the data held in the broker's memory to the CommitLog file. Flushing can be synchronous or asynchronous.
The relevant setting in broker.conf is flushDiskType = SYNC_FLUSH / ASYNC_FLUSH.
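As a small illustration, the setting goes into the broker configuration file:

```properties
# conf/broker.conf
# SYNC_FLUSH or ASYNC_FLUSH (ASYNC_FLUSH is the default)
flushDiskType = SYNC_FLUSH
```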
3.1 Synchronous Flush
With synchronous flushing, when the producer sends a message to the broker, the send only counts as successful once the message has been persisted to disk.

3.2 Asynchronous Flush
With asynchronous flushing, after the producer sends a message to the broker, the broker still flushes to disk eventually, but the producer does not care about that step: success is returned immediately.

4 RocketMQ Zero Copy
RocketMQ uses mmap for zero copy. Other implementations exist, such as sendfile: mmap suits reading and writing small amounts of data, while sendfile suits transferring large files.
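This is not RocketMQ source code, just a minimal sketch of the mmap idea in Java (the file name and size are illustrative): FileChannel.map() exposes a file region as a MappedByteBuffer, so reads and writes go through the page cache without extra copies into user-space buffers, which is essentially how RocketMQ's MappedFile wraps CommitLog segments.

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class MmapDemo {
    public static void main(String[] args) throws Exception {
        // Map a 1 KB region of the file directly into the process address space
        try (RandomAccessFile file = new RandomAccessFile("mmap_demo.dat", "rw");
             FileChannel channel = file.getChannel()) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            byte[] data = "hello mmap".getBytes(StandardCharsets.UTF_8);
            buffer.put(data);   // write through the mapping, no explicit read()/write() syscalls
            buffer.force();     // flush the mapped pages to disk (like a synchronous flush)
        }
    }
}
```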

5 Spring Boot Integration
https://docs.spring.io/spring-boot/docs/2.7.6/reference/html/messaging.html#messaging
(1) Add the dependency
```xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
```
(2) Configuration
```yaml
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: spring-boot-producer-group
  consumer:
    group: spring-boot-consumer-group
```
(3) Producer
```java
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @RequestMapping("/produce")
    public String produce() {
        this.rocketMQTemplate.convertAndSend("springboot-topic", "Hello Spring Boot Rocketmq");
        return "produce success";
    }
}
```
(4) Consumer
```java
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "springboot-topic")
public class SpringBootConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("Spring Boot received message: " + msg);
    }
}
```
6 Spring Cloud Integration (Spring Cloud Stream)
https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en
https://github.com/alibaba/spring-cloud-alibaba/blob/2.2.x/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
Add the dependency
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
6.1 Usage Before Spring Cloud Stream 3.x
6.1.1 Default Usage
(1) Configuration
```yaml
server:
  port: 9999
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: stream-group
      bindings:
        output:
          destination: springcloud-stream-topic
        input:
          destination: springcloud-stream-topic
```
(2) Add annotations: enable binding on the main class with @EnableBinding
```java
@SpringBootApplication
@EnableBinding({Source.class, Sink.class})
public class SpringCloudStreamDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamDemoApplication.class, args);
    }
}
```
(3) Producer
```java
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Resource
    private Source source;

    @RequestMapping("/stream-produce")
    public String streamProduce() {
        this.source.output().send(MessageBuilder.withPayload("stream msg...").build());
        return "stream produce successfully.";
    }
}
```
(4) Consumer
```java
@Service
public class StreamConsumer {
    @StreamListener(Sink.INPUT)
    public void receiveMsg(String msg) {
        System.out.println("receive msg: " + msg);
    }
}
```
6.1.2 Custom Bindings
(1) Following the built-in Source interface, define a custom TestSource interface
```java
public interface Source {

    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

public interface TestSource {

    String OUTPUT = "test-output";

    @Output(TestSource.OUTPUT)
    MessageChannel output();
}
```
(2) Following the built-in Sink interface, define a custom TestSink interface
```java
public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}

public interface TestSink {

    String INPUT = "test-input";

    @Input(TestSink.INPUT)
    SubscribableChannel input();
}
```
(3) Configuration
```yaml
server:
  port: 9999
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: stream-group
      bindings:
        output:
          destination: springcloud-stream-topic
        input:
          destination: springcloud-stream-topic
        test-output:
          destination: springcloud-stream-topic
        test-input:
          destination: springcloud-stream-topic
```
(4) Annotations: add TestSource and TestSink to the main class's @EnableBinding
```java
@SpringBootApplication
@EnableBinding({Source.class, Sink.class, TestSource.class, TestSink.class})
public class SpringCloudStreamDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamDemoApplication.class, args);
    }
}
```
(5) Code
```java
// Added to the existing ProducerController
@Resource
private TestSource testSource;

@RequestMapping("/test-stream-produce")
public String testStreamProduce() {
    this.testSource.output().send(MessageBuilder.withPayload("test stream msg...").build());
    return "test stream produce successfully.";
}

@Service
public class TestStreamConsumer {
    @StreamListener(TestSink.INPUT)
    public void receiveMsg(String msg) {
        System.out.println("test receive msg: " + msg);
    }
}
```
6.2 Usage with Spring Cloud Stream 3.x and Later
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producing_and_consuming_messages
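The link above covers the functional programming model that replaces @EnableBinding and @StreamListener from 3.x onward. As a minimal sketch under assumed names (the consume function, the binding name consume-in-0, and the topic/group values are illustrative, reusing this demo's NameServer address), a Consumer bean is bound through configuration:

```yaml
spring:
  cloud:
    function:
      definition: consume
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        consume-in-0:
          destination: springcloud-stream-topic
          group: stream-group
```

```java
@SpringBootApplication
public class FunctionalStreamDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(FunctionalStreamDemoApplication.class, args);
    }

    // java.util.function.Consumer bean; the binding name consume-in-0 is derived from the method name
    @Bean
    public Consumer<String> consume() {
        return msg -> System.out.println("functional receive msg: " + msg);
    }
}
```

For producing, a Supplier bean or StreamBridge#send(bindingName, payload) can be used in place of the Source channel from the pre-3.x examples.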