seata源码解析:分支事务的提交或回滚

网友投稿 329 2022-09-07

seata源码解析:分支事务的提交或回滚

介绍

一个全局事务是由多个分支事务组成的,而ResourceManager就是用来管理分支事务的,主要有如下3个功能

向TC注册资源接收TC的commit请求,提交分支事务接收TC的rollback请求,回滚分支事务

因为Seata支持多种模式,每种模式都有对应的资源管理器,如TCCResourceManager,DataSourceManager,ResourceManagerXA等

DefaultResourceManager

DefaultResourceManager包含了所有ResourceManager,它根据分支事务的类型来调用对应的ResourceManager执行具体的操作

public class DefaultResourceManager implements ResourceManager { /** * all resource managers */ protected static Map resourceManagers = new ConcurrentHashMap<>(); private DefaultResourceManager() { initResourceManagers(); } protected void initResourceManagers() { //init all resource managers List allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class); if (CollectionUtils.isNotEmpty(allResourceManagers)) { for (ResourceManager rm : allResourceManagers) { resourceManagers.put(rm.getBranchType(), rm); } } } }

DataSourceManager

注册分支事务的逻辑都差不多,通过RmNettyRemotingClient向TC发送注册消息,并在本地保存resourceId和Resource的映射关系,当提交或者回滚的时候就能根据resourceId拿到对应的Resource

二阶段提交

当TM向TC发起全局提交请求时,此时分支事务实际上以及提交了,TC立即释放该全局事务的锁,然后异步调用RM清理回滚日志

二阶段回滚

当RM收到TC发来的回滚请求时,根据xid和branchid找到对应的回滚记录,通过回滚记录生成反向的SQL并执行,完成分支的回滚。当分支回滚结束时,通过TC回滚完成,当所有分支都回滚完成时,才会释放全局事务的锁

RM在回滚的时候会有如下的校验

beforeImmage等于afterImage则不用回滚(数据没有发生变化),否则执行下一步如果当前的记录等于afterImage,则回滚,否则执行下一步如果当前的记录等于beforeImage,则不用会滚了,否则抛出异常(当前记录和beforeImage和afterImage都不相等),说明发生了脏写

public class DataSourceManager extends AbstractResourceManager { @Override public void registerResource(Resource resource) { DataSourceProxy dataSourceProxy = (DataSourceProxy) resource; dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy); super.registerResource(dataSourceProxy); } @Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 将分支提交信息包装成 Phase2Context 插入内存中的异步提交队列,异步删除undoLog return asyncWorker.branchCommit(xid, branchId, resourceId); } @Override public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { DataSourceProxy dataSourceProxy = get(resourceId); if (dataSourceProxy == null) { throw new ShouldNeverHappenException(); } try { // 根据 undoLog 构造回滚 sql 并执行 UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { StackTraceLogger.info(LOGGER, te, "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]", new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()}); if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }}

TCCResourceManager

TCC事务解决方案主要分为2个阶段

第一阶段为prepare阶段,锁定相关的资源第二阶段为commit/rollback阶段,根据全局事务的执行状态来对分支事务进行commit/rollback阶段

理解了思路我们再来看代码就非常简单了,commit和rollback代码非常类似,因此就只分析一个分支了

提交阶段:反射调用@TwoPhaseBusinessAction指定的commitMethod方法

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId); if (tccResource == null) { throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId)); } Object targetTCCBean = tccResource.getTargetBean(); Method commitMethod = tccResource.getCommitMethod(); if (targetTCCBean == null || commitMethod == null) { throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId)); } try { //BusinessActionContext // 根据applicationData(分支事务注册的时候把相关信息放到了这里,比如commitMethod,rollbackMethod),xid等重新构建BusinessActionContext BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData); // 反射调用 commitMethod 方法 Object ret = commitMethod.invoke(targetTCCBean, businessActionContext); LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId); boolean result; if (ret != null) { if (ret instanceof TwoPhaseResult) { result = ((TwoPhaseResult)ret).isSuccess(); } else { result = (boolean)ret; } } else { result = true; } return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable; } catch (Throwable t) { String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid); LOGGER.error(msg, t); return BranchStatus.PhaseTwo_CommitFailed_Retryable; }}

从反射的逻辑我们可以看到对commitMethod和rollbackMethod的要求 入参只能是BusinessActionContext,返回值只能是boolean类型或者TwoPhaseResult

ResourceManagerXA

因为很多数据库都支持XA协议,因此XA模式的实现根简单,只需要通过XAResource来进行commit或者rollback即可

参考博客

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SocialMarketing:百度父亲节:用最硬的方式,做最软的事!
下一篇:[React Native]瀑布流AutoResponsive
相关文章

 发表评论

暂时没有评论,来抢沙发吧~