关于java:千亿级数据平滑扩容之应用服务动态扩容实现

9次阅读

共计 10346 个字符,预计需要花费 26 分钟才能阅读完成。

1. ShardingJDBC 的集成配置

  1. POM 依赖配置

    <dependencies>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    
        <!-- spring boot 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <!-- sharding-jdbc 依赖 -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>${sharding.jdbc.version}</version>
        </dependency>
    
        <!-- sharding-jdbc 服务编排依赖 -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-orchestration</artifactId>
            <version>${sharding.jdbc.version}</version>
        </dependency>
    
        <!-- mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    
        <!-- druid 数据库连接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
    
        <!-- Spring data jpa 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
    </dependencies>
  2. 数据源配置

    server:
      port: 10692
    spring:
      application:
        name: dynamic-database
      # 第一个数据源配置,采纳 Druid
      datasource:
        tradesystem:
          type: com.alibaba.druid.pool.DruidDataSource
          driver-class-name: com.mysql.cj.jdbc.Driver
          username: root
          password: 654321
          url: jdbc:mysql://10.10.20.130:3306/smooth?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
          druid:
            # 连接池的配置信息
            # 初始化大小,最小,最大
            initial-size: 5
            min-idle: 5
            maxActive: 20
            # 配置获取连贯期待超时的工夫
            maxWait: 60000
            # 配置距离多久才进行一次检测,检测须要敞开的闲暇连贯,单位是毫秒
            timeBetweenEvictionRunsMillis: 60000
            # 配置一个连贯在池中最小生存的工夫,单位是毫秒
            minEvictableIdleTimeMillis: 300000
            validationQuery: SELECT 1
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
            # 关上 PSCache,并且指定每个连贯上 PSCache 的大小
            poolPreparedStatements: true
            maxPoolPreparedStatementPerConnectionSize: 20
            # 配置监控统计拦挡的 filters,去掉后监控界面 sql 无奈统计,'wall' 用于防火墙
            filters: stat,wall,log4j
            # 通过 connectProperties 属性来关上 mergeSql 性能;慢 SQL 记录
            #connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
  3. ShardingJDBC 代码配置

    分库配置规定:

    /**
     * 分库配置规定
     */
    public class ShardingDataSourceRule implements PreciseShardingAlgorithm<Long> {
    
        /**
         * 分片规定,取模运算
         */
        public static int MOD = 1;
    
        /**
         * 依据账户 ID 做分库解决
         * @param names
         * @param value
         * @return
         */
        @Override
        public String doSharding(Collection<String> names, PreciseShardingValue<Long> preciseShardingValue) {Long accountNo = preciseShardingValue.getValue();
            String dataSource = DatasourceEnum.DATASOURCE_PREFIX.getValue() + accountNo % MOD;
            return dataSource;
        }
    }

    这里假如依据账户 ID 来做分库解决,依据账户 ID 取模计算分库信息。
    分表配置规定:

      /**
         * 表分片规定
         */
      public class ShardingTableRule implements PreciseShardingAlgorithm<Long> {
              @Override
              public String doSharding(Collection<String> collection, PreciseShardingValue<Long> preciseShardingValue) {
                      // 不做分表处理,间接返回表名
                      return preciseShardingValue.getLogicTableName();}
      }

    如有须要,能够在这里设置分表配置规定,因为是做数据库的平滑扩容,只有实现分库即可,这里就不做分表的配置,采纳默认表名即可。

    分片规定的集成配置:

    /**
     * 分片规定的集成配置
     */
    private TableRuleConfiguration orderRuleConfig(){// 订单表,多个分片示例:"DB_${1..3}.t_order_${1..3}"  ds_0.t_trade_order
        DynamicShardingService.SHARDING_RULE_DATASOURCE =  DatasourceEnum.DATASOURCE_1.getValue();
        String actualDataNodes =  DatasourceEnum.DATASOURCE_1.getValue() + "." + DatasourceEnum.TABLE_ORDER.getValue() ;
        TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration(DatasourceEnum.TABLE_ORDER.getValue(), actualDataNodes);
        // 设置分表策略
        tableRuleConfig.setDatabaseShardingStrategyConfig(new StandardShardingStrategyConfiguration("accountNo", new ShardingDataSourceRule()));
        tableRuleConfig.setTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("accountNo",new ShardingTableRule()));
        // 记录订单表的分片规定,便于后续编排治理
        DynamicShardingService.SHARDING_RULE_TABLE_ORDER = actualDataNodes;
        return tableRuleConfig;
    }
    
    /**
     * 数据源 Sharding JDBC 配置
     * @return
     */
    @Bean(name = "tradeSystemDataSource")
    @Primary
    @DependsOn("tradeDruidDataSource")
    public DataSource tradeSystemDataSource(@Autowired DruidDataSource tradeDruidDataSource) throws Exception{ShardingRuleConfiguration shardJdbcConfig = new ShardingRuleConfiguration();
        shardJdbcConfig.getTableRuleConfigs().add(orderRuleConfig());
        ...
    }

    在 orderRuleConfig 办法外面配置分片规定,在 tradeSystemDataSource 办法外面退出分片规定配置。

