关于java:springcloud结合bytetcc实现数据强一致性原理剖析

45次阅读

共计 8121 个字符,预计需要花费 21 分钟才能阅读完成。

1 应用背景和束缚

公司应用的是 springcloud,面临分布式事务的场景的时候,能够应用对 springcloud 反对比拟好的 byte-tcc 框架,git 目前 2600 星,应用起来也十分不便,原理也很清晰,非常适合学习。https://github.com/liuyangmin…,联合 cloud 有几个重点束缚如下,

(1) 一个业务接口,须要有三种实现类,别离是 try,confirm,cancel,合乎 tcc 的思路。实现办法必须加 Transactional,且 propagation 必须是 Required, RequiresNew, Mandatory 中的一种。

(2) 服务提供方 Controller 必须增加 @Compensable 注解,不容许对 Feign/Ribbon/RestTemplate 等 HTTP 申请自行进行封装。想想看为什么?

(3) 在每个参加 tcc 事务的数据库中创立 bytejta 表。

配置上也非常简单,@Import(SpringCloudConfiguration.class),服务提供方,try 下面加上

 @Service("accountService")
 @Compensable(
  interfaceClass = IAccountService.class 
 , confirmableKey = "accountServiceConfirm"
 , cancellableKey = "accountServiceCancel"
 )

confirm 和 cancel 对应的

@Service("accountServiceConfirm")
@Service("accountServiceCancel"),

try confirm cancel 对应业务能够了解为 解冻库存 / 真正扣减库存 / 复原库存,或者解冻优惠券 / 核销优惠券 / 复原优惠券这种理论业务场景。
0.5 当前 datasource 主动应用 LocalXADataSource,之前须要手动配置

 @Bean(name = "dataSource")
  public DataSource getDataSource() {LocalXADataSource dataSource = new LocalXADataSource();
        dataSource.setDataSource(this.invokeGetDataSource());
        return dataSource;
    }

所以,从配置上看,bytetcc 和 springcloud 联合,一个应该是通过引入本人的 SpringCloudConfiguration 封装了 feign/ribbon/hystrix 调用,一个是提供了本人的 datasource 治理事务。有了本人的 datasource,定制本人的 transactionManager, 就能够在事务前后动手脚,2pc/3pc 对事务的治理,体现在管制不同数据库连贯上,微服务为主的 tcc,对事务的治理体现在管制各个服务的调用上。

2 业务场景思考

bytetcc 的 tcc,其实 try 和 cancel 是配套的,思考下业务场景:

(1)如果 a 服务 try 胜利了,b 服务 try 失败,则 a 服务须要回滚,调用 a 的 cancel。这是广泛流程。

(2)如果 a 和 b 都 try 胜利了,而后 a confirm 胜利,b 的 confirm 失败,是没有 cancel 和 confirm 配对的。b 的 confirm 会一直调用直到胜利为止。

因为 bytetcc 的设计思路是,通过 try 做好筹备工作(如锁定资源),try 如果能胜利,那么逻辑上 confirm 肯定要胜利。如果 confirm 不胜利,则可能是外部环境问题,如网络问题等,那么环境复原了迟早应该胜利 confirm。基于这个思维,try 和 cancel 是互逆的,confirm 一旦执行就不可逆。

如果要设计 confirm 也可逆的,那要么 cancel 里判断是该回滚 try 还是回滚 try+confirm,不清晰且实现很麻烦,或者做成“tccc”加一个对应 confirm 的 cancel,由事务管理器对立判断调用几个 cancel,引入太多不确定。所以业务上能够间接这么设计:try 胜利,那么 confirm 是肯定要胜利的。

3 外围组件 SpringCloudConfiguration 原理剖析

第一步就 import 的 SpringCloudConfiguration 是重点,通过它的各种主动拆卸根本能够实现 bytetcc 的全逻辑。要想扩大 spring,那就得扩大各种 BeanFactoryPostProcessor,SpringCloudConfiguration 自身就是个 BeanFactoryPostProcessor,然而 postProcessBeanFactory 没干啥,应该是引入了各种其余 processor 进行扩大。如何扩大面临几个问题

(1)如何辨认外围的 @Compensable 注解?
在 byte-tcc 的一堆 resource 文件里,配置了各种 bean。既然都 Springcloud 了为啥还用这种文件 bean,可能是兼容 cloud 之外的场景,加强通用性。在 bytetcc-supports-tcc.xml 里,有个 bean <bean class=”org.bytesoft.bytetcc.supports.spring.CompensableAnnotationConfigValidator” />,通过要害代码
clazz.getAnnotation(Compensable.class); 扫描失去所有注解了 Compensable 的类,同时解析进去 cancel,confirm 对应的类。同时校验一下有没有加 transactional 注解。前面很多相似的 processor 都是用这种注解辨认须要的 bean

