一,相关概念
1.1,最终一致性
RocketMQ是一种最终一致性的分布式事务,就是说它保证的是消息最终一致性。
可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,RocketMQ主要解决了两个功能:
1、本地事务与消息发送的原子性问题。
2、事务参与方接收消息的可靠性。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景
。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。
1.2,Half Message(半消息)
是指暂不能被 Consumer 消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Produce对消息的二次确认后,Consumer才能去消费它。
1.3,消息回查
由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是 Brock 服务器会定时扫描长期处于半消息的消息,会主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查。
二,RocketMQ分布式事务原理
两个核心概念:两阶段提交、事务状态定时回查
1、A服务先发送个Half Message给Brock端。
2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
4.1、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
4.2、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
4.3、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。
只有A服务本地事务执行成功 ,B服务才能消费该message。
MQ订阅方(积分服务)消费消息,消费成功则向 MQ 回应 ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
2.1,两阶段提交
第一阶段:生产者向 MQ 服务器发送事务消息,即 prepare 消息,服务端确认后回调通知生产者执行本地事务(此时消息为Prepare消息,存储于RMQ_SYS_TRANS_HALF_TOPIC 队列中,不会被消费者消费)
第二阶段:生产者执行完本地事务后(业务执行完成,同时将消息唯一标记,如 transactionId 与该业务执行记录同时入库,方便事务回查),根据本地事务执行结果,返回 Commit/Rollback/Unknow 状态码
- 服务端若收到 Commit 状态码,则将 prepare 消息变为提交;
- 收到 Rollback 则对消息进行回滚;
- 若状态为 Unknow,则等待 MQ 服务端定时发起消息状态回查,超过一定重试次数或者超时,消息会被丢弃。
2.2,为什么要先发送Half Message(半消息)
1,可以先确认 broker 服务器是否正常 ,如果半消息都发送失败了 那说明 broker 挂了。
2,可以通过半消息来回查事务,如果半消息发送成功后一直没有被二次确认,那么就会回查事务状态。
2.3,什么情况会回查
1,执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit或者rollback)导致最终返回 UNKNOW,那么就会回查。
2,本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在 broker 端它还是个Half Message(半消息),这也会回查。
2.4,代码实战
事务消息生产者
事务消息发送消息对象是 TransactionMQProducer,为了实现本地事务操作和回查,我们需要创建一个监听器,监听器需要实现 TransactionListener 接口。
// 1、创建TransactionMQProducer
// 2、设置Namesrv地址
// 3、指定消息监听对象
// 4、指定线程池,用于执行本地事务和消息回查
// 5、开启TransactionMQProducer
// 6、创建消息Message
// 7、发送事务消息
// 8、关闭TransactionMQProducer
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1、创建TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("demo_producer_transaction_group");
// 2、设置Namesrv地址
producer.setNamesrvAddr("");
// 3、指定消息监听对象,用于执行本地事务和消息回查
producer.setTransactionListener(new TransactionListenerImpl());
// 4、指定线程池
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
);
producer.setExecutorService(executorService);
// 5、开启TransactionMQProducer
producer.start();
// 6、创建消息Message
// topic:主题,tags: 标签,主要用于消息过滤,keys:消息的唯一值,body:消息体
Message message = new Message(
"Topic_Transaction_Demo",
"Tags",
"Keys_T",
"Hello Transaction RocketMQ Message".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 7、发送事务消息
// 第一个参数:发送的消息信息
// 第二个参数:选择指定的消息队列对象(会将所有的消息队列传进来)
TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
System.out.println(result);
// 8、关闭TransactionMQProducer
producer.shutdown();
}
}
监听器代码
public class TransactionListenerImpl implements TransactionListener {
/**
* 存储对应事务的状态信息,key:事务ID,value:当前事务执行的状态
*/
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 事务ID
String transactionId = message.getTransactionId();
// 0:执行中,状态未知,1:本地事务执行成功,2:本地事务执行失败
localTrans.put(transactionId, 0);
// 业务执行,处理本地事务
System.out.println("Hello-Transaction");
try {
System.out.println("正在执行本地事务");
Thread.sleep(2000);
System.out.println("本地事务执行成功");
localTrans.put(transactionId, 1);
} catch (InterruptedException e) {
e.printStackTrace();
localTrans.put(transactionId, 2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 消息回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 获取事务ID
String transactionId = messageExt.getTransactionId();
// 获取对应事务ID的执行状态
Integer status = localTrans.get(transactionId);
System.out.println("消息回查--TransactionId:" + transactionId + ", 状态:" + status);
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
事务消息消费者
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
// 1、创建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_transaction_group");
// 2、设置namesrv地址
consumer.setNamesrvAddr("");
// 3、设置消息拉取最大数
consumer.setConsumeMessageBatchMaxSize(2);
// 设置消息顺序
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 4、设置subscribe,这里是要读取订单主题信息
// topic:指定消费的主题,subExpression:过滤规则
consumer.subscribe("Topic_Transaction_Demo",
"*");
// 5、创建消息监听MessageListener
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt msg: list) {
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic: " + topic + ", tags: " + tags + ", keys: " + keys + ", body: " + body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 开启Consumer
consumer.start();
}
}
评论区