2. 服务编排性能(自定义注册核心)

2.0.0.M1 版本开始,Sharding-JDBC 提供了数据库治理编排性能,次要包含:

  • 配置集中化与动态化,可反对数据源、表与分片及读写拆散策略的动静切换
  • 数据治理。提供熔断数据库拜访程序对数据库的拜访和禁用从库的拜访的能力
  • 反对 Zookeeper 和 Etcd 的注册核心

这里要实现动静数据源的切换,须要退出编排性能。

本地注册核心的实现类,LocalRegistryCenter 要害代码:

public class LocalRegistryCenter implements RegistryCenter {
    /**
     * 注册事件监听缓存记录
     */
    public static Map<String, DataChangedEventListener> listeners = new ConcurrentHashMap<>();
    
    private RegistryCenterConfiguration config;
    
    private Properties properties;
    /**
     * 记录 Sharding 节点配置信息
     */
    public static Map<String, String> values = new ConcurrentHashMap<>();
    ...
        
    @Override
    public void watch(String key, DataChangedEventListener dataChangedEventListener) {if (null != dataChangedEventListener) {
            // 将 Sharding 事件监听器缓存下来
            listeners.put(key, dataChangedEventListener);
        }
    }
    ...
    @Override
    public String getType() {
        // 标识本地注册核心的注入名称
        return "localRegisterCenter";
    }
    ...
    
}
        

通过 SPI 机制,主动注入,创立配置文件:

org.apache.shardingsphere.orchestration.reg.api.RegistryCenter 内容指向方才创立的配置类:

com.itcast.database.smooth.config.LocalRegistryCenter

最初在数据源配置外面退出配置类:

public DataSource tradeSystemDataSource(@Autowired DruidDataSource tradeDruidDataSource) throws Exception{ShardingRuleConfiguration shardJdbcConfig = new ShardingRuleConfiguration();
    shardJdbcConfig.getTableRuleConfigs().add(orderRuleConfig());
    shardJdbcConfig.setDefaultDataSourceName(DatasourceEnum.DATASOURCE_1.getValue());

    Properties props = new Properties();
    // 打印 sql 语句,生产环境敞开缩小日志量
    props.setProperty("sql.show",Boolean.TRUE.toString());

    Map<String,DataSource> dataSourceMap = new LinkedHashMap<>() ;
    dataSourceMap.put(DatasourceEnum.DATASOURCE_1.getValue(),tradeDruidDataSource) ;
    // 服务编排配置,退出本地注册核心配置类
    OrchestrationConfiguration orchestrationConfig = new OrchestrationConfiguration(DYNAMIC_SHARDING, new RegistryCenterConfiguration("localRegisterCenter"),
            false);
    return OrchestrationShardingDataSourceFactory.createDataSource(dataSourceMap, shardJdbcConfig, props,
            orchestrationConfig);

}

