关于java:动态创建与修改定时任务

39次阅读

共计 6277 个字符,预计需要花费 16 分钟才能阅读完成。

  最近遇到一个需要,须要可能依照特定的配置执行定时工作,而且定时工作须要在利用不重启的状况下动静增删改,Spring 提供的 @Scheduled 注解是硬编码模式,只能实现固定的定时工作,随后通过一番探索,依靠注明的 quartz 框架终于实现了该性能,上面来分享一下我的计划。

  • 首先引入 quartz maven 依赖:

    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <!-- 因为我的我的项目继承了 spring-boot-starter-parent,因而这里能够不必写版本号,会间接应用父 pom 文档中 dependencyManagement 的 quartz 版本号 -->
        <!-- <version>2.3.2</version> -->
    </dependency>
  • 我的定时工作配置是存储在 MySQL 数据库当中的,当程序启动时,初始化过程会的 init() 办法会读取一遍所有无效的定时工作配置,而后将其实例化为一个个对象,一个对象便代表了一个定时工作,我定义的类为 public class ScheduledClauseTriggerEngine implements ClauseTriggerEngine<Void>, Job, AutoCloseable,其中 ClauseTriggerEngine 为我自定义的接口,因为在我的项目中除定时触发外还有其余的工作触发形式,不再过多赘述,Job(org.quartz.Job)是 quartz 框架的一个接口,代表一个工作,在任务调度时,Jobexecute(JobExecutionContext context) 办法会被调用,用以执行工作内容。所有实例化后的 ScheduledClauseTriggerEngine 对象会被存在一个 Map<Integer, ScheduledClauseTriggerEngine> 容器中去,Key 为定时工作的 id,后续在定时工作增删改的时候,也会同步批改这个 Map 的内容。
  • 创立SchedulerFactoryBean(org.springframework.scheduling.quartz.SchedulerFactoryBean),并设置 JobFactory,因为我这里只应用到 SchedulerFactoryBean 一次,因而这一小段代码写在构造方法中的,若是全局应用,须要在 Spring 的 Configuration 类中专门定义一个 SchedulerFactoryBean 类型的 Bean(比拟标准的用法)。

    SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
            schedulerFactoryBean.setJobFactory((bundle, scheduler) ->
                    triggers.get(Integer.parseInt(bundle.getJobDetail().getKey().getName())));
            schedulerFactoryBean.afterPropertiesSet();
            this.scheduler = schedulerFactoryBean.getScheduler();
            this.scheduler.start();

    setJobFactory这一步比拟重要,默认的调度计划是通过反射实现的,即传入一个 Job 类型的 class,而后反射实例化一个此类的对象,再去调用 execute 办法,通过 JobExecutionContext 传参,而我的计划是所有工作类都已实例化实现,我心愿在触发时间接返回对应的对象实例去执行即可,因而须要去批改 JobFactory,triggers是上一步中所说的存储定时工作的 Map,而 bundle.getJobDetail().getKey().getName() 其实就是获取到了工作的 key,在我这里其实也就是定时工作 id,这个与前面步骤的代码绝对应,也就是此时定时工作触发时,会拿到我提前准备好的 Map 中的对应工作实例去执行,笼罩默认行为。

  • 实例化 ScheduledClauseTriggerEngine 并创立定时工作,上面为比拟重要的几行代码:

    // 此局部代码为实例化并创立一个定时工作的代码,我将其封装为了一个办法,不便调用,在初始化办法中查问所有定时工作循环调用即可
    // ... 疏忽数据库查问及参数校验的过程
    
    // 创立 JobDetail
    JobDetail jobDetail = new JobDetailImpl()
                  .getJobBuilder()
                  // 工作的 id 以及 group,id 为数据库中的 id,在上一步设置的 JobFactory,当改工作被调度时,会依据此 id 去获取到要执行的工作
                  .withIdentity(clauseTriggerId.toString(), SCHEDULE_JOB_GROUP_NAME)
                  // 工作形容,可选
                  .withDescription(clauseTrigger.getName())
                  // 这里间接传入 ScheduledClauseTriggerEngine.class 即可
                  .ofType(ScheduledClauseTriggerEngine.class)
                  .build();
    // 增加触发器并开始调度工作
    scheduler.scheduleJob(jobDetail, TriggerBuilder.newTrigger()
            // 要调度的工作的 key
            .withIdentity(clauseTriggerId.toString())
            // 触发周期,triggerConfigToCronExpression 是将数据库中的工夫配置转换为 corn 表达式的办法
            .withSchedule(CronScheduleBuilder.cronSchedule(triggerConfigToCronExpression(clauseTrigger.getTriggerConfig())))
            .build());
    
    // 此时其实定时工作曾经开始失效
    
    // 后续操作...
    triggerEngine.setTemplateList(scheduledTemplates);
    log.info("add one scheduled clause trigger: {}", clauseTrigger.getName());
    return triggerEngine;
  • 定时工作的增删改:

    • 依据 id 新增一个工作:

      public void addTrigger(int triggerId) {
        // 依据 triggerId 实例化和启动一个定时工作,并增加到定时工作的 Map 中去,instanceOneScheduledClauseTriggerEngine 就是上一步所封装的办法
        ScheduledClauseTriggerEngine triggerEngine = triggers.computeIfAbsent(triggerId, k -> instanceOneScheduledClauseTriggerEngine(triggerId));
      }
    • 依据 id 删除一个工作:

      public void removeTrigger(int clauseTriggerId) {
            try {
                // 生成要删除的 jobKey,留神此处必须传入方才所应用的的 group name,否则会应用默认的组名,便无奈查问到咱们想要删除的工作
                JobKey jobKey = JobKey.jobKey(String.valueOf(clauseTriggerId), SCHEDULE_JOB_GROUP_NAME);
                // 移除定时工作
                scheduler.deleteJob(jobKey);
                // 从 Map 中移除对应的对象
                triggers.remove(clauseTriggerId);
                log.info("remove schedule trigger: {}", jobKey);
            } catch (SchedulerException e) {log.error(e.getMessage(), e);
            }
      }
    • 依据 id 更新一个工作:

      public void updateTrigger(int triggerId) {
          // 因为我我的项目中的定时工作可能工作执行内容会变,因而我是将定时工作删除再从新增加,若定时工作只会有触发工夫的变动,也可应用 rescheduleJob 办法只更新触发工夫
          removeTrigger(triggerId);
          addTrigger(triggerId);
      }
  • 集群环境下须要留神的中央

    • 多台机器时,我须要只有一台机器执行工作,而其余机器不执行,在此处我应用了 Redis 作为锁,谋求简便,当然也可应用 zookeeper。

      @Override
      public void execute(JobExecutionContext context) {String jobKey = context.getJobDetail().getKey().toString();
          String redisKey = JOB_REDIS_PREFIX + jobKey;
          // 判断是否获取到锁
          //noinspection ConstantConditions
          if(redisTemplate.opsForValue().increment(redisKey) != 1L){log.info("job {} has in running", jobKey);
              return;
          }
      
          // 留神下面的判断必须放在 try-finally 块之外,否则会导致一个隐秘的 BUG(无论以后机器是否获取到锁,都会执行 finally 中的办法,开释掉锁,产生谬误)
      
          // 为锁加上默认过期工夫
          redisTemplate.expire(redisKey, 3600, TimeUnit.SECONDS);
          try {MDC.put("traceId", randomId());
              log.info("execute schedule job: {}", jobKey);
              long l = System.currentTimeMillis();
              trigger();
              log.info("finish job: {}, used time: {}ms", jobKey, System.currentTimeMillis()-l);
          } catch (Exception ex){log.error(ex.getMessage(), ex);
          }finally {MDC.clear();
              // 开释锁
              redisTemplate.delete(redisKey);
          }
      }

      其实在之前,我写了一个注解:@RedisLock,能够通过注解形式间接为某个办法加分布式锁,然而注解不能传入变量,只能传入常量,在这个我的项目,锁的 key 是动静的,无奈间接应用,便先采纳间接写代码的模式,前期能够增加上此性能,通过注解传入 SpringEL 表达式解析办法入参,就能够实现动静 key 值了。

    • 多台机器定时工作更新问题,当定时工作配置更改时,我须要响应的批改定时工作,然而多台机器,我不能一台一台机器的手动去调用对应的办法,因而我想到了应用 redis 的公布订阅去实现,因为 Redis 的默认音讯模式是群发模式,刚好合乎我的需要,若我的项目中有 MQ,也可配置一个群发的 MQ Topic 去实现,稍微简单一些。
      附我所应用的代码,供参考,基于 spring-redis:

      @Bean
        RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                    ScheduledTriggerService scheduledTriggerService,
                                                                    DwdDataTriggerService dwdDataTriggerService,
                                                                    ClauseExecuteService clauseExecuteService) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
      
            //trigger 公布订阅
            container.addMessageListener((message, bytes) -> {String body = new String(message.getBody(), StandardCharsets.UTF_8);
      
                if (body.startsWith("scheduled-trigger-change")) {Assert.isTrue(body.matches("^scheduled-trigger-change:((add)|(remove)|(update)):\\d+$"),"invalid scheduled-trigger-change message:" + body);
                    String[] split = body.split(":");
                    String type = split[1];
                    int triggerId = Integer.parseInt(split[2]);
      
                    switch (type) {
                        case "add":
                            scheduledTriggerService.addTrigger(triggerId);
                            break;
                        case "remove":
                            scheduledTriggerService.removeTrigger(triggerId);
                            break;
                        case "update":
                            scheduledTriggerService.updateTrigger(triggerId);
                            break;
                        default:
                            scheduledTriggerService.refreshAllTriggerEngine();
                            break;
                    }
                } else {log.info("receive redis message, topic: {}, body, {}", new String(message.getChannel()), body);
                    dwdDataTriggerService.refreshClauseTrigger();}
            }, new ChannelTopic("trigger-config-change"));
            return container;
        }

      发送音讯:

      redisTemplate.convertAndSend("trigger-config-change", "scheduled-trigger-change:update:0");

      当定时配置变更时,发送 redis 音讯即可。
      总结:
      本篇文章基于我的理论我的项目,解说了借助于 quartz 框架的定时工作动静增删改的计划,然而因我的项目而异,我也做了许多定制化的操作,我的思路就是一项定时工作配置对应一个对象实例,工作触发时间接拿到对应的对象实例进行调用,然而 quartz 框架的默认调度计划不是这样的,所以做了一下调整,此外还减少了集群环境的反对。本篇文章提供一种计划或者说思路,理论应用时还须要大家联合本人的需要进行正当更改或优化,例如当定时工作比拟轻量的时候,我认为可不借助于框架,应用轮询也未尝不是一种简略无效的计划。

正文完
 0