关于java:Java-定时任务技术趋势

48次阅读

共计 9060 个字符,预计需要花费 23 分钟才能阅读完成。

作者:黄晓萌(学仁)

定时工作是每个业务常见的需要,比方每分钟扫描超时领取的订单,每小时清理一次数据库历史数据,每天统计前一天的数据并生成报表等等。

Java 中自带的解决方案

应用 Timer

创立 java.util.TimerTask 工作,在 run 办法中实现业务逻辑。通过 java.util.Timer 进行调度,反对依照固定频率执行。所有的 TimerTask 是在同一个线程中串行执行,相互影响。也就是说,对于同一个 Timer 里的多个 TimerTask 工作,如果一个 TimerTask 工作在执行中,其它 TimerTask 即便达到执行的工夫,也只能排队期待。如果有异样产生,线程将退出,整个定时工作就失败。

import java.util.Timer;
import java.util.TimerTask;
public class TestTimerTask {public static void main(String[] args) {TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {System.out.println("hell world");
            }
        };
        Timer timer = new Timer();
        timer.schedule(timerTask, 10, 3000);
    }  
}

应用 ScheduledExecutorService

基于线程池设计的定时工作解决方案,每个调度工作都会调配到线程池中的一个线程去执行,解决 Timer 定时器无奈并发执行的问题,反对 fixedRate 和 fixedDelay。

import java.util.Timer;
import java.util.TimerTask;
public class TestTimerTask {public static void main(String[] args) {TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {System.out.println("hell world");
            }
        };
        Timer timer = new Timer();
        timer.schedule(timerTask, 10, 3000);
    }  
}

Spring 中自带的解决方案

Springboot 中提供了一套轻量级的定时工作工具 Spring Task,通过注解能够很不便的配置,反对 cron 表达式、fixedRate、fixedDelay。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestTimerTask {public static void main(String[] args) {ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
        // 依照固定频率执行,每隔 5 秒跑一次
        ses.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {System.out.println("hello fixedRate");
            }
        }, 0, 5, TimeUnit.SECONDS);
        // 依照固定延时执行,上次执行完后隔 3 秒再跑
        ses.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {System.out.println("hello fixedDelay");
            }
        }, 0, 3, TimeUnit.SECONDS);
    }
}

Spring Task 绝对于下面提到的两种解决方案,最大的劣势就是反对 cron 表达式,能够解决依照规范工夫固定周期执行的业务,比方每天几点几分执行。

业务幂等解决方案

当初的利用根本都是分布式部署,所有机器的代码都是一样的,后面介绍的 Java 和 Spring 自带的解决方案,都是过程级别的,每台机器在同一时间点都会执行定时工作。这样会导致须要业务幂等的定时工作业务有问题,比方每月定时给用户推送音讯,就会推送屡次。

于是,很多利用很天然的就想到了应用分布式锁的解决方案。即每次定时工作执行之前,先去抢锁,抢到锁的执行工作,抢不到锁的不执行。怎么抢锁,又是形形色色,比方应用 DB、zookeeper、redis。

## 应用 DB 或者 Zookeeper 抢锁

应用 DB 或者 Zookeeper 抢锁的架构差不多,原理如下:

  1. 定时工夫到了,在回调办法里,先去抢锁。
  2. 抢到锁,则继续执行办法,没抢到锁间接返回。
  3. 执行完办法后,开释锁。

示例代码如下:

import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class MyTask {
    /**
     * 每分钟的第 30 秒跑一次
     */
    @Scheduled(cron = "30 * * * * ?")
    public void task1() throws InterruptedException {System.out.println("hello cron");
    }
    /**
     * 每隔 5 秒跑一次
     */
    @Scheduled(fixedRate = 5000)
    public void task2() throws InterruptedException {System.out.println("hello fixedRate");
    }
    /**
     * 上次跑完隔 3 秒再跑
     */
    @Scheduled(fixedDelay = 3000)
    public void task3() throws InterruptedException {System.out.println("hello fixedDelay");
    }
}

以后的这个设计,认真一点的同学能够发现,其实还是有可能导致工作反复执行的。比方工作执行的十分快,A 这台机器抢到锁,执行完工作后很快就开释锁了。B 这台机器后抢锁,还是会抢到锁,再执行一遍工作。

应用 redis 抢锁