3. 动静切换实现(预约义形式)

  1. 在配置文件减少第二个数据源:

    ...
        # 减少第二个数据源配置
        tradesystem2:
          type: com.alibaba.druid.pool.DruidDataSource
          driver-class-name: com.mysql.cj.jdbc.Driver
          username: root
          password: 654321
          url: jdbc:mysql://10.10.20.126:3306/smooth?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
          druid:
            # 连接池的配置信息
            # 初始化大小,最小,最大
            initial-size: 5
            min-idle: 5
            maxActive: 20
            # 配置获取连贯期待超时的工夫
            maxWait: 60000
            # 配置距离多久才进行一次检测,检测须要敞开的闲暇连贯,单位是毫秒
            timeBetweenEvictionRunsMillis: 60000
            # 配置一个连贯在池中最小生存的工夫,单位是毫秒
            minEvictableIdleTimeMillis: 300000
            validationQuery: SELECT 1
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
            # 关上 PSCache,并且指定每个连贯上 PSCache 的大小
            poolPreparedStatements: true
            maxPoolPreparedStatementPerConnectionSize: 20
            # 配置监控统计拦挡的 filters,去掉后监控界面 sql 无奈统计,'wall' 用于防火墙
            filters: stat,wall,log4j
            # 通过 connectProperties 属性来关上 mergeSql 性能;慢 SQL 记录
            #connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
  2. 代码配置:

    减少第二个数据源配置的配置,退出 MAP 中:

  3. sharding 分片规定配置:

    这里会通过接口来调用,实现 Sharding 数据源的动静切换:

    /**
      * 替换 sharding 里的分片规定
      */
    public void replaceActualDataNodes(String newRule){
        // 获取已有的配置
        String rules = LocalRegistryCenter.values
                .get("/" + DruidSystemDataSourceConfiguration.DYNAMIC_SHARDING + "/config/schema/logic_db/rule");
        // 批改为新的分片规定
        String rule = rules.replace(SHARDING_RULE_TABLE_ORDER, newRule);
        LocalRegistryCenter.listeners.get("/" + DruidSystemDataSourceConfiguration.DYNAMIC_SHARDING + "/config/schema")
                .onChange(new DataChangedEvent(
                        "/" + DruidSystemDataSourceConfiguration.DYNAMIC_SHARDING + "/config/schema/logic_db/rule",
                        rule, DataChangedEvent.ChangedType.UPDATED));
        LocalRegistryCenter.values.put("/" + DruidSystemDataSourceConfiguration.DYNAMIC_SHARDING + "/config/schema/logic_db/rule",rule);
        SHARDING_RULE_TABLE_ORDER = newRule;
    
    }

    依据传递的取模参数进行调用批改,如果 mod 为 2 代表要分两个库:

  4. 创立两个数据库及对应表构造
  5. 启动服务测试验证

    拜访接口地址,服务启动默认只有一个数据源失效,所有数据都会落在一台数据库节点。

    动静调整让第二个数据源失效,扩容为 2 个数据源:

    从后盾日志能够看到 Sharding 分片规定已失效:

    这样数据,就会依据取模规定,落至不同的数据源节点。

4. 动静切换实现(动静增加形式)