(2)如何革新事务管理器,使之适应散布式微服务环境?
在 bytetcc-supports-tcc.xml 中,定义了革新过的 transactionManager,

    <bean id="transactionManager" class="org.bytesoft.bytetcc.TransactionManagerImpl" />

这外面重写了 begin,commit 那一套,联合了 tcc 专用组件 CompensableManager,从新包装了事务操作。
同时,通过 SpringCloudConfiguration 配置的 CompensableHandlerInterceptor,达到 transactionInterceptor 的成果。

(3)事务如何复原?
在 bytetcc-supports-logger-primary.xml 中,有个 ResourceAdapterImpl,这个是启动 bytetcc 后盾线程的中央,看 bean 配置就晓得,把两个 bean compensableWork 和 bytetccCleanupWork 注入到 ResourceAdapterImpl 中对立治理,字面意思上看是弥补工作和数据清理工作。这里的 compensableWork 就是弥补和数据恢复专用的 job。

    <bean id="compensableResourceAdapter" class="org.bytesoft.transaction.adapter.ResourceAdapterImpl">
        <property name="workList">
            <list>
                <ref bean="compensableWork" />
                <ref bean="bytetccCleanupWork" />
            </list>
        </property>
    </bean>

追进去看,通过 List<Work> workList 收集起来 work,而后对立用 mananger 进行 start,看 work 对象自身就继承了 Runnable,所以这里是开启了后盾线程,进行事务复原等操作。

(4)具体的申请接口,怎么扩大?
import 的 SpringCloudConfiguration 的代理组件,通过条件注解和配置,依据是否开启 hystrix 和是否引入 HystrixFeign 的类,注入针对 feign 或 hystrix 的 CompensableFeignBeanPostProcessor,如下

@org.springframework.context.annotation.Bean
    @ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "false", matchIfMissing = true)
    public CompensableFeignBeanPostProcessor feignPostProcessor() {return new CompensableFeignBeanPostProcessor();
    }

    @org.springframework.context.annotation.Bean
    @ConditionalOnProperty(name = "feign.hystrix.enabled")
    @ConditionalOnClass(feign.hystrix.HystrixFeign.class)
    public CompensableHystrixBeanPostProcessor hystrixPostProcessor() {return new CompensableHystrixBeanPostProcessor();
    }

以 CompensableFeignBeanPostProcessor 为例,显著这就是为了对 feign 接口进行代理的 PostProcessor,在 postProcessAfterInitialization 中,果然通过 createProxiedObject(),创立了 CompensableFeignHandler 的代理类,对 springcloud 本人的 FeignInvocationHandler 进行了又一次代理。这样所有 @FeignClient 的接口都会通过这个 handler

同理如果是 hystrix 的代理,CompensableHystrixBeanPostProcessor 会创立 CompensableHystrixFeignHandler 代理,代替原来的 CompensableHystrixInvocationHandler。

通过这两种代理,能够对原生的 feign/hystrix 组件持续代理,在申请前后做些事件。feign/hystrix 其实自身也是联合了 ribbon 的代理,所以很多 spring 的扩大就是一层层的代理叠加,为咱们扩大组件提供了一种思路。那么 bytetcc 的外围流程必定就蕴含在这个申请代理中。

(5)如何管制申请哪一种办法?
bytetcc-supports-springcloud-primary.xml 中,有个 controller,CompensableCoordinatorController,能够看到外面封装了几种办法,prepare,commit,rollback,额定还有 recover,forget,名字上能够看出是复原,删除事务。联合第四点,原来的 feign 调用被代理一层,申请的实在 url 应该被改过,改成了申请这一个 controller 的办法,通过这个 controller 再决定前面做什么。

@RequestMapping(value = "/org/bytesoft/bytetcc/prepare/{xid}", method = RequestMethod.POST)
@RequestMapping(value = "/org/bytesoft/bytetcc/commit/{xid}/{opc}", method = RequestMethod.POST)
@RequestMapping(value = "/org/bytesoft/bytetcc/rollback/{xid}", method = RequestMethod.POST)
@RequestMapping(value = "/org/bytesoft/bytetcc/recover/{flag}", method = RequestMethod.GET)
@RequestMapping(value = "/org/bytesoft/bytetcc/forget/{xid}", method = RequestMethod.POST)

综上所述,通过新的分布式事务管理器的封装,feign/hystrix 申请的代理,controller 的管制,后盾弥补工作的执行,基本上能够实现强一致性的分布式事务。

4 事务启动 -try 过程

(1)产生事务
接到用户一个申请时,CompensableHandlerInterceptor 会先拦挡,这是用户刚发的申请,在这里没找到事务信息什么都不干就返回 true 了,如果是被调用者,无论是 try/confirm/cancel,都会有个事务上下文信息,解析出事务。

CompensableMethodInterceptor->excute(), 取得了 @transactional 和 @composable 注解,包含 confirm/cancel 办法信息,封装到 invocation,保留本次调用的一些信息。

