乐趣区

关于前端:分布式事务框架seata落地实践

前言

seata 是阿里巴巴研发的一套开源分布式事务框架,提供了 AT、TCC、SAGA 和 XA 几种事务模式。本文以精品课项目组的物流后盾服务为例,介绍 seata 框架落地的过程,遇到的问题以及解决方案。

作者 / 邓新伟

编辑 / 网易有道

有道精品课教务零碎是基于 springcloud 的分布式集群服务。在理论业务中,存在许多分布式事务场景。然而传统的事务框架是无奈实现全局事务的。长期以来,咱们的分布式场景的一致性,往往指的是放弃强一致性,保障最终一致性。

咱们从调研中发现,seata 框架既能够满足业务需要,灵便兼容多种事务模式,又能够实现数据强一致性。

本文以 物流业务 为例,记录了在理论业务中落地 seata 框架落地的过程中遇到的一些问题以及解决方案,供大家学习探讨~ 欢送大家在留言区探讨交换

1. 根底信息

  • seata 版本:1.4
  • 微服务框架:springcloud
  • 注册核心:consul

2. 根本框架

2.1 根本组件

seata 框架分为 3 个组件:

  • TC (Transaction Coordinator) - 事务协调者(即 seata-server)

保护全局和分支事务的状态,驱动全局事务提交或回滚。

  • TM (Transaction Manager) - 事务管理器(在 client 上,发动事务的服务)

定义全局事务的范畴:开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) – 资源管理器(在 client)

治理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

2.2. 部署 seata-server(TC)

在官网下载 seata 服务端,解压后执行 bin/seata-server.sh 即可启动。

seata-server 有 2 个配置文件:registry.conf 与 file.conf。而 registry.conf 文件决定了 seata-server 应用的注册核心配置和配置信息获取形式。

咱们应用 consul 做注册核心,因而须要在 registry.conf 文件中,须要批改以下配置:


registry {
  #file、nacos、eureka、redis、zk、consul、etcd3、sofa
  type = "consul" ## 这里注册核心填 consul
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10
   ... ...
  consul {
    cluster = "seata-server"
    serverAddr = "*** 注册核心地址 ***"
    #这里的 dc 指的是 datacenter,若 consul 为多数据源配置须要在申请中退出 dc 参数。#dc 与 namespace 并非是 seata 框架自带的,文章前面将会进一步解释
    dc="bj-th"
    namespace="seata-courseop"
  }
  ... ...
}

config {
  # file、nacos、apollo、zk、consul、etcd3
  ## 如果启动时从注册核心获取根底配置信息,填 consul
  ## 否则从 file.conf 文件中获取
  type = "consul"
  consul {serverAddr = "127.0.0.1:8500"}
... ...
}

其中须要留神的是,如果须要高可用部署,seata 获取配置信息的形式就必须是注册核心,此时 file.conf 就没用了。

(当然,须要当时把 file.conf 文件中的配置信息迁徙到 consul 中)


store {
  ## store mode: file、db、redis
  mode = "db"

... ...
  ## database store property
  ## 如果应用数据库模式,须要配置数据库连贯设置
  db {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://*** 线上数据库地址 ***/seata"
    user = "******"
    password = "******"
    minConn = 5
    maxConn = 100
    ## 这里的三张表须要提前在数据库建好
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
... ...
}

service {
  #vgroup->rgroup
  vgroupMapping.tx-seata="seata-server"
  default.grouplist="127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

其中,global_tablebranch_tablelock_table三张表须要提前在数据库中建好。

2.3 配置 client 端(RM 与 TM)

每个应用 seata 框架的服务都须要引入 seata 组件


dependencies {

    api 'com.alibaba:druid-spring-boot-starter:1.1.10'
    api 'mysql:mysql-connector-java:6.0.6'
    api('com.alibaba.cloud:spring-cloud-alibaba-seata:2.1.0.RELEASE') {exclude group:'io.seata', module:'seata-all'}
    api 'com.ecwid.consul:consul-api:1.4.5'
    api 'io.seata:seata-all:1.4.0'
}

每个服务都同样须要配置 file.conf 与 registry.conf 文件,放在 resource 目录下。registry.conf 与 server 的保持一致。在 file.conf 文件中,除了 db 配置外,还须要进行 client 参数的配置:

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    ## 这个 undo_log 也须要提前在 mysql 中创立
    logTable = "undo_log"
  }
  log {exceptionRate = 100}
}

在 application.yml 文件中增加 seata 配置:

spring:
  cloud:
      seata: ## 留神 tx-seata 须要与服务端和客户端的配置文件保持一致
        tx-service-group: tx-seata

