chaconne(音译夏空),是一款用Java写的,基于SpringBoot框架的轻量级的分布式任务调度框架。引入chaconne相干组件,能够十分快捷地帮忙你搭建一个分布式工作集群
chaconne 个性列表
- 完满反对spring-boot框架(2.2.0+)
- 反对多种形式设置的定时工作(Cron表达式,参数设置等)
- 反对动静保留和删除工作
- 反对注解配置定时工作
- 反对两种集群模式(主备模式和负载平衡模式)
- 内置多种负载平衡算法,反对自定义负载平衡算法
- 反对失败重试和失败转移
- 反对日志追踪
- 反对工作参数分片解决
- 反对工作依赖(串行依赖和并行依赖)
- 反对DAG模仿工作流
- 反对工作自定义终止策略
- 反对工作超时冷却和重置
- 反对邮件告警
chaconne两种集群部署模式:
- 去中心化部署模式
没有固定的调度核心节点,集群会选举其中一个利用作为Leader, 进行工作指挥调度 - 中心化部署模式
分为调度核心和工作执行节点两个角色,且调度核心和工作执行节点都反对集群模式
阐明:
这里的集群是指参加工作执行的利用所组成的集群(chaconne集群),它和基于SpringCloud框架组成的集群是两个独立的概念
如果chaconne集群规模较小,举荐应用去中心化部署模式,若该集群规模较大,根据理论状况,两者模式都能够应用。
chaconne框架由两局部组成:
- chaconne-spring-boot-starter
外围jar包,蕴含了chaconne全副外围性能 - chaconne-console
Chaconne Web治理界面,进行工作治理和查看工作运行状态
Maven:
<dependency>
<groupId>com.github.paganini2008.atlantis</groupId>
<artifactId>chaconne-spring-boot-starter</artifactId>
<version>1.0-RC1</version>
</dependency>
兼容性:
Jdk 1.8+
chaconne实现原理简述
chaconne的底层是依赖tridenter-spring-boot-starter组件来实现工作集群模式的(主备模式和负载平衡模式),利用音讯单播机制(通过Redis PubSub模仿)来实现工作散发和负载平衡,分片解决等高级个性。须要指出的是,chaconne框架中对于集群的定义和tridenter对于集群的定义是统一的,对于集群的概念,等同于用来区别不同的产品组或公司,同时chaconne也反对工作组的概念,它是可选配置,默认状况下,组名就是以后利用名称(${spring.application.name}),即当起了多个雷同利用名的利用,那这些利用就成为了一个工作组。chaconne不仅反对跨组的工作调用,更反对跨集群的工作调用。
如何定义工作?
- 应用注解@ChacJob
- 继承ManagedJob类
- 实现Job接口
- 实现NotManagedJob接口
阐明:
前3种定义工作的形式属于编程式定义工作,即通过代码形式定义一个工作,通过Spring框架启动并主动加载
第4种定义的形式用来定义动静工作,即用户在Web界面上(Chaconne-Console)提交来创立工作或间接通过调用HTTP API创立工作,不同的是,该工作对象不属于Spring上下文托管的Bean。
示例代码:
注解形式创立工作
@ChacJob
@ChacTrigger(cron = "*/5 * * * * ?")
public class DemoCronJob {
@Run
public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
log.info("DemoCronJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
return RandomUtils.randomLong(1000000L, 1000000000L);
}
@OnSuccess
public void onSuccess(JobKey jobKey, Object result, Logger log) {
log.info("DemoCronJob's return value is: {}", result);
}
@OnFailure
public void onFailure(JobKey jobKey, Throwable e, Logger log) {
log.error("DemoCronJob is failed by cause: {}", e.getMessage(), e);
}
}
接口方式创立工作
@Component
public class HealthCheckJob extends ManagedJob {
@Override
public Object execute(JobKey jobKey, Object arg, Logger log) {
log.info(info());
return UUID.randomUUID().toString();
}
@Override
public Trigger getTrigger() {
return GenericTrigger.Builder.newTrigger("*/5 * * * * ?").setStartDate(DateUtils.addSeconds(new Date(), 30)).build();
}
private String info() {
long totalMemory = Runtime.getRuntime().totalMemory();
long usedMemory = totalMemory - Runtime.getRuntime().freeMemory();
return FileUtils.formatSize(usedMemory) + "/" + FileUtils.formatSize(totalMemory);
}
@Override
public long getTimeout() {
return 60L * 1000;
}
}
动静工作:
public class EtlJob implements NotManagedJob {
@Override
public Object execute(JobKey jobKey, Object attachment, Logger log) {
log.info("JobKey:{}, Parameter: {}", jobKey, attachment);
return null;
}
}
工作依赖
工作依赖是chaconne框架的重要个性之一,工作依赖分为串行依赖和并行依赖,
所谓串行依赖是指工作A做完接着执行工作B, 即工作B依赖工作A。
并行依赖是指,比方有3个工作,别离为工作A, 工作B, 工作C, 工作A和工作B都做完能力执行工作C, 相似会签的业务场景。
串行依赖和并行依赖都能够共享工作上下文参数和运行后果,并且反对自定义判断策略来决定要不要触发上游工作。
DAG(有向无环图)
而在联合串行依赖和并行依赖的根底上,chaconne框架又提供了DAG性能并提供了敌对的API,来模仿相似工作流的业务场景,更加丰盛了工作依赖的应用场景。
(这里为了不便举例,都通过注解的形式配置工作)
串行依赖示例:
@ChacJob
@ChacTrigger(triggerType = TriggerType.DEPENDENT)
@ChacDependency({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoSchedJob", name = "demoSchedJob") })
public class DemoDependentJob {
@Run
public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
log.info("DemoDependentJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
return RandomUtils.randomLong(1000000L, 1000000000L);
}
@OnSuccess
public void onSuccess(JobKey jobKey, Object result, Logger log) {
log.info("DemoDependentJob's return value is: {}", result);
}
@OnFailure
public void onFailure(JobKey jobKey, Throwable e, Logger log) {
log.error("DemoDependentJob is failed by cause: {}", e.getMessage(), e);
}
}
并行依赖示例:
有3个工作,DemoTask, DemoTaskOne, DemoTaskTwo
让DemoTaskOne, DemoTaskTwo都做完再执行DemoTask,且DemoTask能够取得DemoTaskOne, DemoTaskTwo执行后的值
DemoTaskOne:
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskOne {
@Run
public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
log.info("DemoTaskOne is running at: {}", DateUtils.format(System.currentTimeMillis()));
return RandomUtils.randomLong(1000000L, 1000000000L);
}
@OnSuccess
public void onSuccess(JobKey jobKey, Object result, Logger log) {
log.info("DemoTaskOne return value is: {}", result);
}
@OnFailure
public void onFailure(JobKey jobKey, Throwable e, Logger log) {
log.error("DemoTaskOne is failed by cause: {}", e.getMessage(), e);
}
}
DemoTaskTwo:
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskTwo {
@Run
public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
log.info("DemoTaskTwo is running at: {}", DateUtils.format(System.currentTimeMillis()));
return RandomUtils.randomLong(1000000L, 1000000000L);
}
@OnSuccess
public void onSuccess(JobKey jobKey, Object result, Logger log) {
log.info("DemoTaskTwo return value is: {}", result);
}
@OnFailure
public void onFailure(JobKey jobKey, Throwable e, Logger log) {
log.error("DemoTaskTwo is failed by cause: {}", e.getMessage(), e);
}
}
DemoTask:
@ChacJob
@ChacTrigger(cron = "0 0/1 * * * ?", triggerType = TriggerType.CRON)
@ChacFork({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskOne", name = "demoTaskOne"),
@ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskTwo", name = "demoTaskTwo") })
public class DemoTask {
@Run
public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
log.info("DemoTask is running at: {}", DateUtils.format(System.currentTimeMillis()));
TaskJoinResult joinResult = (TaskJoinResult) attachment;
TaskForkResult[] forkResults = joinResult.getTaskForkResults();
long max = 0;
for (TaskForkResult forkResult : forkResults) {
max = Long.max(max, (Long) forkResult.getResult());
}
return max;
}
@OnSuccess
public void onSuccess(JobKey jobKey, Object result, Logger log) {
log.info("DemoTask return max value is: {}", result);
}
@OnFailure
public void onFailure(JobKey jobKey, Throwable e, Logger log) {
log.error("DemoTask is failed by cause: {}", e.getMessage(), e);
}
}
DAG工作示例
DAG工作目前只反对API创立, 后续会继续改良,减少界面形式创立DAG工作
@RequestMapping("/dag")
@RestController
public class DagJobController {
@Value("${spring.application.cluster.name}")
private String clusterName;
@Value("${spring.application.name}")
private String applicationName;
@Autowired
private JobManager jobManager;
@GetMapping("/create")
public Map<String, Object> createDagTask() throws Exception {
Dag dag = new Dag(clusterName, applicationName, "testDag");// 创立一个Dag工作,并指定集群名,利用名,和工作名称
dag.setTrigger(new CronTrigger("0 0/1 * * * ?"));// 设置Cron表达式
dag.setDescription("This is only a demo of dag job");// 工作形容
DagFlow first = dag.startWith(clusterName, applicationName, "demoDagStart", DemoDagStart.class.getName());// 定义第一个节点
DagFlow second = first.flow(clusterName, applicationName, "demoDag", DemoDag.class.getName());// 定义第二个节点
// 第二个节点fork两个子过程解决
second.fork(clusterName, applicationName, "demoDagOne", DemoDagOne.class.getName());
second.fork(clusterName, applicationName, "demoDagTwo", DemoDagTwo.class.getName());
jobManager.persistJob(dag, "123");
return Collections.singletonMap("ok", 1);
}
}
下面的DAG示例阐明一下,chaconne框架提供的DAG模型反对串行流入,即flow模式,也提供了fork模式进行并行处理,上例中,工作demoDag fork了两个子过程(“demoDagOne”和“demoDagTwo”),即demoDagOne和demoDagTwo同时解决完了再触发demoDag工作。
Chaconne部署阐明
chaconne除了依靠SpringBoot框架外,默认用MySQL存储工作信息(目前仅反对MySQL,后续会反对更多类型的数据库), 用Redis保留集群元数据和进行音讯播送
所以无论应用哪种部署形式,你都须要在你的利用中设置DataSource和RedisConnectionFactory
示例代码:
@Slf4j
@Configuration
public class ResourceConfig {
@Setter
@Configuration(proxyBeanMethods = false)
@ConfigurationProperties(prefix = "spring.datasource")
public static class DataSourceConfig {
private String jdbcUrl;
private String username;
private String password;
private String driverClassName;
private HikariConfig getDbConfig() {
if (log.isTraceEnabled()) {
log.trace("DataSourceConfig JdbcUrl: " + jdbcUrl);
log.trace("DataSourceConfig Username: " + username);
log.trace("DataSourceConfig Password: " + password);
log.trace("DataSourceConfig DriverClassName: " + driverClassName);
}
final HikariConfig config = new HikariConfig();
config.setDriverClassName(driverClassName);
config.setJdbcUrl(jdbcUrl);
config.setUsername(username);
config.setPassword(password);
config.setMinimumIdle(5);
config.setMaximumPoolSize(50);
config.setMaxLifetime(60 * 1000);
config.setIdleTimeout(60 * 1000);
config.setValidationTimeout(3000);
config.setReadOnly(false);
config.setConnectionInitSql("SELECT UUID()");
config.setConnectionTestQuery("SELECT 1");
config.setConnectionTimeout(60 * 1000);
config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
return config;
}
@Primary
@Bean
public DataSource dataSource() {
return new HikariDataSource(getDbConfig());
}
}
@Setter
@Configuration(proxyBeanMethods = false)
@ConfigurationProperties(prefix = "spring.redis")
public static class RedisConfig {
private String host;
private String password;
private int port;
private int dbIndex;
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(host);
redisStandaloneConfiguration.setPort(port);
redisStandaloneConfiguration.setDatabase(dbIndex);
redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
JedisClientConfiguration.JedisClientConfigurationBuilder jedisClientConfiguration = JedisClientConfiguration.builder();
jedisClientConfiguration.connectTimeout(Duration.ofMillis(60000)).readTimeout(Duration.ofMillis(60000)).usePooling()
.poolConfig(jedisPoolConfig());
JedisConnectionFactory factory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration.build());
return factory;
}
@Bean
public JedisPoolConfig jedisPoolConfig() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMinIdle(1);
jedisPoolConfig.setMaxIdle(10);
jedisPoolConfig.setMaxTotal(200);
jedisPoolConfig.setMaxWaitMillis(-1);
jedisPoolConfig.setTestWhileIdle(true);
return jedisPoolConfig;
}
}
}
Chaconne去中心化部署
在你的Spring应用程序的主函数上加上@EnableChaconneEmbeddedMode注解,而后启动
示例代码:
@EnableChaconneEmbeddedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {
public static void main(String[] args) {
final int port = 8088;
System.setProperty("server.port", String.valueOf(port));
SpringApplication.run(YourApplicationMain.class, args);
}
}
Chaconne中心化部署
-
启动调度核心,这须要你新建一个SpringBoot我的项目,在主函数上加上@EnableChaconneDetachedMode注解,并指定为生产端
示例代码:@EnableChaconneDetachedMode(DetachedMode.PRODUCER) @SpringBootApplication public class ChaconneManagementMain { public static void main(String[] args) { SpringApplication.run(ChaconneManagementMain.class, args); } }
别忘了配置DataSource和RedisConnectionFactory
-
在你的Spring应用程序的主函数上加上@EnableChaconneDetachedMode注解(默认为生产端),而后启动
@EnableChaconneDetachedMode @SpringBootApplication @ComponentScan public class YourApplicationMain { public static void main(String[] args) { final int port = 8088; System.setProperty("server.port", String.valueOf(port)); SpringApplication.run(YourApplicationMain.class, args); } }
Chaconne Console应用阐明
Chaconne Console是chaconne框架提供的工作治理和查看的Web我的项目,它也反对去中心化部署和中心化部署模式,默认端口6140
提供了如下性能:
- 保留工作和查看工作信息
- 暂停和持续工作
- 删除工作
- 手动运行工作
- 查看工作统计(按天)
- 查看工作运行时日志
目前Chaconne Console我的项目还在一直的保护中,有些性能略显毛糙(工作JSON编辑器),有些性能暂未凋谢。
同样,Chaconne Console也是一个SpringBoot的工程
源码:
@EnableChaconneClientMode
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class ChaconneConsoleMain {
static {
System.setProperty("spring.devtools.restart.enabled", "false");
File logDir = FileUtils.getFile(FileUtils.getUserDirectory(), "logs", "indi", "atlantis", "framework", "chaconne", "console");
if (!logDir.exists()) {
logDir.mkdirs();
}
System.setProperty("DEFAULT_LOG_BASE", logDir.getAbsolutePath());
}
public static void main(String[] args) {
SpringApplication.run(ChaconneConsoleMain.class, args);
System.out.println(Env.getPid());
}
}
注解@EnableChaconneClientMode示意启用一个工作治理客户端
启动后,输出首页地址:http://localhost:6140/chaconn…
首先进入的overview页面:
工作列表:
创立工作:
点击ShowJson能够展现Json格局的数据:
工作详情:
工作日志:
工作统计:
可查看每一个工作的统计(按天)
最初,附上分布式任务调度零碎chaconne的源码地址:https://github.com/paganini20…
有趣味的敌人能够钻研一下它的源码
发表回复