共计 15692 个字符,预计需要花费 40 分钟才能阅读完成。
原因
Spring Cloud 作为微服务框架,工作当中必用。起因也很简略,Spring cloud 依靠 Springboot,背靠 Spring Framework,又有 Eureka 和 Alibaba 两个大厂的反对,大多数的互联网公司会优先选择以此作为后端开发框架。微服务架构下每个服务都会有一个数据库,不同的数据库、不同的服务实例都会产生事务问题。
简述
微服务架构下的性能开发会遇到分布式事务问题,有 2PC 3PC TCC 等解决方案,基于性能思考,公司会思考应用 Seata 作为分布式事务解决方案,其中提供了基于 2PC 的 AT 模式、TCC 模式、Saga 模式等等,不过最罕用的还是 AT 模式,只须要一个 GlobalTransaction 注解即可,无业务侵入性。
引入 Seata
-
pom 引入
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>
-
服务配置
seata.enabled=false seata.application-id=${spring.application.name} seata.tx-service-group=default_tx_group seata.registry.type=nacos seata.registry.nacos.application=seata-server seata.registry.nacos.server-addr=localhost:8848 seata.registry.nacos.namespace=41eddc87-5fc2-4fdf-8d4f-e16118a143e0 seata.registry.nacos.group=SEATA_GROUP seata.config.type=nacos seata.config.nacos.server-addr=localhost:8848 seata.config.nacos.group=SEATA_GROUP seata.config.nacos.namespace=41eddc87-5fc2-4fdf-8d4f-e16118a143e0 seata.config.nacos.data‐id=seataServer.properties
-
nacos 配置
配置项 内容 Data ID service.vgroupMapping.default_tx_group Group SEATA_GROUP 配置内容 default / / Data ID seataServer.properties Group SEATA_GROUP 配置内容 default seataServer.properties 配置参考,不同手敲,文档中有脚本的执行形式,
sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca
- 性能上开启 @GlobalTransaction 注解即可
SpringBoot 集成 Seata
剖析 Seata 的集成形式,还是要从 spring-cloud-starter-alibaba-seata 开始,此 maven 依赖引入:
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
<scope>compile</scope>
</dependency>
Seata 的集成逻辑在 Seata-spring-boot-starter 当中。以下是依赖源文件中的 spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration
SeataAutoConfiguration
- 默认有 maven 依赖就会加载,能够通过配置敞开 Seata: seata.enabled=false.
- 引入默认实现的 FailureHandler。
- 引入 GlobalTransactionScanner。
GlobalTransactionScanner 实现了 InitializingBean ApplicationContextAware DisposableBean,在对应的 Bean 周期实现对 Seata 的操作,集成 AbstractAutoProxyCreator 实现 Aop 的逻辑,使得办法上标注 @GlobalTransaction 注解就能开启分布式事务性能。
- 实例化后执行客户端初始化 initClient,创立 TM 客户端 RM 客户端
- Bean 销毁时执行 Seata 扩大的销毁机制,能够触发 TMClient RMClient 中的敞开办法调用。
- 重写的 wrapIfNecessary() 办法中实现代理类的创立。能够解决 GlobalTransaction 注解,也是剖析的入口。
初始化 TM RM 客户端
初始化 TM 客户端,首先会创立一个 TmNettyRemotingClient 对象,创建对象须要一个线程池,能够通过参数配置线程池。初始化的逻辑比拟有意思,外面封装了 Netty 作为客户端连贯工具。
创立过程:
-
创立父类 AbstractNettyRemotingClient
- 创立父类 AbstractNettyRemoting,将客户端的 messageExecutor 与 Bean 销毁关联。
- AbstractNettyRemotingClient 构造方法中创立一些要害对象
- 创立 NettyClientBootstrap 对象,创立 Netty 客户端的细节就在其中。
- 创立 ChannelDuplexHandler 实现类 io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler,把 Netty 的 channel 办法比方 channelRead(),和 Seata 注册对应的处理器关联起来,有对应的音讯后,就会调用对应的处理器,channelInactive() exceptionCaught() userEventTriggered() 保护着 Seata 中的 channel 的存活和心跳,保护连接池中的有效性。
- 创立 NettyClientChannelManager 对象,应用 apache common pools 池化 netty Channel,防止每次与 Seata 服务交互都要建设连贯。连贯的建设和删除逻辑就在其中。
- seata 能够通过 SPI 扩大性能,比方 AuthSigner。
- 读取是否能够批量申请的配置。并设置监听器监听此配置
- 单例对象
初始化过程:
- 注册 TC 的响应处理器 ClientOnResponseProcessor
- 注册心跳音讯处理器 ClientHeartbeatProcessor
- 执行父类 AbstractNettyRemotingClient 初始化,定时刷新注册核心指定分组的 Seata 服务实例,将实例地址拿到后,建设连贯并放入连接池 NettyClientChannelManager。若容许批量申请,则创立合并申请的线程池,合并申请的解决逻辑在 MergedSendRunnable 中。
- 执行父类 AbstractNettyRemoting 初始化,单线程定时执行清理哪些超时的异步申请
- 启动 Netty。
初始化 RM 客户端,创立和初始化过程与 TM 基本一致。
初始化过程:
- 注册分支事务提交处理器 RmBranchCommitProcessor
- 注册分支事务回滚处理器 RmBranchRollbackProcessor
- 注册 undo log 处理器 RmUndoLogProcessor
- 注册 TC 的响应处理器 ClientOnResponseProcessor
- 注册心跳音讯处理器 ClientHeartbeatProcessor
- …
GlobalTransaction 注解
Bean 初始化实现后调用初始化后的 postProcessors,applyBeanPostProcessorsAfterInitialization(),Seata 注册了一个 GlobalTransactionScanner,继承了 AbstractAutoProxyCreator,具备了创立代理的能力,每个 Bean 初始化后都有机会进入 GlobalTransactionScanner.wrapIfNecessary()。
进入 wrapIfNecessary()
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// 创立 TccActionInterceptor
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// 非 GlobalTransactional GlobalLock 标注的类或者办法 跳过解决
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {return bean;}
if (interceptor == null) {if (globalTransactionalInterceptor == null) {
// 创立 GlobalTransactionalInterceptor
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
// 调用父类的 wrapIfNecessary,创立代理类
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {throw new RuntimeException(exx);
}
}
// AbstractAutoProxyCreator
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// ...
// Create proxy if we have advice.
// 调用子类的 getAdvicesAndAdvisorsForBean 返回 GlobalTransactionalInterceptor 对象。// 将 seata 性能织入 Bean 代理类中
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {this.advisedBeans.put(cacheKey, Boolean.TRUE);
Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
标注 GlobalTransactional GlobalLock 注解的类或者办法的 Bean 曾经创立了代理,那么在 Bean 办法调用的时候就会进入 GlobalTransactionalInterceptor 的 invoke() 办法中执行 Seata 的逻辑。
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
// 获取指标类
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
// 获取指标办法,比方:Foo.bar()
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {// 获取原生办法 bar()
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 解析出注解
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
// disable 代表是否禁用 service.disableGlobalTransaction,默认是启用状态
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
// 构建 AspectTransactional 对象,将注解中的参数解析到 对象中
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
} else {transactional = this.aspectTransactional;}
// 进入 GlobalTransaction 解决流程
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
// 进入 GlobalLock 解决流程
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();}
以上办法是代理类的加强逻辑入口,次要是解析出以后办法的注解,进入 GlobalTransaction 的解决流程。
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable {
boolean succeed = true;
try {
// 全局事务的执行是在 transactionalTemplate 对象中进行
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {return methodInvocation.proceed();
}
public String name() {String name = aspectTransactional.getName();
if (!StringUtils.isNullOrEmpty(name)) {return name;}
return formatMethod(methodInvocation.getMethod());
}
// 把注解中的参数解析为 TransactionInfo 对象
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout = defaultGlobalTransactionTimeout;}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(aspectTransactional.getPropagation());
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getRollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {TransactionalExecutor.Code code = e.getCode();
// 异样解决机制
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
以上会把 GlobalTransaction 注解中的参数转换为一个事务,另创立一个事务执行器,在事务模板中执行事务。
/**
* Execute object.
*
* @param business the business
* @return the object
* @throws TransactionalExecutor.ExecutionException the execution exception
*/
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo 获取事务
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. 获取以后全局事务,若有空则是事务参与者
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation. 依照流传行文处理事务
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {switch (propagation) {
case NOT_SUPPORTED:// 不反对事务,将以后事务挂起,以无事务的形式执行
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:// 须要新事务,挂起以后事务,开始一个新事务
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();}
// Continue and execute with new transaction
break;
case SUPPORTS:// 反对事务或者无事务运行,没有事务则间接执行办法
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:// 须要事务,以后事务有则是五以后事务,若没有创立一个事务
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:// 不反对事务
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();}
case MANDATORY: // 必须要一个事务
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
// 发动全局事务注册
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business 执行本地办法,作为事务发起者,本地办法中产生的任何近程调用都将携带 XID
rs = business.execute();} catch (Throwable ex) {
// 发动回滚申请,TC 收到后,会一一告诉子事务回滚
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit. 提交事务
commitTransaction(tx);
return rs;
} finally {
//5. clear 清理阶段
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();}
} finally {
// If the transaction is suspended, resume it. 复原挂起的事务
if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);
}
}
}
GlobalTransactionRole 全局事务角色分为发起者和参与者,以上下文是否有 GlobalTransaction 对象来判断。
本地办法执行后,近程调用也全副胜利,发起者就向 Seata 发动提交事务的申请。
全局事务 ID 的传递
Seata 反对的近程调用:SpringMvc Dubbbo sofa motan hsf grpc dubbo brpc.
SpringMvc 中自定义一个 HandlerInterceptor,在调用 preHandle 是绑定 seata 的 rpcId。向 Spring 注入一个 WebMvcConfigurer 实现类将自定义的 HandlerInterceptor 放入 MVC 中。
Dubbo 会自定义一个 Filter 扩大,绑定 seata 的 rpcId。
其余的近程调用反对,能够自行查看,逻辑比较简单,调用前绑定 rpcId,产生异样须要革除 rpcId
以上就是全局事务管理器的大抵内容。具体事务的管制在 RM 端,TM 客户端的的操作大多数在下面应用到。
数据源代理
SeataDataSourceAutoConfiguration 会引入 SeataAutoDataSourceProxyCreator,此对象将会创立一个数据库代理,代理类中加强了 RM 的逻辑,事务提交前要解决 RM 的逻辑。
SeataAutoDataSourceProxyCreator 也是一个 AbstractAutoProxyCreator 子类,会被 Spring 作为 AOP 解决,重写的 wrapIfNecessary() 中,将数据库对象进行加强,创立代理类 DataSourceProxy , 具体的创立代理逻辑在 super.wrapIfNecessary() 中。
DataSourceProxy.getConnection() 会创立一个 ConnectionProxy,组合了 targetDataSource.getConnection(),不会影响原有数据库连贯的应用。
ConnectionProxy 代理的 commit 办法中,就会 GlobalTransaction 进行了逻辑解决:注册本地事务,创立 undo log 执行本地提交。TC 向 RM 客户端收回提交或者回滚申请后,解决 undolog 即可。
@Override
public void commit() throws SQLException {
try {lockRetryPolicy.execute(() -> {
// 执行提交
doCommit();
return null;
});
} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {rollback();
}
throw e;
} catch (Exception e) {throw new SQLException(e);
}
}
// 依据上下文判断执行哪种提交
private void doCommit() throws SQLException {if (context.inGlobalTransaction()) {processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();
} else {targetConnection.commit();
}
}
// 全局事务提交
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注册分支事务
register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {// undo log 出场 UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
// 真正的业务提交
targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {report(true);
}
context.reset();}
总结
Seata 作为一款分布式的框架,提供了分布式事务的解决方案,其中的实现原理值得剖析,以上是对 Seata 客户端的局部进行理解说,Seata 服务端的内容同样精彩,敬请期待。