What are transaction messages ?

Transaction message , It can be considered as a message implementation based on two-phase commit theory , It is used to ensure the final consistency in the distributed system . Transactional messages ensure that the execution of local transactions and the sending of messages can be performed atomically .

Restrictions on use

(1) Timing and batch messages are not supported .
(2) In order to avoid a single message being checked too many times , Results in half queue message accumulation , We will
The number of checks for a single message is limited to by default 15 second , But users can change broker In configuration “transactionCheckMax” parameter , To change this restriction
. If a message is checked “TransactionCheckMax” second , So by default ,broker This message will be discarded , And print the error log at the same time . Users can rewrite the “AbstractTransactionCheckListener” Class to change this behavior .
(3) Transactional message , Support timeout setting
. It will be in broker Configuration parameters for “transactionTimeout” After a specified period of time , Check . Users can also send transactional messages , By setting user properties “CHECK_IMMUNITY_TIME_IN_SECONDS” To change this restriction . This parameter takes precedence over “TransactionMsgtimeout” parameter .
(4) Transactional messages may be checked or used multiple times .
(5) Resend the submitted message to the user's target topic , It can fail . at present , It depends on logging .Rocketmq Its high availability mechanism ensures high availability .
If you want to ensure that transactional messages are not lost , And transaction integrity is guaranteed , Synchronous is recommended double write mechanism .
(6) Producers of transactional messages ID Cannot be associated with producers of other types of messages ID share . Unlike other types of messages ,
Transactional messages allow reverse queries , Namely MQ The server can be based on the producer ID To query the producer instance .


1, Transaction status

Transactional messages have three states :
(1) TransactionStatus.CommitTransaction: Commit transaction , This means allowing consumers to consume the message .
(2) TransactionStatus.RollbackTransaction: Rollback transaction , Means that the message will be deleted , And no consumption is allowed .
(3) TransactionStatus.Unknown: Intermediate state , This means that MQ A second check is required to determine the status .

2, Send transaction message

(1) Creating transactional producers

use TransactionMQProducer Class creation Producer client , And specify unique ProducerGroup, You can set up a custom thread pool to handle check requests .
After executing the local transaction , You need to reply according to the execution result MQ The server , The transaction status of the reply is described in the above section .
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import
import org.apache.rocketmq.common.message.MessageExt; import java.util.List;
public class TransactionProducer { public static void main(String[] args)
throws MQClientException, InterruptedException { TransactionListener
transactionListener = new TransactionListenerImpl();
// establish TransactionMQProducer Transactional producer instance TransactionMQProducer producer = new
TransactionMQProducer("please_rename_unique_group_name"); ExecutorService
executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new
ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public
Thread newThread(Runnable r) { Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread"); return thread; } });
// Set up custom thread pool , To process the check request producer.setExecutorService(executorService); // Set up transaction listener
producer.setTransactionListener(transactionListener); producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int
i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i %
tags.length], "KEY" + i, ("Hello RocketMQ " +
// call sendMessageInTransaction method , To send transactional messages SendResult sendResult =
producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n",
sendResult); Thread.sleep(10); } catch (MQClientException |
UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i <
100000; i++) { Thread.sleep(1000); } producer.shutdown(); } } ```
(2) realization TransactionListener Interface
“executeLocalTransaction” Method is used to send half When the message is successful , Execute local transactions . It returns one of the three transaction states mentioned in the previous section .

“check local transaction” Method is used to check the state of local transactions , And respond MQ Inspection request . It also returns one of the three transaction states mentioned in the previous section .
import ... public class TransactionListenerImpl implements
TransactionListener { private AtomicInteger transactionIndex = new
AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new
ConcurrentHashMap<>(); @Override public LocalTransactionState
executeLocalTransaction(Message msg, Object arg) { int value =
transactionIndex.getAndIncrement(); int status = value % 3;
localTrans.put(msg.getTransactionId(), status); return
LocalTransactionState.UNKNOW; } @Override public LocalTransactionState
checkLocalTransaction(MessageExt msg) { Integer status =
localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) {
case 0: return LocalTransactionState.UNKNOW; case 1: return
LocalTransactionState.COMMIT_MESSAGE; case 2: return
LocalTransactionState.ROLLBACK_MESSAGE; } } return
LocalTransactionState.COMMIT_MESSAGE; } } ```