在理论利用当中,可能并不能事后晓得所要扩容的机器节点信息,这时候就须要实现动静增加的形式。

  1. 删除原来的预约义数据源配置,只加载一个数据源即可。
  2. 批改动静分片的实现:
    DynamicShardingService:

    public void dynamicSharding(int mod) {
            ShardingDataSourceRule.MOD = mod;
            String newRule = DatasourceEnum.DATASOURCE_PREFIX.getValue() + "${0.." + (mod - 1) + "}";
            if(mod == 1) {...}else {
                // 动静数据源配置实现扩容
                Properties properties = loadPropertiesFile("dynamic_datasource.properties");
                try {log.info("load datasource config url:" + properties.get("url"));
                    DruidDataSource druidDataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
                    druidDataSource.setRemoveAbandoned(true);
                    druidDataSource.setRemoveAbandonedTimeout(600);
                    druidDataSource.setLogAbandoned(true);
                    // 设置数据源谬误重连工夫
                    druidDataSource.setTimeBetweenConnectErrorMillis(60000);
                    druidDataSource.init();
                    OrchestrationShardingDataSource dataSource = SpringContextUtil.getBean("tradeSystemDataSource", OrchestrationShardingDataSource.class);
                    Map<String, DataSource> dataSourceMap = dataSource.getDataSource().getDataSourceMap();
                    dataSourceMap.put(DatasourceEnum.DATASOURCE_2.getValue(), druidDataSource);
    
                    Map<String, DataSourceConfiguration> dataSourceConfigMap = new HashMap<String, DataSourceConfiguration>();
                    for(String key : dataSourceMap.keySet()) {dataSourceConfigMap.put(key, DataSourceConfiguration.getDataSourceConfiguration(dataSourceMap.get(key)));
                    }
                    String result = SHARDING_RULE_TABLE_ORDER.replace(SHARDING_RULE_DATASOURCE, newRule);
                    replaceActualDataNodes(result);
                    SHARDING_RULE_DATASOURCE = newRule;
                    // 从新数据源配置
                    dataSource.renew(new DataSourceChangedEvent(
                            "/" + DruidSystemDataSourceConfiguration.DYNAMIC_SHARDING + "/config/schema/logic_db/datasource",
                            dataSourceConfigMap));
                    return;  
                } catch (Exception e) {log.error(e.getMessage(), e);
                }   
            }          
            String result = SHARDING_RULE_TABLE_ORDER.replace(SHARDING_RULE_DATASOURCE, newRule);
            replaceActualDataNodes(result);
            SHARDING_RULE_DATASOURCE = newRule;  
        }

    如果取模分片大于 1,走扩容解决逻辑,在这里能够将扩容数据源信息写至配置文件内(也能够从配置核心读取),而后动态创建数据源,重写 Sharding 的编排配置 OrchestrationShardingDataSource。

    扩容的数据源配置文件放至资源目录下:

    dynamic_datasource.properties

    driverClassName=com.mysql.cj.jdbc.Driver
    username=root
    password=654321
    url=jdbc:mysql://10.10.20.131:3306/smooth?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC
    initialSize=5
    minIdle=5
    maxActive=20
    maxWait=60000
    timeBetweenEvictionRunsMillis=60000
    minEvictableIdleTimeMillis=300000
    validationQuery=SELECT 1
    testWhileIdle=true
    testOnBorrow=false
    testOnReturn=false
  3. 测试验证
    参照下面的形式进行测试验证,这样就能够在不须要重启服务的状况下,任意增加数据源节点。

5. ShardingJDBC 应用注意事项

Sharding JDBC, Mycat, Drds 等产品都是分布式数据库中间件, 相比间接的数据源操作, 会存在一些限度, Sharding JDBC 在应用时, 须要留神以下问题,防止采坑:

  • 无限反对子查问
  • 不反对 HAVING
  • 不反对 OR,UNION 和 UNION ALL
  • 不反对非凡 INSERT
  • 每条 INSERT 语句只能插入一条数据,不反对 VALUES 后有多行数据的语句
  • 不反对 DISTINCT 聚合
  • 不反对 dual 虚构表查问
  • 不反对 SELECT LAST_INSERT_ID(), 不反对自增序列
  • 不反对 CASE WHEN
    • *
      本文由 mirson 创作分享,如需进一步交换,请加 QQ 群:19310171 或拜访 www.softart.cn

正文完
 0