c语言sscanf函数的用法是什么
280
2023-04-01
RocketMq事务消息发送代码流程详解
一、RocketMq事务消息流程:
1、首先会向broker发送一个预请求消息,消费者不可见
2、回调执行本地事务(比如操作数据库)
3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功。
二、RocketMq事务消息实例:
1、引入rocketMq相关的依赖:
2、创建一个TransactionProducer类:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
//创建生产者并制定组名
TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.***.***:9876");
//3、指定消息监听对象用于执行本地事务和消息回查
TransactionListener listener = new TransactionListenerImol();
producer.setTransactionListener(listener);
//4、线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue
@Override
public Thread newThread(Runnable r) {
Thread thread = newThread(r);
thread.setName("client-tanscation-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
//5、启动producer
producer.start();
//6.创建消息对象,指定主题Topic、Tag和消息体 String topic, String tags, String keys, byte[] body
Message message = new Message("Topic_transaction_demo", //主题
"Tags", //主要用于消息过滤
"Key_1", //消息唯一值
("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));
//7、发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
producer.shutdown();
}
}
3、发送事务消息还需要一个事务监听对象,它实现TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:
public class TransactionListenerImol implements TransactionListener {
//存储事务状态信息 key:事务id value:当前事务执行的状态
private ConcurrentHashMap
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//事务id
String transactionId = message.getTransactionId();
//0:执行中,状态未知 1:执行成功 2:执行失败
localTrans.put(transactionId, 0);
//业务执行,本地事务,service
System.out.println("hello-demo-transaction");
try {
System.out.println("正在执行本地事务---");
Thread.sleep(60000*2);
System.out.println("本地事务执行成功---");
localTrans.put(transactionId, 1);
} catch (InterruptedException e) {
e.printStackTrace();
localTrans.put(transactionId, 2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
//消息回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//获取对应事务的状态信息
String transactionId = messageExt.getTransactionId();
//获取对应事务id执行状态
Integer status = localTrans.get(transactionId);
//消息回查
System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LmMbCoPiXocalTransactionState.UNKNOW;
}
}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~