世界微动态丨分布式事务(Seata)原理详解篇
2022-07-11 05:58:07来源:牧小农
今天这篇,就给大家分析一下Seata的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。
首先Seata客户端启动一般分为以下几个流程:
(资料图)
自动加载Bean属性和配置信息。初始化TM。初始化RM。初始化分布式事务客户端完成,完成代理数据库配置。连接TC(Seata服务端),注册RM和TM。开启全局事务。在这篇源码的讲解中,我们主要以AT模式为主导,官网也是主推AT模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务(Seata) 四大模式详解,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT模式主要分为两个阶段:
一阶段:解析SQL,获取SQL类型(CRUD)、表信息、条件(where) 等相关信息。查询前镜像(改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据。执行业务SQL,更新数据。查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据。插入回滚日志,将前后镜像数据以及业务SQL等信息,组织成一条回滚日志记录,插入到undo Log表中。提交前,向TC注册分支,申请全局锁。本地事务提交,业务数据的更细腻和生成的undoLog一起提交。将本地事务提交的结果通知给TC。二阶段:如果TC收到的是回滚请求:
开启本地事务,通过XID和BranchID查找到对应的undo Log记录。根据undoLog中的前镜像和业务SQL的相关信息生成并执行回滚语句。提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给TC。如果没问题,执行提交操作:
收到TC分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给TC。异步任务阶段的分支提交请求删除undoLog中记录。源码入口接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择source即可,下载完成之后,通过IDEA打开项目。
源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的Seata包spring-alibaba-seata,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional的注解,这个注解就是我们入手的一个点,我们在spring.factories中看到有一个GlobalTransactionAutoConfiguration,这个就是我们需要关注的点,也就是我们源码的入口。
在GlobalTransactionAutoConfiguration中我们找到一个用Bean注入的方法globalTransactionScanner,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的Bean。
这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,这个方法就是判断Bean对象是否需要代理,是否需要增强。
@Configuration@EnableConfigurationProperties(SeataProperties.class)public class GlobalTransactionAutoConfiguration { //全局事务扫描器 @Bean public GlobalTransactionScanner globalTransactionScanner() { String applicationName = applicationContext.getEnvironment() .getProperty("spring.application.name"); String txServiceGroup = seataProperties.getTxServiceGroup(); if (StringUtils.isEmpty(txServiceGroup)) { txServiceGroup = applicationName + "-fescar-service-group"; seataProperties.setTxServiceGroup(txServiceGroup); } // 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器 return new GlobalTransactionScanner(applicationName, txServiceGroup); }}
在这其中我们要关心的是GlobalTransactionScanner这个类型,这个类型扫描@GlobalTransactional注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner类,看看里面具体是做了什么。
/** * The type Global transaction scanner. * 全局事务扫描器 * @author slievrly */public class GlobalTransactionScanner //AbstractAutoProxyCreator AOP动态代理 增强Bean extends AbstractAutoProxyCreator /** * ConfigurationChangeListener: 监听器基准接口 * InitializingBean: Bean初始化 * ApplicationContextAware: Spring容器 * DisposableBean: Spring 容器销毁 */ implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean { private final String applicationId;//服务名 private final String txServiceGroup;//事务分组 private void initClient() { //启动日志 if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don"t recommend you to use default tx-service-group"s value provided by seata", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); } if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup)); } //init TM //初始化TM TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //init RM //初始化RM RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Global Transaction Clients are initialized. "); } registerSpringShutdownHook(); } @Override public void afterPropertiesSet() { if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this); return; } if (initialized.compareAndSet(false, true)) { initClient(); } } private void initClient() { //启动日志 if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don"t recommend you to use default tx-service-group"s value provided by seata", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); } //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup)); } //init TM //初始化TM TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //init RM //初始化RM RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Global Transaction Clients are initialized. "); } registerSpringShutdownHook(); } //代理增强,Spring 所有的Bean都会经过这个方法 @Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // do checkers //检查bean和beanName if (!doCheckers(bean, beanName)) { return bean; } try { //加锁防止并发 synchronized (PROXYED_SET) { if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; //check TCC proxy //检查是否为TCC模式 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // init tcc fence clean task if enable useTccFence //如果启用useTccFence 失败 ,则初始化TCC清理任务 TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC //如果是,添加TCC拦截器 interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { //不是TCC Class> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); //判断是否有相关事务注解,如果没有不进行代理 if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } //发现存在全局事务注解标注的Bean对象,添加拦截器 if (globalTransactionalInterceptor == null) { //添加拦截器 globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; } LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName()); //检查是否为代理对象 if (!AopUtils.isAopProxy(bean)) { //不是代理对象,调用父级 bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { //是代理对象,反射获取代理类中已经存在的拦截器组合,然后添加到这个集合中 AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); int pos; for (Advisor avr : advisor) { // Find the position based on the advisor"s order, and add to advisors by pos pos = findAddSeataAdvisorPosition(advised, avr); advised.addAdvisor(pos, avr); } } PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) { throw new RuntimeException(exx); } }}
InitializingBean:中实现了一个afterPropertiesSet()方法,在这个方法中,调用了initClient()。
AbstractAutoProxyCreator:APO动态代理,在之前的的Nacos和Sentiel中都有这个代理类,AOP在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过AOP进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()方法, 这个方法主要是判断被代理的bean或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()进行带来增强。
具体代码如下:
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor { public GlobalTransactionalInterceptor(FailureHandler failureHandler) { this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler; this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION); this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER); degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK); if (degradeCheck) { ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this); degradeCheckPeriod = ConfigurationFactory.getInstance() .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD); degradeCheckAllowTimes = ConfigurationFactory.getInstance() .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES); EVENT_BUS.register(this); if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) { startDegradeCheck(); } } this.initDefaultGlobalTransactionTimeout(); } @Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { //获取执行的方法 Class> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); //获取GlobalTransactional(全局事务)、GlobalLock(全局锁)元数据 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); //GlobalLock会将本地事务的执行纳入Seata分布式事务的管理,共同竞争全局锁 //保证全局事务在执行的时候,本地事务不可以操作全局事务的记录 final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//获取全局锁 boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); if (!localDisable) { if (globalTransactionalAnnotation != null || this.aspectTransactional != null) { AspectTransactional transactional; if (globalTransactionalAnnotation != null) { transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes()); } else { transactional = this.aspectTransactional; } //执行全局事务 return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null) { //执行全局锁 return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } return methodInvocation.proceed(); }}
具体流程图如下所示:
核心源码在上面我们讲解到GlobalTransactionalInterceptor作为全局事务拦截器,一旦执行拦截,就会进入invoke方法,其中,我们会做@GlobalTransactional注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用handleGlobalTransaction全局事务处理器,获取事务信息,那我们接下来就来看一下GlobalTransactionalInterceptor.handleGlobalTransaction到底是如何执行全局事务的。
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true; try { return transactionalTemplate.execute(new TransactionalExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } //获取事务名称,默认获取方法名 public String name() { String name = aspectTransactional.getName(); if (!StringUtils.isNullOrEmpty(name)) { return name; } return formatMethod(methodInvocation.getMethod()); } /** * 解析GlobalTransation注解属性,封装对对象 * @return */ @Override public TransactionInfo getTransactionInfo() { // reset the value of timeout //获取超时时间,默认60秒 int timeout = aspectTransactional.getTimeoutMills(); if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) { timeout = defaultGlobalTransactionTimeout; } //构建事务信息对象 TransactionInfo transactionInfo = new TransactionInfo(); transactionInfo.setTimeOut(timeout);//超时时间 transactionInfo.setName(name());//事务名称 transactionInfo.setPropagation(aspectTransactional.getPropagation());//事务传播 transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//校验或占用全局锁重试间隔 transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//校验或占用全局锁重试次数 SetrollbackRules = new LinkedHashSet<>(); //其他构建信息 for (Class> rbRule : aspectTransactional.getRollbackFor()) { rollbackRules.add(new RollbackRule(rbRule)); } for (String rbRule : aspectTransactional.getRollbackForClassName()) { rollbackRules.add(new RollbackRule(rbRule)); } for (Class> rbRule : aspectTransactional.getNoRollbackFor()) { rollbackRules.add(new NoRollbackRule(rbRule)); } for (String rbRule : aspectTransactional.getNoRollbackForClassName()) { rollbackRules.add(new NoRollbackRule(rbRule)); } transactionInfo.setRollbackRules(rollbackRules); return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { //执行异常 TransactionalExecutor.Code code = e.getCode(); switch (code) { case RollbackDone: throw e.getOriginalException(); case BeginFailure: succeed = false; failureHandler.onBeginFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case CommitFailure: succeed = false; failureHandler.onCommitFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case RollbackFailure: failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); case RollbackRetrying: failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); default: throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code)); } } finally { if (degradeCheck) { EVENT_BUS.post(new DegradeCheckEvent(succeed)); } } }
在这里我们,主要关注一个重点方法execute(),这个方法主要用来执行事务的具体流程:
获取事务信息。执行全局事务。发生异常全局回滚,各个数据通过UndoLog进行事务补偿。全局事务提交。清除所有资源。这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:
public Object execute(TransactionalExecutor business) throws Throwable { // 1. Get transactionInfo //获取事务信息 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } // 1.1 Get current transaction, if not null, the tx role is "GlobalTransactionRole.Participant". //获取当前事务,主要获取XID GlobalTransaction tx = GlobalTransactionContext.getCurrent(); // 1.2 Handle the transaction propagation. //根据配置的不同事务传播行为,执行不同的逻辑 Propagation propagation = txInfo.getPropagation(); SuspendedResourcesHolder suspendedResourcesHolder = null; try { switch (propagation) { case NOT_SUPPORTED: // If transaction is existing, suspend it. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); } // Execute without transaction and return. return business.execute(); case REQUIRES_NEW: // If transaction is existing, suspend it, and then begin new transaction. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); tx = GlobalTransactionContext.createNew(); } // Continue and execute with new transaction break; case SUPPORTS: // If transaction is not existing, execute without transaction. if (notExistingTransaction(tx)) { return business.execute(); } // Continue and execute with new transaction break; case REQUIRED: // If current transaction is existing, execute with current transaction, // else continue and execute with new transaction. break; case NEVER: // If transaction is existing, throw exception. if (existingTransaction(tx)) { throw new TransactionException( String.format("Existing transaction found for transaction marked with propagation "never", xid = %s" , tx.getXid())); } else { // Execute without transaction and return. return business.execute(); } case MANDATORY: // If transaction is not existing, throw exception. if (notExistingTransaction(tx)) { throw new TransactionException("No existing transaction found for transaction marked with propagation "mandatory""); } // Continue and execute with current transaction. break; default: throw new TransactionException("Not Supported Propagation:" + propagation); } // 1.3 If null, create new transaction with role "GlobalTransactionRole.Launcher". //如果当前事务为空,创建一个新的事务 if (tx == null) { tx = GlobalTransactionContext.createNew(); } // set current tx config to holder GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { // 2. If the tx role is "GlobalTransactionRole.Launcher", send the request of beginTransaction to TC, // else do nothing. Of course, the hooks will still be triggered. //开始执行全局事务 beginTransaction(txInfo, tx); Object rs; try { // Do Your Business // 执行当前业务逻辑 //1、在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据 //2、执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log中 //3、远程调用其他应用,远程应用接收到XID,也会注册分支事务,写入branch_table以及本地undo_log表 //4、会在lock_table表中插入全局锁数据(一个分支一条) rs = business.execute(); } catch (Throwable ex) { // 3. The needed business exception to rollback. //发生异常全局回滚,每个事务通过undo_log表进行事务补偿 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 4. everything is fine, commit. //全局提交 commitTransaction(tx); return rs; } finally { //5. clear //清理所有资源 resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } } finally { // If the transaction is suspended, resume it. if (suspendedResourcesHolder != null) { tx.resume(suspendedResourcesHolder); } } }
这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的beginTransaction()
方法。
// 向TC发起请求,采用模板模式 private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeBegin(); //对TC发起请求 tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } }
在来关注其中,向TC发起请求的tx.begin()方法,而调用begin()方法的类为:DefaultGlobalTransaction。
@Override public void begin(int timeout, String name) throws TransactionException { //判断调用者是否为TM if (role != GlobalTransactionRole.Launcher) { assertXIDNotNull(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid); } return; } assertXIDNull(); String currentXid = RootContext.getXID(); if (currentXid != null) { throw new IllegalStateException("Global transaction already exists," + " can"t begin a new global transaction, currentXid = " + currentXid); } //获取XID xid = transactionManager.begin(null, null, name, timeout); status = GlobalStatus.Begin; //绑定XID RootContext.bind(xid); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction [{}]", xid); } }
再来看一下transactionManager.begin()方法,这个时候使用的是DefaultTransactionManager.begin默认的事务管理者,来获取XID,传入事务相关的信息 ,最好TC返回对应的全局事务XID,它调用的是DefaultTransactionManager.begin()方法。
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); //发送请求得到响应 GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } //返回XID return response.getXid(); }
在这里我们需要关注一个syncCall,在这里采用的是Netty通讯方式。
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException { try { // 通过Netty发送请求 return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request); } catch (TimeoutException toe) { throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe); } }
具体图解如下:
在这里我们需要重点了解GlobalTransactionScanner这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的Bean,并做AOP增强。
ApplicationContextAware:继承这个类型以后,需要实现其方法setApplicationContext(),当Spring启动完成以后,会自动调用这个类型,将ApplicationContext给bean,也就是说,GlobalTransactionScanner能够很自然的使用Spring环境。InitializingBean: 继承这个接口,需要实现afterPropertiesSet(),但凡是继承这个接口的类,在初始化的时候,当所有的properties设置完成以后,会执行这个方法。DisposableBean: 这个类,实现了一个destroy()这个方法是在销毁的时候去调用。AbstractAutoProxyCreator: 这个类是Spring实现AOP的一种方式,本质上是一个BeanPostProcessor,在Bean初始化至去年,调用内部createProxy(),创建一个Bean的AOP代理Bean并返回,对Bean进行增强。Seata数据源代理在上面的环节中,我们讲解了Seata AT模式2PC的执行流程,那么现在我们就来带大家了解一下关于AT数据源代理的信息,这也是AT模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。
首先AT模式的核心主要分为一下两个:
开启全局事务,获取全局锁。解析SQL并写入undoLog中。关于第一点我们已经分析清楚了,第二点就是关于AT模式如何解析SQL并写入undoLog中,但是在这之前,我们需要知道Seata是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行SQL提交和操作,但是由于Seata对数据源进行了代理,所以SQL的解析和undoLog的操作,是在数据源代理中进行完成的。
数据源代理是Seata中一个非常重要的知识点,在分布式事务运行过程中,undoLog的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解Seata的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式(代理)去实现。
同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的jar找到一个SeataAutoConfiguration类,我们在里面找到一个SeataDataSourceBeanPostProcessor(),这个就是我们数据源代理的入口方法。
我们进入SeataDataSourceBeanPostProcessor类里面,发现继承了一个BeanPostProcessor,这个接口我们应该很熟悉,这个是Sprng的拓展接口,所有的Bean对象,都有进入两个方法postProcessAfterInitialization()和postProcessBeforeInitialization()这两个方法都是由BeanPostProcessor提供的,这两个方法,一个是初始化之前执行Before。一个是在初始化之后执行After,主要用来对比我们的的Bean是否为数据源代理对象。
在这里我们需要关注到一个postProcessAfterInitialization.proxyDataSource()方法,这个里面。
private Object proxyDataSource(Object originBean) { DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean); if (this.useJdkProxy) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean)); } else { return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean)); } }
这里有一个DataSourceProxy代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有ConnectionProxy连接对象、StatementProxy、PreparedStatementProxySQL执行对象,这些都被Seata进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行Seata的业务逻辑,生成undoLog,全局事务的开启,事务的提交回滚等操作。
DataSourceProxy具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是init()。
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { private String resourceGroupId; private void init(DataSource dataSource, String resourceGroupId) { //资源组ID,默认是“default”这个默认值 this.resourceGroupId = resourceGroupId; try (Connection connection = dataSource.getConnection()) { //根据原始数据源得到JDBC连接和数据库类型 jdbcUrl = connection.getMetaData().getURL(); dbType = JdbcUtils.getDbType(jdbcUrl); if (JdbcConstants.ORACLE.equals(dbType)) { userName = connection.getMetaData().getUserName(); } else if (JdbcConstants.MARIADB.equals(dbType)) { dbType = JdbcConstants.MYSQL; } } catch (SQLException e) { throw new IllegalStateException("can not init dataSource", e); } initResourceId(); DefaultResourceManager.get().registerResource(this); if (ENABLE_TABLE_META_CHECKER_ENABLE) { //如果配置开关打开,会定时在线程池不断更新表的元数据缓存信息 tableMetaExecutor.scheduleAtFixedRate(() -> { try (Connection connection = dataSource.getConnection()) { TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()) .refresh(connection, DataSourceProxy.this.getResourceId()); } catch (Exception ignore) { } }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS); } //Set the default branch type to "AT" in the RootContext. RootContext.setDefaultBranchType(this.getBranchType()); }}
从上面我们可以看出,他主要做了以下几点的增强:
给每个数据源标识资源组ID。如果打开配置,会有一个定时线程池定时更新表的元数据信息并缓存到本地。生成代理连接ConnectionProxy对象。在这三个增强功能里面,第三个是最重要的,在AT模式里面,会自动记录undoLog,资源锁定,都是通过ConnectionProxy完成的,除此之外DataSrouceProxy重写了一个方法getConnection,因为这里返回的是一个ConnectionProxy,而不是原生的Connection。
@Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); } @Override public ConnectionProxy getConnection(String username, String password) throws SQLException { Connection targetConnection = targetDataSource.getConnection(username, password); return new ConnectionProxy(this, targetConnection); }ConnectionProxy
ConnectionProxy继承AbstractConnectionProxy,在这个父类中有很多公用的方法,在这个父类有PreparedStatementProxy、StatementProxy、DataSourceProxy。
所以我们需要先来看一下AbstractConnectionProxy,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于PreparedStatementProxy和StatementProxy,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等。
@Override public Statement createStatement() throws SQLException { //调用真实连接对象获取Statement对象 Statement targetStatement = getTargetConnection().createStatement(); //创建Statement的代理 return new StatementProxy(this, targetStatement); } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { //获取数据库类型 mysql/oracle String dbType = getDbType(); // support oracle 10.2+ PreparedStatement targetPreparedStatement = null; //如果是AT模式且开启全局事务 if (BranchType.AT == RootContext.getBranchType()) { ListsqlRecognizers = SQLVisitorFactory.get(sql, dbType); if (sqlRecognizers != null && sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) { //获取表的元数据 TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(), sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId()); //得到表的主键列名 String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()]; tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray); targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray); } } } if (targetPreparedStatement == null) { targetPreparedStatement = getTargetConnection().prepareStatement(sql); } //创建PreparedStatementProxy代理 return new PreparedStatementProxy(this, targetPreparedStatement, sql); }
在这两个代理对象中,都用到了以下几个方法:
@Overridepublic ResultSet executeQuery(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);}@Overridepublic int executeUpdate(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);}@Overridepublic boolean execute(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);}
在这些方法中都调用了ExecuteTemplate.execute(),所以我们就看一下在ExecuteTemplate类中具体是做了什么操作:
public class ExecuteTemplate { public staticT execute(List sqlRecognizers, StatementProxy statementProxy, StatementCallbackstatementCallback, Object... args) throws SQLException { //如果没有全局锁,并且不是AT模式,直接执行SQL if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // Just work as original statement return statementCallback.execute(statementProxy.getTargetStatement(), args); } //得到数据库类型- mysql/oracle String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { //sqlRecognizers 为SQL语句的解析器,获取执行的SQL,通过它可以获得SQL语句表名、相关的列名、类型等信息,最后解析出对应的SQL表达式 sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } Executor executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { //如果seata没有找到合适的SQL语句解析器,那么便创建简单执行器PlainExecutor //PlainExecutor直接使用原生的Statment对象执行SQL executor = new PlainExecutor<>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); switch (sqlRecognizer.getSQLType()) { //新增 case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; //修改 case UPDATE: executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; //删除 case DELETE: executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; //加锁 case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; //插入加锁 case INSERT_ON_DUPLICATE_UPDATE: switch (dbType) { case JdbcConstants.MYSQL: case JdbcConstants.MARIADB: executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE"); } break; //原生 default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } else { //批量处理SQL语句 executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { //执行 rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // Turn other exception into SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs; }}
在ExecuteTemplate就一个execute(),Seata将SQL执行委托给不同的执行器(模板),Seata提供了6种执行器也就是我们代码 case 中(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE,INSERT_ON_DUPLICATE_UPDATE),这些执行器的父类都是AbstractDMLBaseExecutor。
UpdateExecutor: 执行update语句。InsertExecutor: 执行insert语句。DeleteExecutor: 执行delete语句。SelectForUpdateExecutor: 执行select for update语句。PlainExecutor: 执行普通查询语句。MultiExecutor: 复合执行器,在一条SQL语句中执行多条语句。关系图如下:
然后我们找到rs = executor.execute(args);最终执行的方法,找到最顶级的父类BaseTransactionalExecutor.execute()。
@Override public T execute(Object... args) throws Throwable { String xid = RootContext.getXID(); if (xid != null) { //获取XID statementProxy.getConnectionProxy().bind(xid); } //设置全局锁 statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args); }
在根据doExecute(args);找到其中的重写方法AbstractDMLBaseExecutor.doExecute()。
@Override public T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); //是否自动提交 if (connectionProxy.getAutoCommit()) { return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); } }
对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()。
protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { //设置为手动提交 connectionProxy.changeAutoCommit(); return new LockRetryPolicy(connectionProxy).execute(() -> { //调用手动提交方法,得到分支执行的最终结果 T result = executeAutoCommitFalse(args); //执行提交 connectionProxy.commit(); return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); } }
connectionProxy.changeAutoCommit()方法,修改为手动提交后,我们看来最关键的代码executeAutoCommitFalse()。
protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException("multi pk only support mysql!"); } //获取前镜像 TableRecords beforeImage = beforeImage(); //执行具体业务 T result = statementCallback.execute(statementProxy.getTargetStatement(), args); //获取执行数量 int updateCount = statementProxy.getUpdateCount(); //判断如果执行数量大于0 if (updateCount > 0) { //获取后镜像 TableRecords afterImage = afterImage(beforeImage); //暂存到undolog中,在Commit的时候保存到数据库 prepareUndoLog(beforeImage, afterImage); } return result; }
我们再回到executeAutoCommitTrue中,去看看提交做了哪些操作connectionProxy.commit()。
@Override public void commit() throws SQLException { try { lockRetryPolicy.execute(() -> { //具体执行 doCommit(); return null; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { rollback(); } throw e; } catch (Exception e) { throw new SQLException(e); } }
进入到doCommit()中。
private void doCommit() throws SQLException { //判断是否存在全局事务 if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } }
作为分布式事务,一定是存在全局事务的,所以我们进入processGlobalTransactionCommit()。
private void processGlobalTransactionCommit() throws SQLException { try { //注册分支事务 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { //写入数据库undolog UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); //执行原生提交 一阶段提交 targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); report(false); throw new SQLException(ex); } if (IS_REPORT_SUCCESS_ENABLE) { report(true); } context.reset(); }
其中register()方法就是注册分支事务的方法,同时还会将undoLog写入数据库和执行提交等操作。
//注册分支事务,生成分支事务ID private void register() throws TransactionException { if (!context.hasUndoLog() || !context.hasLockKey()) { return; } //注册分支事务 Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.getApplicationData(), context.buildLockKeys()); context.setBranchId(branchId); }
然后我们在回到processGlobalTransactionCommit中,看看写入数据库中的flushUndoLogs()。
@Override public void flushUndoLogs(ConnectionProxy cp) throws SQLException { ConnectionContext connectionContext = cp.getContext(); if (!connectionContext.hasUndoLog()) { return; } //获取XID String xid = connectionContext.getXid(); //获取分支ID long branchId = connectionContext.getBranchId(); BranchUndoLog branchUndoLog = new BranchUndoLog(); branchUndoLog.setXid(xid); branchUndoLog.setBranchId(branchId); branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems()); UndoLogParser parser = UndoLogParserFactory.getInstance(); byte[] undoLogContent = parser.encode(branchUndoLog); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET)); } CompressorType compressorType = CompressorType.NONE; if (needCompress(undoLogContent)) { compressorType = ROLLBACK_INFO_COMPRESS_TYPE; undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent); } //写入数据库具体位置 insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection()); }
具体写入方法,此时我们使用的是MySql,所以执行的是MySql实现类MySQLUndoLogManager.insertUndoLogWithNormal()。
@Override protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException { insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn); } //具体写入操作 private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, State state, Connection conn) throws SQLException { try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) { pst.setLong(1, branchId); pst.setString(2, xid); pst.setString(3, rollbackCtx); pst.setBytes(4, undoLogContent); pst.setInt(5, state.getValue()); pst.executeUpdate(); } catch (Exception e) { if (!(e instanceof SQLException)) { e = new SQLException(e); } throw (SQLException) e; } }
具体流程如下所示:
Seata 服务端我们找到Server.java这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务。
//默认协调者 DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
在DefaultCoordinator类中我们找到 一个doGlobalBegin这个就是处理全局事务开始的方法,以及全局提交doGlobalCommit和全局回滚doGlobalRollback。
//处理全局事务 @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { //响应客户端xid response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout())); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid()); } } //处理全局提交 @Override protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.commit(request.getXid())); } //处理全局回滚 @Override protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.rollback(request.getXid())); }
在这里我们首先关注doGlobalBegin中core.begin()。
@Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { //创建全局事务Session GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); MDC.put(RootContext.MDC_KEY_XID, session.getXid()); //为Session重添加回调监听,SessionHolder.getRootSessionManager() 获取一个全局Session管理器DataBaseSessionManager //观察者设计模式,创建DataBaseSessionManager session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); //全局事务开始 session.begin(); // transaction start event MetricsPublisher.postSessionDoingEvent(session, false); return session.getXid(); }
然后我们在来看一下SessionHolder.getRootSessionManager()。
/** * Gets root session manager. * 获取一个全局Session管理器 * @return the root session manager */ public static SessionManager getRootSessionManager() { if (ROOT_SESSION_MANAGER == null) { throw new ShouldNeverHappenException("SessionManager is NOT init!"); } return ROOT_SESSION_MANAGER; } public static void init(String mode) { if (StringUtils.isBlank(mode)) { mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE)); } StoreMode storeMode = StoreMode.get(mode); //判断Seata模式,当前为DB if (StoreMode.DB.equals(storeMode)) { //通过SPI机制读取SessionManager接口实现类,读取的META-INF.services目录,在通过反射机制创建对象DataBaseSessionManager ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName()); ........ } }
在这里他其实读取的是DB模式下io.seata.server.session.SessionManager文件的内容。
我们在回到begin方法中,去查看session.begin()。
@Override public void begin() throws TransactionException { //声明全局事务开始 this.status = GlobalStatus.Begin; //开始时间 this.beginTime = System.currentTimeMillis(); //激活全局事务 this.active = true; //将SessionManager放入到集合中,调用onBegin方法 for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { //调用父级抽象类的方法 lifecycleListener.onBegin(this); } }
这里我们来看一下onBegin()方法,调用的是父级的方法,在这其中我们要关注addGlobalSession()方法,但是要注意,这里我们用的是db模式所以调用的是db模式的DateBaseSessionManager。
@Override public void onBegin(GlobalSession globalSession) throws TransactionException { //这里调用的是DateBaseSessionManager addGlobalSession(globalSession); } @Override public void addGlobalSession(GlobalSession session) throws TransactionException { if (StringUtils.isBlank(taskName)) { //写入session boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session); if (!ret) { throw new StoreException("addGlobalSession failed."); } } else { boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session); if (!ret) { throw new StoreException("addGlobalSession failed."); } } }
然后在看查询其中关键的方法DataBaseTransactionStoreManager.writeSession()。
@Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { //第一次进入是写入 会进入当前方法 //全局添加 if (LogOperation.GLOBAL_ADD.equals(logOperation)) { return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); //全局修改 } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) { return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); //全局删除 } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) { return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); //分支添加 } else if (LogOperation.BRANCH_ADD.equals(logOperation)) { return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); //分支更新 } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) { return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); //分支移除 } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) { return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else { throw new StoreException("Unknown LogOperation:" + logOperation.name()); } }
我们就看第一次进去的方法logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session))。
@Override public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) { String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable); Connection conn = null; PreparedStatement ps = null; try { int index = 1; conn = logStoreDataSource.getConnection(); conn.setAutoCommit(true); ps = conn.prepareStatement(sql); ps.setString(index++, globalTransactionDO.getXid()); ps.setLong(index++, globalTransactionDO.getTransactionId()); ps.setInt(index++, globalTransactionDO.getStatus()); ps.setString(index++, globalTransactionDO.getApplicationId()); ps.setString(index++, globalTransactionDO.getTransactionServiceGroup()); String transactionName = globalTransactionDO.getTransactionName(); transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0, transactionNameColumnSize) : transactionName; ps.setString(index++, transactionName); ps.setInt(index++, globalTransactionDO.getTimeout()); ps.setLong(index++, globalTransactionDO.getBeginTime()); ps.setString(index++, globalTransactionDO.getApplicationData()); return ps.executeUpdate() > 0; } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); } }
在这里有一个GlobalTransactionDO对象,里面有xid、transactionId等等,到这里是不是就很熟悉了。
还记得我们第一次使用Seata的时候会创建三张表:
branch_table 分支事务表。global_table 全局事务表。lock_table 全局锁表。而这里就是对应我们的global_table表,其他两个也是差不多,都是一样的操作。
流程图如下:
总结完整流程图:
对于Seata源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以Debug,对于Seata AT模式,我们主要掌握的核心点是
如何获取全局锁、开启全局事务。解析SQL并写入undolog。围绕这两点去看的话,会有针对性一点,到这里我们的Seata源码就讲解完了。