关于springboot:分布式任务调度系统chaconne介绍

39次阅读

共计 11697 个字符,预计需要花费 30 分钟才能阅读完成。

chaconne(音译夏空),是一款用 Java 写的,基于 SpringBoot 框架的轻量级的分布式任务调度框架。引入 chaconne 相干组件,能够十分快捷地帮忙你搭建一个分布式工作集群

chaconne 个性列表

  1. 完满反对 spring-boot 框架(2.2.0+)
  2. 反对多种形式设置的定时工作(Cron 表达式,参数设置等)
  3. 反对动静保留和删除工作
  4. 反对注解配置定时工作
  5. 反对两种集群模式(主备模式和负载平衡模式)
  6. 内置多种负载平衡算法,反对自定义负载平衡算法
  7. 反对失败重试和失败转移
  8. 反对日志追踪
  9. 反对工作参数分片解决
  10. 反对工作依赖(串行依赖和并行依赖)
  1. 反对 DAG 模仿工作流
  2. 反对工作自定义终止策略
  3. 反对工作超时冷却和重置
  4. 反对邮件告警

chaconne 两种集群部署模式:

  1. 去中心化部署模式
    没有固定的调度核心节点,集群会选举其中一个利用作为 Leader, 进行工作指挥调度
  2. 中心化部署模式
    分为调度核心和工作执行节点两个角色,且调度核心和工作执行节点都反对集群模式
    阐明:
    这里的集群是指参加工作执行的利用所组成的集群 (chaconne 集群),它和基于 SpringCloud 框架组成的集群是两个独立的概念
    如果 chaconne 集群规模较小,举荐应用去中心化部署模式,若该集群规模较大,根据理论状况,两者模式都能够应用。

chaconne 框架由两局部组成:

  1. chaconne-spring-boot-starter
    外围 jar 包,蕴含了 chaconne 全副外围性能
  2. chaconne-console
    Chaconne Web 治理界面,进行工作治理和查看工作运行状态

Maven:

<dependency>
      <groupId>com.github.paganini2008.atlantis</groupId>
     <artifactId>chaconne-spring-boot-starter</artifactId>
     <version>1.0-RC1</version>
</dependency>

兼容性:
Jdk 1.8+

chaconne 实现原理简述

chaconne 的底层是依赖 tridenter-spring-boot-starter 组件来实现工作集群模式的(主备模式和负载平衡模式),利用音讯单播机制(通过 Redis PubSub 模仿)来实现工作散发和负载平衡,分片解决等高级个性。须要指出的是,chaconne 框架中对于集群的定义和 tridenter 对于集群的定义是统一的,对于集群的概念,等同于用来区别不同的产品组或公司,同时 chaconne 也反对工作组的概念,它是可选配置,默认状况下,组名就是以后利用名称(${spring.application.name}),即当起了多个雷同利用名的利用,那这些利用就成为了一个工作组。chaconne 不仅反对跨组的工作调用,更反对跨集群的工作调用。

如何定义工作?

  1. 应用注解 @ChacJob
  2. 继承 ManagedJob 类
  3. 实现 Job 接口
  4. 实现 NotManagedJob 接口
    阐明:
    前 3 种定义工作的形式属于编程式定义工作,即通过代码形式定义一个工作,通过 Spring 框架启动并主动加载
    第 4 种定义的形式用来定义动静工作,即用户在 Web 界面上(Chaconne-Console)提交来创立工作或间接通过调用 HTTP API 创立工作,不同的是,该工作对象不属于 Spring 上下文托管的 Bean。

示例代码:

注解形式创立工作
@ChacJob
@ChacTrigger(cron = "*/5 * * * * ?")
public class DemoCronJob {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {log.info("DemoCronJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {log.info("DemoCronJob's return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {log.error("DemoCronJob is failed by cause: {}", e.getMessage(), e);
    }

}
接口方式创立工作
@Component
public class HealthCheckJob extends ManagedJob {