transactionInterceptor,调用 bytetcc 提供的 TransactionManagerImpl, 提供了新的 begin,启动 tcc 事务。留神这里,如果没有事务上下文,没有 compensable 注解,那就走个别的 begin,就是个别的本地事务。
有了 compensable 注解,begin 就是下面说到的 CompensableManager 的 compensableBegin 办法,初始化了事务上下文环境 transanctionContext,还生成了个事务 id-xid。

(2)办法执行,CompensableFeignInterceptor, 把下面生成的事物上下文环境 transactionContext,通过序列化生成字符串,放入 request 的 header 中,这样发动事务无论调用什么服务,事务的上下文信息就保留在 request 的 header 里了。
前面依据 feign 的代理 CompensableFeignHandler 发出请求,return this.delegate.invoke(proxy, method, args) delegate 就是 feign,实质上也是应用原生的 feign 发申请。

既然实质也是 feign 调用,思考一下为啥还麻烦代理一次?事务上下文环境在 interceptor 外面曾经设置到 request 里了,还代理干啥?
往后看,我认为要害在这里,

beanRegistry.setLoadBalancerInterceptor(new CompensableLoadBalancerInterceptor(this.statefully) 

大家晓得,feign 通过 ribbon 组件进行的简单平衡,即 chooseInstance, 抉择申请往哪个实例上发,如果还是轮训或随机,第一次 try 申请发到某实例,第二次 confirm/cancel 发到其余实例,别的实例上没有 try 带来的的事务信息,会十分不不便,也不晓得 try 到底什么状况,
所以这里要屡次申请粘滞到一个实例上。所以 bytetcc 实现了 ribbon 算法 CompensableLoadBalancerRuleImpl,不反对自定义 rule

(3)服务接管方承受,首先通过 CompensableHandlerInterceptor 的 preHandle, 解析出事务上下文 transactionContext, 封装成 TransactionRequestImpl,并在 response 里加上一些 header 信息。这在发起者那里因为没有 header 的上下文,所以在(1)是什么都不做的
再到 CompensableMethodInterceptor, 解析办法。
再到 TransactionManager,即 bytetcc 的 TransactionManagerImpl,到这里的 begin,因为刚接到申请的时候,这时候曾经有事务了,所以调用的是个别的本地事务 compensableManager.begin(),最初开启一个本地事务,而后执行本地办法,执行 commit。
下图能够简略介绍这个过程。

5 try 调用失败,cancel 过程

(1)如果有任意一个 try 失败,那么要把曾经胜利的 try 给回滚掉,spring 通用的 transactionInterceptor 的处理过程,invokeWithinTransaction 办法,如果有异样,catch 住执行
completeTransactionAfterThrowing(),而后到 transactionManagerImpl 的 rollback, 持续到 CompensableManager 的 collback

(2)CompensableTransanctionImpl 中,fireRemoteParticipantCancel 是真正的 rollback,外面保护了一个 resourcelist,按程序记录了其余各个服务在 try 的时候调用的服务,在这里循环这个 list 调用 SpringCloudCoordinator, 拼接 cancel 地址,带着事务 id 发送申请过来。

(3)接管方,CompensableCoordinatorController 的 rollback,外围是从 CompensableTransactionImpl 到 SpringContainerContextImpl 的 cancel,失去申请的 controller 的对应的 cancel 办法,封装到 cancellableKey,而后拿到解决 cancel 的实在的 bean,

Object instance = this.applicationContext.getBean(cancellableKey);
this.cancelComplicated(method, instance, args);

进而执行 cancel 对应的 bean 的办法。整个过程能够如下概括

6 try 胜利,confirm 过程

同理,transactionManagerImpl 的 commit,最终达到 CompensableTransactionImp 进行 fireCommit,先提交本地事务,而后 fireRemoteParticipantConfirm,和 cancel 截然不同,读取 resourceList, 遍历 list 发送申请到各个服务端。
各个服务方 CompensableCoordinatorController 的 commit,拿到 confirmablekey,找到 confirm 的 bean 进行 confirm。

7“compensable”的弥补

(1)如果 cancel,commit 有失败(失败蕴含 runtimeexception 和自定义的一些异样),那么如何进行弥补,下面提到的一开始就启动的 CompensableWork 线程的 run 外面,其实有个 while(true),每隔 100 秒循环一次,调用组件 TransactionRecovery(看名字就晓得复原事务用的)的 timingRecover,就是定时回复,会调用到 CompensableTransactionImpl 的 recoveryRollback/recoveryCommit,还是 SpringCloudCoordinator 发送的申请。
(2)如果呈现宕机,重启后也是通过 CompensableWork 线程的 run,第一步是 init,尝试复原现有的事务。

a 如果 try 没有执行完就 down 机,复原时把已执行的 try 给 cancel 掉。因为事务个别是业务申请触发的,down 机就申请失败了,没必要重启后还复原方才的申请。b 如果是 confirm/cancel 有没胜利的,会始终定时进行 confirm/cancel。

正文完
 0