应用 redis 抢锁,其实架构上和 DB/zookeeper 差不多,不过 redis 抢锁反对过期工夫,不必被动去开释锁,并且能够充分利用这个过期工夫,解决工作执行过快开释锁导致工作反复执行的问题,架构如下:

示例代码如下:

@Component
@EnableScheduling
public class MyTask {
    /**
     * 每分钟的第 30 秒跑一次
     */
    @Scheduled(cron = "30 * * * * ?")
    public void task1() throws InterruptedException {
        String lockName = "task1";
        if (tryLock(lockName, 30)) {System.out.println("hello cron");
            releaseLock(lockName);
        } else {return;}
    }
    private boolean tryLock(String lockName, long expiredTime) {
        //TODO
        return true;
    }
    private void releaseLock(String lockName) {//TODO}
}

看到这里,可能又会有同学有问题,加一个过期工夫是不是还是不够谨严,还是有可能工作反复执行?

——确实是的,如果有一台机器忽然长时间的 fullgc,或者之前的工作还没解决完(Spring Task 和 ScheduledExecutorService 实质还是通过线程池解决工作),还是有可能隔了 30 秒再去调度工作的。

应用 Quartz

Quartz[1] 是一套轻量级的任务调度框架,只须要定义了 Job(工作),Trigger(触发器)和 Scheduler(调度器),即可实现一个定时调度能力。反对基于数据库的集群模式,能够做到工作幂等执行。

Quartz 反对工作幂等执行,其实实践上还是抢 DB 锁,咱们看下 quartz 的表构造:

其中,QRTZ_LOCKS 就是 Quartz 集群实现同步机制的行锁表,其表构造如下:

--QRTZ_LOCKS 表构造
CREATE TABLE `QRTZ_LOCKS` (`LOCK_NAME` varchar(40) NOT NULL,
  PRIMARY KEY (`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--QRTZ_LOCKS 记录
+-----------------+ 
| LOCK_NAME       |
+-----------------+ 
| CALENDAR_ACCESS |
| JOB_ACCESS      |
| MISFIRE_ACCESS  |
| STATE_ACCESS    |
| TRIGGER_ACCESS  |
+-----------------+

能够看出 QRTZ_LOCKS 中有 5 条记录,代表 5 把锁,别离用于实现多个 Quartz Node 对 Job、Trigger、Calendar 拜访的同步控制。

开源任务调度中间件

下面提到的解决方案,在架构上都有一个问题,那就是每次调度都须要抢锁,特地是应用 DB 和 Zookeeper 抢锁,性能会比拟差,一旦任务量减少到肯定的量,就会有比拟显著的调度延时。还有一个痛点,就是业务想要批改调度配置,或者减少一个工作,得批改代码从新公布利用。

于是开源社区涌现了一堆任务调度中间件,通过任务调度零碎进行工作的创立、批改和调度,这其中国内最火的就是 XXL-JOB 和 ElasticJob。

### ElasticJob

ElasticJob[2] 是一款基于 Quartz 开发,依赖 Zookeeper 作为注册核心、轻量级、无中心化的分布式任务调度框架,目前曾经通过 Apache 开源。

ElasticJob 绝对于 Quartz 来说,从性能上最大的区别就是反对分片,能够将一个工作分片参数分发给不同的机器执行。架构上最大的区别就是应用 Zookeeper 作为注册核心,不同的任务分配给不同的节点调度,不须要抢锁触发,性能上比 Quartz 上弱小很多,架构图如下:

开发上也比较简单,和 springboot 联合比拟好,能够在配置文件定义工作如下:

elasticjob:
  regCenter:
    serverLists: localhost:2181
    namespace: elasticjob-lite-springboot
  jobs:
    simpleJob:
      elasticJobClass: org.apache.shardingsphere.elasticjob.lite.example.job.SpringBootSimpleJob
      cron: 0/5 * * * * ?
      timeZone: GMT+08:00
      shardingTotalCount: 3
      shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
    scriptJob:
      elasticJobType: SCRIPT
      cron: 0/10 * * * * ?
      shardingTotalCount: 3
      props:
        script.command.line: "echo SCRIPT Job:"
    manualScriptJob:
      elasticJobType: SCRIPT
      jobBootstrapBeanName: manualScriptJobBean
      shardingTotalCount: 9
      props:
        script.command.line: "echo Manual SCRIPT Job:"

实现工作接口如下:

@Component
public class SpringBootShardingJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {System.out.println("分片总数 ="+context.getShardingTotalCount() + ", 分片号 ="+context.getShardingItem()
            + ", 分片参数 ="+context.getShardingParameter());
    }

运行后果如下:

 分片总数 =3, 分片号 =0, 分片参数 =Beijing
分片总数 =3, 分片号 =1, 分片参数 =Shanghai
分片总数 =3, 分片号 =2, 分片参数 =Guangzhou

同时,ElasticJob 还提供了一个简略的 UI,能够查看工作的列表,同时反对批改、触发、进行、失效、生效操作。

遗憾的是,ElasticJob 暂不反对动态创建工作。

XXL-JOB

XXL-JOB[3] 是一个开箱即用的轻量级分布式任务调度零碎,其外围设计指标是开发迅速、学习简略、轻量级、易扩大,在开源社区宽泛风行。

XXL-JOB 是 Master-Slave 架构,Master 负责工作的调度,Slave 负责工作的执行,架构图如下:

XXL-JOB 接入也很不便,不同于 ElasticJob 定义工作实现类,是通过 @XxlJob 注解定义 JobHandler。

@Component
public class SampleXxlJob {private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
    /**
     * 1、简略工作示例(Bean 模式)*/
    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {XxlJobLogger.log("XXL-JOB, Hello World.");
        for (int i = 0; i < 5; i++) {XxlJobLogger.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        return ReturnT.SUCCESS;
    }
    /**
     * 2、分片播送工作
     */
    @XxlJob("shardingJobHandler")
    public ReturnT<String> shardingJobHandler(String param) throws Exception {
        // 分片参数
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        XxlJobLogger.log("分片参数:以后分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
        // 业务逻辑
        for (int i = 0; i < shardingVO.getTotal(); i++) {if (i == shardingVO.getIndex()) {XxlJobLogger.log("第 {} 片, 命中分片开始解决", i);
            } else {XxlJobLogger.log("第 {} 片, 疏忽", i);
            }
        }
        return ReturnT.SUCCESS;
    }
}

XXL-JOB 相较于 ElasticJob,最大的特点就是性能比拟丰盛,可运维能力比拟强,岂但反对控制台动态创建工作,还有调度日志、运行报表等性能。

XXL-JOB 的历史记录、运行报表和调度日志,都是基于数据库实现的:

由此能够看出,XXL-JOB 所有性能都依赖数据库,且调度核心不反对分布式架构,在任务量和调度量比拟大的状况下,会有性能瓶颈。不过如果对工作量级、高可用、监控报警、可视化等没有过高要求的话,XXL-JOB 根本能够满足定时工作的需要。

企业级解决方案

开源软件只能提供根底的调度能力,在监管控上的能力个别都比拟弱。比方日志服务,业界往往应用 ELK 解决方案;短信报警,须要有短信平台;监控大盘,当初支流的解决方案是 Prometheus;等等。企业想要有这些能力,岂但须要额定的开发成本,还须要低廉的资源老本。

另外应用开源软件也随同着稳定性的危险,就是出了问题没人能解决,想要反馈到社区等社区解决,这个链路太长了,早就产生故障了。

阿里云任务调度 SchedulerX[4] 是阿里巴巴自研的基于 Akka 架构的一站式任务调度平台,兼容开源 XXL-JOB、ElasticJob、Quartz(布局中),反对 Cron 定时、一次性工作、工作编排、分布式跑批,具备高可用、可视化、可运维、低延时等能力,自带企业级监控大盘、日志服务、短信报警等服务。

劣势

平安防护

• 多层次平安防护:反对 HTTPS 和 VPC 拜访,同时还有阿里云的多层平安防护,避免歹意攻打。
• 多租户隔离机制:反对多地区、命名空间和利用级别的隔离。
• 权限管控:反对控制台读写的权限治理,客户端接入的鉴权。

企业级高可用
SchedulerX2.0 采纳高可用架构,工作多备份机制,经验阿里团体多年双十一、容灾演练,能够做到任意一个机房挂了,任务调度都不会收到影响。

商业级报警运维
• 报警:反对邮件、钉钉、短信、电话,(其余报警形式在布局中)。反对工作失败、超时、无可用机器报警。报警内容能够间接看出工作失败的起因,以钉钉机器人为例。

• 运维操作:原地重跑、重刷数据、标记胜利、查看堆栈、进行工作、指定机器等。

丰盛的可视化
schedulerx 领有丰盛的可视化能力,比方:

• 用户大盘

• 查看工作历史执行记录

• 查看工作运行日志

• 查看工作运行堆栈

• 查看工作操作记录

兼容开源
Schedulerx 兼容开源 XXL-JOB、ElasticJob、Quartz(布局中),业务不须要改一行代码,即能够将工作托管在 SchedulerX 调度平台,享有企业级可视化和报警的能力。

Spring 原生
SchedulerX 反对通过控制台和 API 动态创建工作,也反对 Spring 申明式工作定义,一份工作配置能够拿到任何环境一键启动,配置如下:

spring:
   schedulerx2:
      endpoint: acm.aliyun.com   #请填写不同 regin 的 endpoint
      namespace: 433d8b23-06e9-xxxx-xxxx-90d4d1b9a4af #region 内全局惟一,倡议应用 UUID 生成
      namespaceName: 学仁测试
      appName: myTest
      groupId: myTest.group      #同一个命名空间下须要惟一
      appKey: myTest123@alibaba  #利用的 key,不要太简略,留神保存好
      regionId: public           #填写对应的 regionId
      aliyunAccessKey: xxxxxxx   #阿里云账号的 ak
      aliyunSecretKey: xxxxxxx   #阿里云账号的 sk
      alarmChannel: sms,ding     #报警通道:短信和钉钉
      jobs: 
         simpleJob: 
            jobModel: standalone
            className: com.aliyun.schedulerx.example.processor.SimpleJob
            cron: 0/30 * * * * ?   # cron 表达式
            jobParameter: hello
            overwrite: true 
         shardingJob: 
            jobModel: sharding
            className: ccom.aliyun.schedulerx.example.processor.ShardingJob
            oneTime: 2022-06-02 12:00:00   # 一次性工作表达式
            jobParameter: 0=Beijing,1=Shanghai,2=Guangzhou
            overwrite: true
         broadcastJob:   # 不填写 cron 和 oneTime,示意 api 工作
            jobModel: broadcast
            className: com.aliyun.schedulerx.example.processor.BroadcastJob
            jobParameter: hello
            overwrite: true
         mapReduceJob: 
            jobModel: mapreduce
            className: com.aliyun.schedulerx.example.processor.MapReduceJob
            cron: 0 * * * * ?
            jobParameter: 100
            overwrite: true
      alarmUsers:     #报警联系人
         user1:
            userName: 张三
            userPhone: 12345678900
         user2:
            userName: 李四
            ding: https://oapi.dingtalk.com/robot/send?access_token=xxxxx

分布式跑批

SchedulerX 提供了丰盛的分布式模型,能够解决各种各样的分布式业务场景。包含单机、播送、分片、MapReduce[5] 等,架构如下:

SchedulerX 的 MapReduce 模型,简略几行代码,就能够将海量工作分布式到多台机器跑批,绝对于大数据跑批来说,具备速度快、数据安全、成本低、简略易学等特点。

工作编排

SchedulerX 通过工作流进行工作编排,并且提供了一个可视化的界面,操作简略,拖拖拽拽即可配置一个工作流。具体的工作状态图能高深莫测看到上游工作为什么没跑,不便定位问题。

可抢占的工作优先级队列

常见场景是夜间离线报表业务,比方很多报表工作是早晨 1、2 点开始跑,要管制利用最大并发的工作数量(否则业务扛不住),达到并发下限的工作会在队列中期待。同时要求早上 9 点前必须把 KPI 报表跑进去,能够设置 KPI 工作高优先级,会抢占低优先级工作优先调度。

SchedulerX 反对可抢占的工作优先级队列,能够在控制台动静配置:

Q&A

  1. Kubernetes 利用能够接入 SchedulerX 吗?
    ——能够的,无论是物理机、容器、还是 Kubernetes pod,都能够接入 SchedulerX。
  2. 我的利用不在阿里云上,可否应用 SchedulerX?
    ——能够的,任何云平台或者本地机器,只有能拜访公网,都能够接入 SchedulerX。

正文完
 0