    @Override
    public Object execute(JobKey jobKey, Object arg, Logger log) {log.info(info());
        return UUID.randomUUID().toString();
    }

    @Override
    public Trigger getTrigger() {return GenericTrigger.Builder.newTrigger("*/5 * * * * ?").setStartDate(DateUtils.addSeconds(new Date(), 30)).build();}

    private String info() {long totalMemory = Runtime.getRuntime().totalMemory();
        long usedMemory = totalMemory - Runtime.getRuntime().freeMemory();
        return FileUtils.formatSize(usedMemory) + "/" + FileUtils.formatSize(totalMemory);
    }

    @Override
    public long getTimeout() {return 60L * 1000;}

}
动静工作:
public class EtlJob implements NotManagedJob {

    @Override
    public Object execute(JobKey jobKey, Object attachment, Logger log) {log.info("JobKey:{}, Parameter: {}", jobKey, attachment);
        return null;
    }

}

工作依赖

工作依赖是 chaconne 框架的重要个性之一,工作依赖分为 串行依赖 并行依赖
所谓串行依赖是指工作 A 做完接着执行工作 B, 即工作 B 依赖工作 A。
并行依赖是指,比方有 3 个工作,别离为工作 A, 工作 B, 工作 C, 工作 A 和工作 B 都做完能力执行工作 C, 相似会签的业务场景。
串行依赖和并行依赖都能够共享工作上下文参数和运行后果,并且反对自定义判断策略来决定要不要触发上游工作。

DAG(有向无环图)

而在联合串行依赖和并行依赖的根底上,chaconne 框架又提供了 DAG 性能并提供了敌对的 API,来模仿相似工作流的业务场景,更加丰盛了工作依赖的应用场景。
(这里为了不便举例,都通过注解的形式配置工作)

串行依赖示例:
@ChacJob
@ChacTrigger(triggerType = TriggerType.DEPENDENT)
@ChacDependency({@ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoSchedJob", name = "demoSchedJob") })
public class DemoDependentJob {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {log.info("DemoDependentJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {log.info("DemoDependentJob's return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {log.error("DemoDependentJob is failed by cause: {}", e.getMessage(), e);
    }

}
并行依赖示例:

有 3 个工作,DemoTask, DemoTaskOne, DemoTaskTwo
让 DemoTaskOne, DemoTaskTwo 都做完再执行 DemoTask,且 DemoTask 能够取得 DemoTaskOne, DemoTaskTwo 执行后的值

DemoTaskOne:
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskOne {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {log.info("DemoTaskOne is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {log.info("DemoTaskOne return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {log.error("DemoTaskOne is failed by cause: {}", e.getMessage(), e);
    }

}
DemoTaskTwo:
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskTwo {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {log.info("DemoTaskTwo is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {log.info("DemoTaskTwo return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {log.error("DemoTaskTwo is failed by cause: {}", e.getMessage(), e);
    }
    
}
DemoTask:
@ChacJob
@ChacTrigger(cron = "0 0/1 * * * ?", triggerType = TriggerType.CRON)
@ChacFork({@ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskOne", name = "demoTaskOne"),
        @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskTwo", name = "demoTaskTwo") })
public class DemoTask {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {log.info("DemoTask is running at: {}", DateUtils.format(System.currentTimeMillis()));
        TaskJoinResult joinResult = (TaskJoinResult) attachment;
        TaskForkResult[] forkResults = joinResult.getTaskForkResults();
        long max = 0;
        for (TaskForkResult forkResult : forkResults) {max = Long.max(max, (Long) forkResult.getResult());
        }
        return max;
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {log.info("DemoTask return max value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {log.error("DemoTask is failed by cause: {}", e.getMessage(), e);
    }

}
DAG 工作示例

DAG 工作目前只反对 API 创立, 后续会继续改良,减少界面形式创立 DAG 工作

@RequestMapping("/dag")
@RestController
public class DagJobController {@Value("${spring.application.cluster.name}")
    private String clusterName;

    @Value("${spring.application.name}")
    private String applicationName;

    @Autowired
    private JobManager jobManager;

    @GetMapping("/create")
    public Map<String, Object> createDagTask() throws Exception {Dag dag = new Dag(clusterName, applicationName, "testDag");// 创立一个 Dag 工作,并指定集群名,利用名,和工作名称
        dag.setTrigger(new CronTrigger("0 0/1 * * * ?"));// 设置 Cron 表达式
        dag.setDescription("This is only a demo of dag job");// 工作形容
        DagFlow first = dag.startWith(clusterName, applicationName, "demoDagStart", DemoDagStart.class.getName());// 定义第一个节点
        DagFlow second = first.flow(clusterName, applicationName, "demoDag", DemoDag.class.getName());// 定义第二个节点
                // 第二个节点 fork 两个子过程解决
        second.fork(clusterName, applicationName, "demoDagOne", DemoDagOne.class.getName());
        second.fork(clusterName, applicationName, "demoDagTwo", DemoDagTwo.class.getName());
        jobManager.persistJob(dag, "123");
        return Collections.singletonMap("ok", 1);
    }

}

下面的 DAG 示例阐明一下,chaconne 框架提供的 DAG 模型反对串行流入,即 flow 模式,也提供了 fork 模式进行并行处理,上例中,工作 demoDag fork 了两个子过程(“demoDagOne”和“demoDagTwo”),即 demoDagOne 和 demoDagTwo 同时解决完了再触发 demoDag 工作。

Chaconne 部署阐明

chaconne 除了依靠 SpringBoot 框架外,默认用 MySQL 存储工作信息 (目前仅反对 MySQL,后续会反对更多类型的数据库), 用 Redis 保留集群元数据和进行音讯播送
所以无论应用哪种部署形式,你都须要在你的利用中设置 DataSource 和 RedisConnectionFactory
示例代码:

@Slf4j
@Configuration
public class ResourceConfig {

    @Setter
    @Configuration(proxyBeanMethods = false)
    @ConfigurationProperties(prefix = "spring.datasource")
    public static class DataSourceConfig {

        private String jdbcUrl;
        private String username;
        private String password;
        private String driverClassName;

        private HikariConfig getDbConfig() {if (log.isTraceEnabled()) {log.trace("DataSourceConfig JdbcUrl:" + jdbcUrl);
                log.trace("DataSourceConfig Username:" + username);
                log.trace("DataSourceConfig Password:" + password);
                log.trace("DataSourceConfig DriverClassName:" + driverClassName);
            }
            final HikariConfig config = new HikariConfig();
            config.setDriverClassName(driverClassName);
            config.setJdbcUrl(jdbcUrl);
            config.setUsername(username);
            config.setPassword(password);
            config.setMinimumIdle(5);
            config.setMaximumPoolSize(50);
            config.setMaxLifetime(60 * 1000);
            config.setIdleTimeout(60 * 1000);
            config.setValidationTimeout(3000);
            config.setReadOnly(false);
            config.setConnectionInitSql("SELECT UUID()");
            config.setConnectionTestQuery("SELECT 1");
            config.setConnectionTimeout(60 * 1000);
            config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");

            config.addDataSourceProperty("cachePrepStmts", "true");
            config.addDataSourceProperty("prepStmtCacheSize", "250");
            config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
            return config;
        }

        @Primary
        @Bean
        public DataSource dataSource() {return new HikariDataSource(getDbConfig());
        }

    }

    @Setter
    @Configuration(proxyBeanMethods = false)
    @ConfigurationProperties(prefix = "spring.redis")
    public static class RedisConfig {

        private String host;
        private String password;
        private int port;
        private int dbIndex;

        @Bean
        @ConditionalOnMissingBean(RedisConnectionFactory.class)
        public RedisConnectionFactory redisConnectionFactory() {RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
            redisStandaloneConfiguration.setHostName(host);
            redisStandaloneConfiguration.setPort(port);
            redisStandaloneConfiguration.setDatabase(dbIndex);
            redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
            JedisClientConfiguration.JedisClientConfigurationBuilder jedisClientConfiguration = JedisClientConfiguration.builder();
            jedisClientConfiguration.connectTimeout(Duration.ofMillis(60000)).readTimeout(Duration.ofMillis(60000)).usePooling()
                    .poolConfig(jedisPoolConfig());
            JedisConnectionFactory factory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration.build());
            return factory;
        }

        @Bean
        public JedisPoolConfig jedisPoolConfig() {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMinIdle(1);
            jedisPoolConfig.setMaxIdle(10);
            jedisPoolConfig.setMaxTotal(200);
            jedisPoolConfig.setMaxWaitMillis(-1);
            jedisPoolConfig.setTestWhileIdle(true);
            return jedisPoolConfig;
        }

    }

}
Chaconne 去中心化部署

在你的 Spring 应用程序的主函数上加上 @EnableChaconneEmbeddedMode 注解,而后启动
示例代码:

@EnableChaconneEmbeddedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {public static void main(String[] args) {
        final int port = 8088;
        System.setProperty("server.port", String.valueOf(port));
        SpringApplication.run(YourApplicationMain.class, args);
    }

}
Chaconne 中心化部署
  1. 启动调度核心,这须要你新建一个 SpringBoot 我的项目,在主函数上加上 @EnableChaconneDetachedMode 注解,并指定为生产端
    示例代码:

    @EnableChaconneDetachedMode(DetachedMode.PRODUCER)
    @SpringBootApplication
    public class ChaconneManagementMain {public static void main(String[] args) {SpringApplication.run(ChaconneManagementMain.class, args);
     }
    }
    别忘了配置 DataSource 和 RedisConnectionFactory
  2. 在你的 Spring 应用程序的主函数上加上 @EnableChaconneDetachedMode 注解(默认为生产端),而后启动

    @EnableChaconneDetachedMode
    @SpringBootApplication
    @ComponentScan
    public class YourApplicationMain {public static void main(String[] args) {
         final int port = 8088;
         System.setProperty("server.port", String.valueOf(port));
         SpringApplication.run(YourApplicationMain.class, args);
     }
    
    }

Chaconne Console 应用阐明

Chaconne Console 是 chaconne 框架提供的工作治理和查看的 Web 我的项目,它也反对去中心化部署和中心化部署模式,默认端口 6140
提供了如下性能:

  1. 保留工作和查看工作信息
  2. 暂停和持续工作
  3. 删除工作
  4. 手动运行工作
  5. 查看工作统计(按天)
  6. 查看工作运行时日志

目前 Chaconne Console 我的项目还在一直的保护中,有些性能略显毛糙(工作 JSON 编辑器),有些性能暂未凋谢。
同样,Chaconne Console 也是一个 SpringBoot 的工程
源码:

@EnableChaconneClientMode
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class})
public class ChaconneConsoleMain {

    static {System.setProperty("spring.devtools.restart.enabled", "false");
        File logDir = FileUtils.getFile(FileUtils.getUserDirectory(), "logs", "indi", "atlantis", "framework", "chaconne", "console");
        if (!logDir.exists()) {logDir.mkdirs();
        }
        System.setProperty("DEFAULT_LOG_BASE", logDir.getAbsolutePath());
    }

    public static void main(String[] args) {SpringApplication.run(ChaconneConsoleMain.class, args);
        System.out.println(Env.getPid());
    }

}

注解 @EnableChaconneClientMode 示意启用一个工作治理客户端
启动后,输出首页地址:http://localhost:6140/chaconn…
首先进入的 overview 页面:

工作列表:

创立工作:

点击 ShowJson 能够展现 Json 格局的数据:

工作详情:

工作日志:

工作统计:

可查看每一个工作的统计(按天)

最初,附上分布式任务调度零碎 chaconne 的源码地址:https://github.com/paganini20…
有趣味的敌人能够钻研一下它的源码

正文完
 0