首先发送第一阶段信息直接返回半提交状态,然后执行本地事务返回事务的三种状态,未知,回滚,提交,最后执行endTransaction方法,把事务执行的状态告诉broker 。
endTransaction方法根据本地事务执行状态构建requestHeader对象执行二阶段提交 。
public void endTransaction(final Message msg,final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {final MessageId id;// 获取消息中的MessageIdif (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}String transactionId = sendResult.getTransactionId();// 找到broker地址final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());// 构建EndTransactionRequestHeader对象EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);// offset是prepare消息中offsetMsgId中获取的requestHeader.setCommitLogOffset(id.getOffset());requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());// 社会提交/回滚状态switch (localTransactionState) {case COMMIT_MESSAGE:// 提交requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:// 回滚requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:// 未知requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 发送给broker端this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());}将本地方法执行事务的结果发送给Broker,通过endTransactionOneway方法创建Code为END_TRANSACTION的消息,然后在Broker就会找出对应的Processor来处理 。
Broker端处理 Broker总共存在两个处理,首先针对第一个阶段发送的Half消息,broker要进行相关的操作,后面endTransaction提交进来的事务状态,针对三种状态进行相关操作 。
接收第一阶段发送的Half消息SendMessageProcessor的sendMessage方法中去执行处理事务消息 。
// 发送Half消息时,在属性中设置了PROPERTY_TRANSACTION_PREPARED为true,这里根据这个属性判断是否是事务消息String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(traFlag)&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return response;}// 事务消息进入这里 , 把消息的topic改成RMQ_SYS_TRANS_HALF_TOPIC , 以同步刷盘的方式存入storeputMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);}如果消息携带事务标记就去执行TransactionMessageService类的prepareMessage方法进行相关的处理 。
// 解析Half消息private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {// 把真实的topic和真实的queueId放在消息的属性中MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));// 设置默认的事务状态为TRANSACTION_NOT_TYPE=>unknowmsgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));// 将消息的topic设置为RMQ_SYS_TRANS_HALF_TOPIC,这个是对消费者不可见的msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());// 设置queueId=0msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;}进行topic的切换,将原来的topic存入到消息的属性里面 , 将消息的topic设置为RMQ_SYS_TRANS_HALF_TOPIC 。
处理endTransaction方法在endTransaction方法中将消息同步给Broker处理的Code对应为END_TRANSACTION,Broker就会找出对应的Processor来处理该类即调用EndTransactionProcessor类的processRequest方法处理 。
推荐阅读
- 项目开发中什么场景下Redis适用?
- 数量过百,国产游戏版号发放创下新高,游戏行业发展正形成三大共识
- 又一位“睡衣男”榜一大哥?女主播直播效果搞砸,误以为地下恋情
- 无尽贪婪手机版怎么下
- 分手26年,赖文峰出狱后结婚到乡下度日,杨钰莹的选择令人心疼
- 干粉灭火器的压把怎样才能压下去 干粉式灭火器怎么把压力卸掉
- 手机WPS怎么提取页面,手机wps做的PPT怎么保存下来
- 7天狂赚一个亿!被封杀,被下架,也挡不住观众连夜排队送钱…
- 文章:39岁放下一切再出发,拼尽全力打出王炸,重新找回人生真谛
- 剪映片尾怎么去掉,剪映下载本地视频怎么去掉剪映号