另外,还须要替换我的项目的数据源,

@Primary
    @Bean("dataSource")
    public DataSource druidDataSource(){DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setUrl(url);
        druidDataSource.setUsername(username);
        druidDataSource.setPassword(password);
        druidDataSource.setDriverClassName(driverClassName);
        return new DataSourceProxy(druidDataSource);
    }

至此,client 端的配置也曾经实现了。

3. 性能演示

一个分布式的全局事务,整体是两阶段提交的模型。

全局事务 是由若干分支事务组成的,

分支事务 要满足两阶段提交的模型要求,即须要每个分支事务都具备本人的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为

依据两阶段行为模式的不同,咱们将分支事务划分为 Automatic (Branch) Transaction ModeTCC (Branch) Transaction Mode.

3.1 AT 模式

AT 模式基于反对本地 ACID 事务的关系型数据库:

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上胜利完结,主动异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,主动生成弥补操作,实现数据回滚

间接在须要增加全局事务的办法中加上注解 @GlobalTransactional


  @SneakyThrows
    @GlobalTransactional
    @Transactional(rollbackFor = Exception.class)
    public void buy(int id, int itemId){
        // 学生成订单
        Order order = orderFeignDao.create(id, itemId);
        // 依据订单扣减账户余额
        accountFeignDao.draw(id, order.amount);
    }

留神:同 @Transactional 一样,@GlobalTransactional 若要失效也要满足:

  • 指标函数必须为 public 类型
  • 同一类内办法调用时,调用指标函数的办法必须通过 springBeanName.method 的模式来调用,不能应用 this 间接调用外部办法

3.2TCC 模式

TCC 模式是反对把自定义的分支事务纳入到全局事务的治理中。

  • 一阶段 prepare 行为:调用自定义的 prepare 逻辑。
  • 二阶段 commit 行为:调用自定义的 commit 逻辑。
  • 二阶段 rollback 行为:调用自定义的 rollback 逻辑。

首先 编写一个 TCC 服务接口:

@LocalTCC
public interface BusinessAction {@TwoPhaseBusinessAction(name = "doBusiness", commitMethod = "commit", rollbackMethod = "rollback")
    boolean doBusiness(BusinessActionContext businessActionContext,
                       @BusinessActionContextParameter(paramName = "message") String msg);

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);
}

其中,BusinessActionContext 为全局事务上下文,能够从此对象中获取全局事务相干信息(如果是发动全局事务方,传入 null 后主动生成),而后实现该接口:


