关于spring:动态数据源的那些事

5次阅读

共计 4757 个字符,预计需要花费 12 分钟才能阅读完成。

      没事刷博客,看到动静数据源,发现网上的代码啦,配置啦,千篇一律,你抄我来我抄你,没啥意义,网上开源的轮子也有 dynamic-datasource-spring-boot-starter,不过我看了下他那边的源码,不反对 spring 事务,这个我集体还是感觉比拟不喜爱的,而且在他本人实现的事务性能外面,貌似对事务嵌套也没做什么解决,它 DSTransactional 注解标记的办法最初间接就提交事务了。
      整个我的项目将应用 springboot + mybatis + mysql,简略点配置也少点,代码地址也在最初放进去了。
      PS:前情提醒,提前温习下 ThreadLocal 以及 aop 的用法,以及 dynamic-datasource-spring-boot-starter 为什么要应用栈的数据结构来存储事务内的所有连贯,mybatis 的一级缓存带来的问题。
      spring 其实曾经提供了一个很好的数据源,帮咱们实现运行时动静切换数据源,它就是 AbstractRoutingDataSource

简略来说,AbstractRoutingDataSource 外部保护了一个默认数据源以及一个 map(value 为 DataSource),它的 getConnection 办法,在运行时只有咱们实现 determineCurrentLookupKey 办法,返回一个 key,它就会去 map 中依据 key 找到对应的 DataSource(找不到则应用默认数据源),从而实现动静切换的性能。
      如果所有获取 connection 的形式,都是间接从 DataSource 来的话,此时咱们曾经完满实现了动静数据源的性能,而且并没有和 orm 框架强绑定,你以前是啥配置,当初仍旧啥配置,只是须要你注入的数据源是 AbstractRoutingDataSource 即可。不过,这是不是意味着咱们一劳永逸了?
      不论你是否本人钻研过 spring 事务源码,最起码咱们都晓得一个事实:一个事务内会复用 connection。究其原因,spring 事务 aop 在开启事务时,会间接获取 connection, 组装成 ConnectionHolder,而后通过 TransactionSynchronizationManager.bindResource(DataSource,ConnectionHolder),在其余须要获取 connection 的中央,则是优先通过 TransactionSynchronizationManager.getResource(dataSource)去获取 connection,这就导致了咱们的 AbstractRoutingDataSource,在一个事务内只会触发一次 getConnection,从而导致数据源切换失败!
      这时候有意思的来了,如果存在事务的状况下,即便会复用 connection,然而咱们让这个 connection 也具备 动静能力,是不是就解决了事务上面切换数据源的问题?
       间接上外围改写的类

package com.routing.datasource;

import com.routing.constant.DataSourceKeyConstant;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;
import java.lang.reflect.Constructor;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;

/**
 * 动静数据源
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    private Map<Object,Object> targetDataSource ;


    public DynamicDataSource(DataSource defaultDataSource, Map<Object, Object> targetDataSource){setDefaultTargetDataSource(defaultDataSource);
        setTargetDataSources(targetDataSource);
        this.targetDataSource = targetDataSource;
    }

    public Map<Object, Object> getTargetDataSource() {return targetDataSource;}

    /**
     * 获取实在的物理数据源信息
     * @return
     */
    public DataSource getPhysicsDataSource(){return determineTargetDataSource();
    }

    @Override
    public Connection getConnection() throws SQLException {Connection connection = super.getConnection();
        return getConnectionProxy(connection);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {Connection connection = super.getConnection(username, password);
        return getConnectionProxy(connection);
    }

    @Override
    protected Object determineCurrentLookupKey() {return DataSourceHolder.getDbKey();
    }

    /**
     * 生成代理对象,connection 存在事务的状况下,复用 connection
     * @return
     */
    public Connection getConnectionProxy(Connection target){boolean existingTransaction = DataSourceHolder.isExistingTransaction();
        if(existingTransaction){Object dbKey = DataSourceHolder.getDbKey();
            if(dbKey == null){dbKey = DataSourceKeyConstant.PRIMARY;}
            ConnectionContext.initStack(dbKey,target);
        }
        // 生成代理类
        Class<?>[] interfaces = {Connection.class};
        Connection connection = (Connection) Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),interfaces,new DynamicConnectionProxy(target));
        return connection;
    }


}

在获取到的 connection 上,咱们应用 jdk 代理的形式,如果存在事务,代理类 DynamicConnectionProxy 会动静实在物理库连贯,而后通过反射去执行实在物理库连贯的办法(一个事务内一个库的连贯重复使用)
而后看看 jdk 代理类的相干改写局部

package com.routing.datasource;

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.Connection;

/**
 * 动静数据源
 */
@Slf4j
public class DynamicConnectionProxy implements InvocationHandler {

    // 默认连贯
    private Connection target;

    // 是否已提交
    private boolean commit;

    // 是否已回滚
    private boolean rollback;

    public DynamicConnectionProxy(Connection target){this.target = target;}

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();
        if("toString".equals(methodName)){return this.toString();
        }
        // 重写提交和回滚办法
        if("commit".equals(methodName)){log.info("connection 动静代理 commit 办法....");
            ConnectionContext.commitAllConnectionInTransaction();
            commit = true;
            return null;
        }
        if("rollback".equals(methodName)){log.info("connection 动静代理 rollback 办法....");
            ConnectionContext.rollbackAllConnectionInTransaction();
            rollback = true;
            return null;
        }
        if("close".equals(methodName)){
            // 以后连贯已被提交或者回滚
            if(commit || rollback){ConnectionContext.removeAllConnectionInTransaction(true);
                DataSourceHolder.cleanDbKey();
                return null;
            }
        }
        // 判断是否存在事务,如果存在事务,则提供动静切换 connection 的性能
        if(DataSourceHolder.isExistingTransaction()){Connection connection = ConnectionContext.getPhysicsConnection(target);
            return method.invoke(connection,args);
        }
        return method.invoke(target,args);
    }
}

代理类中保护了一个 target,次要是解决没有事物的状况,这个 target 本就是实在物理库的连贯,此时不须要 connection 有动静能力,全副由 target 间接执行即可;当存在事务的状况下,才须要咱们提供动静能力去解决这个问题。
这两个类曾经帮咱们解决了绝大部分动静数据源的问题,其余就是些 aop 以及 ThreadLocal 的利用了,就不丢进去了。
整个代码在 https://codeup.aliyun.com/620…,有须要的人本人去拉吧(QQ 1767028198)

正文完
 0