分布式场景下的事务机制( 三 )


if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 根据commitLogOffset获取文件中的message,获取到了返回successresult = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查消息是否一致RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 生成要保存的消息MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// 把真实的topic消息存储到CommitLog中RemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {// 移除prepare消息,存入opQueueMap中this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}// 回滚} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// 查询到half消息则返回成功result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查消息是否一致RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 移除prepare消息,存入opQueueMap中this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}仅仅展示相关核心代码,其主要逻辑:首先去判断请求的方式是commit还是rollback,如果是commit查询到消息还原消息原来的topic , 然后删除half topic上的消息转存到opQueueMap中,如果是rollback直接进行删除half topic上的消息并转存到opQueueMap中去 。
注意:opQueueMap的引入为了解决有可能出现网络、进程、线程等各种因素导致消费端未能成功处理消息的情况,该机制的作用是在消费者端将未成功处理的消息重新发送到服务端进行重试 , 直到确认消息已经被成功处理或者达到最大重试次数后进行回滚操作 。而 Op 消息本身则是通过修改消息状态来实现的 。
消息回查当网络中断或者响应超时等各种异常信息导致消息并没有传送到broker端去,为了解决这一问题在Broker就开启一个回查线程每隔一分钟执行一次处理超过6s未回查的消息 , 当超过15次回查后直接将消息丢弃 。
在启动BrokerController类时 , 会去调用startProcessorByHa方法如果是Master节点就会去启动一个线程每隔6s处理未回查的消息,检查最大次数为15次 。
public void run() {log.info("Start transaction check service thread!");long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();while (!this.isStopped()) {this.waitForRunning(checkInterval);}log.info("End transaction check service thread!");}protected void onWaitEnd() {long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();long begin = System.currentTimeMillis();log.info("Begin to check prepare message, begin time:{}", begin);// 检查回查消息 timeout = 6s checkMax=15this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);}在check方法里面去调用listener.resolveHalfMsg(msgExt)方法去处理事务消息 。
public void resolveHalfMsg(final MessageExt msgExt) {executorService.execute(new Runnable() {@Overridepublic void run() {try {sendCheckMessage(msgExt);} catch (Exception e) {LOGGER.error("Send check message error!", e);}}});}执行sendCheckMessage方法发送一个检查事务状态的Code为CHECK_TRANSACTION_STATE的消息,在客户端MQClientAPIImpl初始化的时候就会去注册一个Code对应的Processor,最终就会去执行checkTransactionState方法,判断本地事务的状态,然后再去执行endTransactionOneway发起END_TRANSACTION处理 。
public void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {Runnable request = new Runnable() {private final String brokerAddr = addr;private final MessageExt message = msg;private final CheckTransactionStateRequestHeader checkRequestHeader = header;private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();// 执行线程方法@Overridepublic void run() {TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();TransactionListener transactionListener = getCheckListener();if (transactionCheckListener != null || transactionListener != null) {LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable exception = null;try {if (transactionCheckListener != null) {localTransactionState = transactionCheckListener.checkLocalTransactionState(message);} else if (transactionListener != null) {log.debug("Used new check API in transaction message");// 检查本地事务localTransactionState = transactionListener.checkLocalTransaction(message);} else {log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);}} catch (Throwable e) {log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);exception = e;}// 处理事务状态this.processTransactionState(localTransactionState,group,exception);} else {log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);}}//private void processTransactionState(final LocalTransactionState localTransactionState,final String producerGroup,final Throwable exception) {final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);thisHeader.setBname(checkRequestHeader.getBname());String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {// 提交状态case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;// 回滚状态case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;// 未知状态case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);try {// 再次执行endTransactionOneway发起END_TRANSACTIONDefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request);}


推荐阅读