@Slf4j
@Service
public class BusinessActionImpl implements BusinessAction {@Transactional(rollbackFor = Exception.class)
    @Override
    public boolean doBusiness(BusinessActionContext businessActionContext, String msg) {log.info("筹备 do business:{}",msg);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {log.info("business 曾经 commit");
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {log.info("business 曾经 rollback");
        return true;
    }
}

最初,开启全局事务办法同 AT 模式。

@SneakyThrows
    @GlobalTransactional
    public void doBusiness(BusinessActionContext context, String msg){accountFeignDao.draw(3, new BigDecimal(100));
        businessAction.doBusiness(context, msg);
    }

4. 遇到的问题

4.1 client TM/RM 无奈注册到 TC

在部署 seata 我的项目时经常会遇到这样的问题:在本地调试时一切正常,然而当试图部署到线上时,总是在 clinet 端提醒注册 TC 端失败。

  • 这是因为 client 须要先通过服务发现,找到注册中心里 seata-server 的服务信息,而后再与 seata-server 建设连贯。不过线上的 consul 采纳了多数据中心模式,在调用 consul api 时,必须加上 dc 参数项,否则将无奈返回正确的服务信息;然而,seata 提供的 consul 服务发现组件仿佛并不反对 dc 参数的配置。
  • 还有一个起因也会导致 client 无奈连贯到 TC:seata 的 consul 客户端在调用服务状态监控 api 时,应用了 wait 与 index 参数,从而使 consul 查问进入了阻塞查问模式。此时 client 对 consul 中要查问的 key 做监听,只有当 key 发生变化或者达到最大申请工夫时,才会返回后果。貌似因为 consul 版本的问题,这个阻塞查问并没有监听到 key 的变动,反而会让服务发现的线程陷入有限期待之中,天然也就无奈让 client 获取到 server 的注册信息了。

4.2 高可用部署

seata 服务的高可用部署 只反对注册核心模式。因而,咱们须要想方法将 file.conf 文件以键值对的模式存到 consul 中。

遗憾的是,consul 并没有显式反对 namespace,咱们只能在 put 申请中用“/”为分隔符起到相似的成果。当然,seata 框架也没有思考到这一点。所以咱们须要批改源码中的 Configuration 接口与 RegistryProvider 接口的 consul 实现类,减少 namespace 属性

4.3global_log 与 branch_log

TC 在想 mysql 插入日志数据时,偶然会报:

Caused by: java.sql.SQLException: Incorrect string value:

application_data 字段其实就是对业务数据的记录。官网给出的建表语句是这样的:

CREATE TABLE IF NOT EXISTS `global_table`
(`xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

显然,VARCHAR(2000)的大小是不适合的,utf8 的格局也是不适合的。所以咱们须要批改 seata 对于 数据源连贯 的局部代码:

// connectionInitSql 设置
    protected Set<String> getConnectionInitSqls(){Set<String> set = new HashSet<>();
        String connectionInitSqls = CONFIG.getConfig(ConfigurationKeys.STORE_DB_CONNECTION_INIT_SQLS);
        if(StringUtils.isNotEmpty(connectionInitSqls)) {String[] strs = connectionInitSqls.split(",");
            for(String s:strs){set.add(s);
            }
        }
        // 默认反对 utf8mb4
        set.add("set names utf8mb4");
        return set;
    }

5. 自定义开发

5.1 利用 SPI 机制编写自定义组件

seata 基于 java 的 spi 机制提供了自定义实现接口的性能,咱们只须要在本人的服务中,依据 seata 的接口写好本人的实现类即可。

SPI(Service Provider Interface)是 JDK 内置的服务发现机制,用在不同模块间通过接口调用服务,防止对具体服务服务接口具体实现类的耦合。比方 JDBC 的数据库驱动模块,不同数据库连贯驱动接口雷同但实现类不同,在应用 SPI 机制以前调用驱动代码须要间接在类里采纳 Class.forName(具体实现类全名)的形式调用,这样调用方依赖了具体的驱动实现,在替换驱动实现时要批改代码。

ConsulRegistryProvider 为例:

  • ConsulRegistryServiceImpl

    
    // 减少 DC 和 namespace
      private static String NAMESPACE;
      private static String DC;
    
      private ConsulConfiguration() {Config registryCongig = ConfigFactory.parseResources("registry.conf");
          NAMESPACE = registryCongig.getString("config.consul.namespace");
          DC = CommonSeataConfiguration.getDatacenter();
          consulNotifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE,
                  TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                  new NamedThreadFactory("consul-config-executor", THREAD_POOL_NUM));
      }
      ... ...
    // 同时在 getHealthyServices 中,删除申请参数 wait&index    
      /**
       * get healthy services
       *
       * @param service
       * @return
       */
      private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) {return getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder()
                  .setTag(SERVICE_TAG)
                  .setDatacenter(DC)
                  .setPassing(true)
                  .build());
      }
  • ConsulRegistryProvider 留神 order 要大于 seata 包中的默认值 1,seata 类加载器会优先加载 order 更大的实现类

    @LoadLevel(name = "Consul" ,order = 2)
    public class ConsulRegistryProvider implements RegistryProvider {
     @Override
     public RegistryService provide() {return ConsulRegistryServiceImpl.getInstance();
     }
    }
  • 而后在 META-INF 的 services 目录下增加:io.seata.discovery.registry.RegistryProvider

    
    com.youdao.ke.courseop.common.seata.ConsulRegistryProvider
    

    这样就能够替换 seata 包中的实现了。

5.2 common-seata 工具包

对于这些自定义实现类,以及一些公共 client 配置,咱们能够对立封装到一个工具包下:

这样,其余我的项目只须要引入这个工具包,就能够无需繁琐的配置,间接应用了。

gradle 引入 common 包:


api 'com.youdao.ke.courseop.common:common-seata:0.0.+'

6. 落地实例

以一个物流场景为例:
业务架构

  • logistics-server(物流服务)
  • logistics-k3c-server (物流 - 金蝶客户端,封装调用金蝶服务的 api
  • elasticsearch

业务背景:logistics 执行领用单新增,在 elasticsearch 中更新数据,同时通过 rpc 调用 logistics-k3c 的金蝶出库办法,生成金蝶单据,如图 2 所示

问题:如果 elasticsearch 单据更新出现异常,金蝶单据将无奈回滚,造成数据不统一的问题。

在部署完 seata 线上服务后,只须要在 logistics 与 logistics-k3c 中别离引入 common-seata 工具包

logistics 服务

 // 应用全局事务注解开启全局事务
    @GlobalTransactional
    @Transactional(rollbackFor = Exception.class)
    public void Scm 通过(StaffOutStockDoc staffOutStock, String body) throws Exception {
        ... 一些业务解决...
         // 构建金蝶单据申请
        K3cApi.StaffoutstockReq req = new K3cApi.StaffoutstockReq();
        req.materialNums = materialNums;
        req.staffOutStockId = staffOutStock.id;
        ... 一些业务解决 ...
       // 调用 logistics-k3c-api 金蝶出库
        k3cApi.staffoutstockAuditPass(req);

        staffOutStock.status = 待发货;
        staffOutStock.scmAuditTime = new Date();
        staffOutStock.updateTime = new Date();
        staffOutStock.historyPush("scm 通过");
        // 更新对象后存入 elasticsearch
        es.set(staffOutStock);
    }

logistics-k3c

因为咱们新增单据接口是调用金蝶的服务,所以这里应用 TCC 模式构建事务接口

  • 首先创立 StaffoutstockCreateAction 接口

    @LocalTCC
    public interface StaffoutstockCreateAction {@TwoPhaseBusinessAction(name = "staffoutstockCreate")
      boolean create(BusinessActionContext businessActionContext,
                         @BusinessActionContextParameter(paramName = "staffOutStock") StaffOutStock staffOutStock,
                         @BusinessActionContextParameter(paramName = "materialNum") List<Triple<Integer, Integer, Integer>> materialNum);
    
      boolean commit(BusinessActionContext businessActionContext);
    
      boolean rollback(BusinessActionContext businessActionContext);
    
    }
  • 接口实现 StaffoutstockCreateActionImpl

    @Slf4j
    @Service
    public class StaffoutstockCreateActionImpl implements StaffoutstockCreateAction {
    
      @Autowired
      private K3cAction4Staffoutstock k3cAction4Staffoutstock;
    
      @SneakyThrows
      @Transactional(rollbackFor = Exception.class)
      @Override
      public boolean create(BusinessActionContext businessActionContext, StaffOutStock staffOutStock, List<Triple<Integer, Integer, Integer>> materialNum) {
          // 金蝶单据新增
          k3cAction4Staffoutstock.staffoutstockAuditPass(staffOutStock, materialNum);
          return true;
      }
    
      @SneakyThrows
      @Transactional(rollbackFor = Exception.class)
      @Override
      public boolean commit(BusinessActionContext businessActionContext) {Map<String, Object> context = businessActionContext.getActionContext();
          JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock");
          // 如果尝试新增胜利,commit 不做任何事
          StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class);
          log.info("staffoutstock {} commit successfully!", staffOutStock.id);
          return true;
      }
    
      @SneakyThrows
      @Transactional(rollbackFor = Exception.class)
      @Override
      public boolean rollback(BusinessActionContext businessActionContext) {Map<String, Object> context = businessActionContext.getActionContext();
          JSONObject staffOutStockJson = (JSONObject) context.get("staffOutStock");
          StaffOutStock staffOutStock = staffOutStockJson.toJavaObject(StaffOutStock.class);
          // 这里调用金蝶单据删除接口进行回滚
          k3cAction4Staffoutstock.staffoutstockRollback(staffOutStock);
          log.info("staffoutstock {} rollback successfully!", staffOutStock.id);
          return true;
      }
    }
    
  • 封装为业务办法

    /**
       * 项目组领用 & 报废的审核通过:新增其余出库单
       * 该办法应用 seata-TCC 计划实现全局事务
       * @param staffOutStock
       * @param materialNum
       */
      
      @Transactional
      public void staffoutstockAuditPassWithTranscation(StaffOutStock staffOutStock,
                                                        List<Triple<Integer, Integer, Integer>> materialNum){staffoutstockCreateAction.create(null, staffOutStock, materialNum);
      }
  • k3c API 实现类

    
     @SneakyThrows
      @Override
      public void staffoutstockAuditPass(StaffoutstockReq req) {
          ... 一些业务解决办法 ...
          // 这里调用了封装好的事务办法
          k3cAction4Staffoutstock.staffoutstockAuditPassWithTranscation(staffOutStock, triples);
      }

这样,一个 基于 TCC 的全局事务链路 就建设起来了。

当全局事务 执行胜利 时,咱们能够在 server 中看到打印的日志(如图 3):

如果全局事务 执行失败,会进行回滚,此时会执行接口中的 rollback,调用金蝶接口删除生成的单据,如图 4。

7. 总结

本文以 seata 框架的部署与应用为 主线 ,记录了seata 框架 使用的一些 关键步骤与技术细节,并针对我的项目落地时遇到的一些的技术问题提供了解决方案。

在后续的推文中,咱们还将持续以 seata 框架的源码解析为主线,向大家介绍 seata 实现分布式事务的外围原理与技术细节。
-END-

退出移动版