作者:君哥聊技术
来源:
https://mp.weixin.qq.com/s/n9QlZ73SQyCGIyPLvHMy0A
大家好,我是君哥 。今天聊一聊 RocketMQ 的顺序消息实现机制 。
在有些场景下,使用 MQ 需要保证消息的顺序性,比如在电商系统中,用户提交订单、支付订单、订单出库这 3 个消息应该保证顺序性,如下图:

文章插图
对于 RocketMQ 来说,主要是通过 Producer 和 Consumer 来保证消息顺序的 。
1 Producer下面代码是 Producer 发送顺序消息的官方示例:
public static void main(String[] args) throws UnsupportedEncodingException { try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace(); }}跟发送并发消息不一样的是,发送消息时传入了 MessageQueueSelector,这里可以指定消息发送到固定的 MessageQueue 。注意:上面的代码把 orderId 相同的消息都会发送到同一个 MessageQueue,这样同一个 orderId 的消息是有序的,这也叫做局部有序 。对应的另一种是全局有序,这需要把所有的消息都发到同一个 MessageQueue 。
下面再来看一下发送的代码:
private SendResult sendSelectImpl( Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //省略部分逻辑 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}//省略部分逻辑if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);} }//省略部分逻辑}可以看到,在发送的时候,使用 MessageQueueSelector 选择一个 MessageQueue,然后发送消息到这个 MessageQueue 。对于并发消息,这里不传 MessageQueueSelector,如果发送方法没有指定 MessageQueue,就会按照默认的策略选择一个 。2 Consumer以 RocketMQ 推模式为例,消费者会注册一个监听器,进行消息的拉取和消费处理,下面的 UML 类图显示了调用关系:

文章插图
上图中包含了对顺序消息和对并发消息的处理 。其中 MessageListenerOrderly 和 ConsumeMessageOrderlyService 对顺序消息进行处理 。跟并发消息不一样的是,顺序消息定义了一个 MessageQueueLock 类,这个类保存了每个 MessageQueue 对应的锁,代码如下:
private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();下面代码是顺序消费的官方示例:public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;} }); consumer.start(); System.out.printf("Consumer Started.%n");}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 新手怎么加入自媒体,学会这4个运营技巧,你也可以出爆文
- 如何重启MySQL服务
- 来自远程技术提供者的建议,如何阻止他人远程访问你的移动设备?
- 苹果|iPhone开售15周年:34款机型 你用过哪些?
- 大排怎么油炸,油炸大排怎么做法-
- 为什么想应聘文员,面试官问你为什么不做销售想做文员-
- 大爷|59岁离异大妈带简历相亲,要彩礼,还得工资上交:我优秀,你不亏
- 我的解放日志|看了《我的解放日志》,你是致郁了还是治愈了?
- 国产|国产桌面操作系统开发者平台发布:你知道哪些国产系统?
- 大学|高考填报志愿时,省外大学值得报考么?这些你值得了解
