<> Transaction message source code learning

<>TransactionMQProducer Send transaction message

When a transaction message is sent , It needs to be marked accordingly , Distinguish from ordinary messages
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED,
"true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
to broker After sending the message , According to the return status , Deal with it accordingly

SEND_OK:LocalTransactionExecuter perhaps TransactionListener Execute local transactions

And return to the local transaction completion status , include UNKNOW,ROLLBACK,COMMIT


They are all message sending failure status , Mark local transaction status as ROLLBACK_MESSAGE

After that, it was approved endTransaction Return the corresponding local transaction execution status information to the broker. Note that the message is sent by one way

<>broker End processing transaction message

TransactionMessageBridge, Responsible for the main transaction message storage logic .

Of transaction message topic Set to TransactionalMessageUtil.buildHalfTopic(), Namely
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner)); } private
MessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInner msgInner) {
// Hide the real topic and queueId MessageAccessor.putProperty(msgInner, MessageConst.
PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner,
MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(
TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.
getProperties())); return msgInner; }
EndTransactionProcessor Of processRequest method , handle producer Transaction status returned by the end

If the transaction status is commit, Restore message to original topic and queueId, Store in commitLog
in , And delete the preprocessing message (prepare), In fact, the message is stored in the :RMQ_SYS_TRANS_OP_HALF_TOPIC
In the theme of , Represents that the message has been processed ( Commit or roll back )

If the transaction status is rollbackMessage, Delete prepare news , The message is also stored in the :RMQ_SYS_TRANS_OP_HALF_TOPIC
In the theme of , Represents that the message has been processed

<>half Message queuing and op Message queuing

half Message consumption queue : prepare Message consumption queue , The transaction message first enters the message consumption queue

op Message consumption queue : After transaction message processing , get into op Message consumption queue ,op Message consumption queue is mainly used to record transaction message completion status

<> Fixed time task check back

If the first time producer The transaction message returned is UNKNOW, You need to check back the transaction

Transaction check back ,broker The main logic in the TransactionalMessageService Of check method

prepare news , Will be stored in RMQ_SYS_TRANS_HALF_TOPIC In message queue

prepare news , After being processed , Will be stored in RMQ_SYS_TRANS_OP_HALF_TOPIC In message queue

So through the back check prepare Message queuing , Some failed transaction messages can be reported , Try again .

In order to make full use of it commitLog Characteristics of sequential writing ,

When checking back , As long as the callback message is sent ,pepare
The consumption progress of message consumption queue will be pushed forward , At the same time prepare Message queuing writes a new message , If the back check fails , The new message can be sent back again . If the back check is successful , According to op Messages in message queuing , Judgment repetition , Avoid sending check back messages repeatedly .

producer The end transaction back query processing logic is mainly in the TransactionListener Of checkLocalTransaction method , General rewriting
checkLocalTransaction method , Implementation of user-defined back look-up logic .