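The earlier articles in this series gave a basic overview of the scheduled-task landscape: Timer and ScheduledExecutorService, the schedulers built into the JDK; HashedWheelTimer, Netty's internal time-wheel implementation; and then Quartz and distributed schedulers (ElasticJob, xxl-job, and so on). For simple Spring Boot applications you can also use Spring's built-in task support. This article walks through an example of Spring Task and explains how it is implemented. @pdai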
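  • Spring Boot scheduled tasks: how is Spring's built-in scheduling implemented, and what should you watch out for?

    • Implementation example

      • @EnableScheduling + @Scheduled
    • Going deeper

      • What should you watch out for when using Spring Task?
      • How does Spring Task work?

        • The @EnableScheduling annotation
        • SchedulingConfiguration initializes ScheduledAnnotationBeanPostProcessor
        • ScheduledTaskRegistrar registers tasks
        • ScheduledAnnotationBeanPostProcessor loads the @Scheduled annotations
        • ScheduledTaskRegistrar resolves tasks
        • How TaskScheduler handles a task
    • Sample source code
    • More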

Implementation example

Spring Task is well encapsulated and very simple to use.

@EnableScheduling + @Scheduled

  • Enable scheduling with @EnableScheduling and define each task with @Scheduled
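    import java.time.LocalDateTime;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;

    @EnableScheduling
    @Configuration
    public class ScheduleDemo {

        // Assumption: the original listing does not show where `log` comes from;
        // a plain SLF4J logger is declared here so the class compiles.
        private static final Logger log = LoggerFactory.getLogger(ScheduleDemo.class);

        /**
         * Runs once every minute.
         */
        @Scheduled(fixedRate = 1000 * 60 * 1)
        public void runScheduleFixedRate() {
            log.info("runScheduleFixedRate: current DateTime, {}", LocalDateTime.now());
        }

        /**
         * Runs once every hour, on the hour.
         */
        @Scheduled(cron = "0 0 */1 * * ?")
        public void runScheduleCron() {
            log.info("runScheduleCron: current DateTime, {}", LocalDateTime.now());
        }
    }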
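  • Attributes supported by @Scheduled
  1. cron: a cron expression; the task runs at the times the expression specifies.
  2. fixedDelay: how long to wait after the previous execution finishes before running again. Type long, in milliseconds by default.
  3. fixedDelayString: same meaning as fixedDelay, but the value is a String.
  4. fixedRate: runs the task at a fixed frequency, measured between successive starts. Type long, in milliseconds by default.
  5. fixedRateString: same meaning as fixedRate, but the value is a String.
  6. initialDelay: how long to wait before the first execution. Type long, in milliseconds by default.
  7. initialDelayString: same meaning as initialDelay, but the value is a String.
  8. zone: the time zone used to resolve the cron expression. Defaults to the server's local time zone and is rarely needed.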
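The difference between fixedRate and fixedDelay is easiest to see with numbers. The following is a minimal sketch that only simulates start times; it does no real scheduling. It assumes the task runs on a single thread, so a fixed-rate run that is due while the previous run is still executing starts as soon as that run ends. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates the start times (in ms) of a repeating task; nothing is actually scheduled. */
final class RateVsDelay {

    /** fixedRate: a run is planned every periodMs, but cannot begin before the previous run has ended. */
    static List<Long> fixedRateStarts(long periodMs, long taskMs, int runs) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int i = 0; i < runs; i++) {
            long planned = i * periodMs;
            long start = Math.max(planned, previousEnd);
            starts.add(start);
            previousEnd = start + taskMs;
        }
        return starts;
    }

    /** fixedDelay: the next run starts delayMs after the previous run has ended. */
    static List<Long> fixedDelayStarts(long delayMs, long taskMs, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < runs; i++) {
            starts.add(start);
            start = start + taskMs + delayMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 300 ms task with a 1000 ms interval, four runs each.
        System.out.println("fixedRate : " + fixedRateStarts(1000, 300, 4));
        System.out.println("fixedDelay: " + fixedDelayStarts(1000, 300, 4));
    }
}
```

With a fast task, fixedRate keeps a steady beat while fixedDelay drifts by the task's duration on every run. With a task slower than the period, the fixed-rate runs end up back to back.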

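Going deeper

The following questions should help you understand more deeply how Spring Task is implemented.

What should you watch out for when using Spring Task?

  • Exception handling

Handle exceptions yourself: catch and log them inside the task method instead of letting them propagate to the scheduler.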
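One way to follow this advice is to wrap the task body so that a failure is caught, recorded, and never escapes. The sketch below uses only the JDK. SafeTask is a hypothetical helper, not a Spring class; in a @Scheduled method the same try/catch would simply go inside the method body.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical wrapper: runs a delegate and swallows (but records) any runtime failure. */
final class SafeTask implements Runnable {

    private final Runnable delegate;
    private final AtomicInteger failures = new AtomicInteger();

    SafeTask(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException ex) {
            // Handle the failure here: count it and log it, so the caller never sees it.
            failures.incrementAndGet();
            System.err.println("scheduled task failed: " + ex);
        }
    }

    int failureCount() {
        return failures.get();
    }

    public static void main(String[] args) {
        SafeTask task = new SafeTask(() -> {
            throw new IllegalStateException("remote service unavailable");
        });
        task.run();
        task.run();
        System.out.println("failures recorded: " + task.failureCount());
    }
}
```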

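  • Timeout handling

This problem comes up often in real projects: a scheduled task runs for a while and then stops executing. It typically happens when the task calls a third-party API without a call timeout; the call hangs and the current task blocks. If the scheduler has only one thread (the fallback when no TaskScheduler is configured, as scheduleTasks() shows later), every other scheduled task stalls behind it.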
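When the client you call offers its own timeout setting, use that first. Where it does not, one possible guard is to run the blocking call on a separate worker thread and wait for it with a deadline. The sketch below assumes exactly that; BoundedCall and callWithTimeout are hypothetical names, and the worker pool is an illustration, not something Spring provides.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that bounds how long a scheduled task waits for a blocking call. */
final class BoundedCall {

    // Daemon threads, so a stuck call cannot keep the JVM alive.
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "bounded-call");
        t.setDaemon(true);
        return t;
    });

    /** Returns the call's result, or the fallback if it fails or exceeds timeoutMs. */
    static <T> T callWithTimeout(Callable<T> call, long timeoutMs, T fallback) {
        Future<T> future = WORKERS.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            future.cancel(true); // interrupt the worker; the scheduler thread is free again
            return fallback;
        } catch (InterruptedException ex) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException ex) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        String result = callWithTimeout(() -> {
            Thread.sleep(5_000); // stands in for a third-party call that hangs
            return "response";
        }, 200, "timed out");
        System.out.println(result);
    }
}
```

The scheduled method then returns after at most the timeout, so later runs and other tasks are not held up by one hung call.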

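How does Spring Task work?

The Spring Task source code lives in the spring-context module of Spring Framework, under the org.springframework.scheduling package.

The @EnableScheduling annotation

Adding the @EnableScheduling annotation automatically imports SchedulingConfiguration.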

    /**
     * @author Chris Beams
     * @author Juergen Hoeller
     * @since 3.1
     * @see Scheduled
     * @see SchedulingConfiguration
     * @see SchedulingConfigurer
     * @see ScheduledTaskRegistrar
     * @see Trigger
     * @see ScheduledAnnotationBeanPostProcessor
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {

    }

SchedulingConfiguration initializes ScheduledAnnotationBeanPostProcessor

The SchedulingConfiguration class registers a ScheduledAnnotationBeanPostProcessor bean.

    @Configuration(proxyBeanMethods = false)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {

        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    }

What is a BeanPostProcessor? An earlier article explains it in detail; see "Spring Core: Inversion of Control (IoC) source code analysis".

(Figure: the lifecycle of a Bean in the Spring container)

ScheduledTaskRegistrar registers tasks

The ScheduledAnnotationBeanPostProcessor constructor creates a ScheduledTaskRegistrar.

    /**
     * Create a default {@code ScheduledAnnotationBeanPostProcessor}.
     */
    public ScheduledAnnotationBeanPostProcessor() {
        this.registrar = new ScheduledTaskRegistrar();
    }

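The main job of ScheduledTaskRegistrar is to register the various types of task (the approach shown here has been deprecated in newer versions). Note that when no taskScheduler has been set, it falls back to a single-threaded executor.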

    protected void scheduleTasks() {
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
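The fallback to Executors.newSingleThreadScheduledExecutor() in scheduleTasks() means that, unless a scheduler is supplied, every task shares one thread. The sketch below illustrates that with the JDK alone: it submits several one-shot tasks and records which threads ran them. SingleThreadDemo and threadsUsed are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Shows which threads execute tasks submitted to a given scheduler. */
final class SingleThreadDemo {

    /** Schedules `tasks` one-shot tasks, waits for them, and returns the names of the threads that ran them. */
    static Set<String> threadsUsed(ScheduledExecutorService scheduler, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            scheduler.schedule(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            }, 10, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdown();
        return names;
    }

    public static void main(String[] args) {
        // Same kind of executor that scheduleTasks() creates when no taskScheduler is set.
        Set<String> names = threadsUsed(Executors.newSingleThreadScheduledExecutor(), 5);
        System.out.println("threads used: " + names);
    }
}
```

All five tasks report the same thread, so one blocked task delays the rest. In a Spring application, a common remedy is to provide a TaskScheduler bean with a larger pool, for example a ThreadPoolTaskScheduler, or in Spring Boot to raise the spring.task.scheduling.pool.size property.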

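Which tasks get registered, and how are the classes designed? Task is the base class. TriggerTask extends it, and CronTask extends TriggerTask. IntervalTask also extends Task, with FixedRateTask and FixedDelayTask as its subclasses.

ScheduledAnnotationBeanPostProcessor loads the @Scheduled annotations

The @Scheduled annotations are picked up in the BeanPostProcessor's postProcessAfterInitialization phase.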

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass) &&
                AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledAnnotations) ->
                        scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
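The core idea of the method above, stripped of proxies, caching, and repeatable annotations, is a reflective scan of a bean's methods for an annotation. The sketch below shows that idea with the JDK only. @Every is a hypothetical stand-in for @Scheduled, and ReportJobs is a made-up bean. Spring's real lookup goes through MethodIntrospector and also handles interfaces and merged annotations, which this does not.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Simplified illustration of discovering annotated methods on a bean instance. */
final class AnnotationScan {

    /** Hypothetical stand-in for @Scheduled that carries only a fixed rate. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long millis();
    }

    /** Made-up bean with two annotated methods and one plain method. */
    static class ReportJobs {
        @Every(millis = 60_000)
        public void minuteReport() { }

        @Every(millis = 1_000)
        public void heartbeat() { }

        public void notScheduled() { }
    }

    /** Returns method name -> interval for every method that carries @Every. */
    static Map<String, Long> findAnnotated(Object bean) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(findAnnotated(new ReportJobs()));
    }
}
```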

The @Scheduled annotation is applied at the method level. Its definition is as follows.

    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Repeatable(Schedules.class)
    public @interface Scheduled {

        /**
         * A special cron expression value that indicates a disabled trigger: {@value}.
         * <p>This is primarily meant for use with <code>${...}</code> placeholders,
         * allowing for external disabling of corresponding scheduled methods.
         * @since 5.1
         * @see ScheduledTaskRegistrar#CRON_DISABLED
         */
        String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;

        /**
         * A cron-like expression, extending the usual UN*X definition to include triggers
         * on the second, minute, hour, day of month, month, and day of week.
         * <p>For example, {@code "0 * * * * MON-FRI"} means once per minute on weekdays
         * (at the top of the minute - the 0th second).
         * <p>The fields read from left to right are interpreted as follows.
         * <ul>
         * <li>second</li>
         * <li>minute</li>
         * <li>hour</li>
         * <li>day of month</li>
         * <li>month</li>
         * <li>day of week</li>
         * </ul>
         * <p>The special value {@link #CRON_DISABLED "-"} indicates a disabled cron
         * trigger, primarily meant for externally specified values resolved by a
         * <code>${...}</code> placeholder.
         * @return an expression that can be parsed to a cron schedule
         * @see org.springframework.scheduling.support.CronExpression#parse(String)
         */
        String cron() default "";

        /**
         * A time zone for which the cron expression will be resolved. By default, this
         * attribute is the empty String (i.e. the server's local time zone will be used).
         * @return a zone id accepted by {@link java.util.TimeZone#getTimeZone(String)},
         * or an empty String to indicate the server's default time zone
         * @since 4.0
         * @see org.springframework.scheduling.support.CronTrigger#CronTrigger(String, java.util.TimeZone)
         * @see java.util.TimeZone
         */
        String zone() default "";

        /**
         * Execute the annotated method with a fixed period between the end of the
         * last invocation and the start of the next.
         * <p>The time unit is milliseconds by default but can be overridden via
         * {@link #timeUnit}.
         * @return the delay
         */
        long fixedDelay() default -1;

        /**
         * Execute the annotated method with a fixed period between the end of the
         * last invocation and the start of the next.
         * <p>The time unit is milliseconds by default but can be overridden via
         * {@link #timeUnit}.
         * @return the delay as a String value &mdash; for example, a placeholder
         * or a {@link java.time.Duration#parse java.time.Duration} compliant value
         * @since 3.2.2
         */
        String fixedDelayString() default "";

        /**
         * Execute the annotated method with a fixed period between invocations.
         * <p>The time unit is milliseconds by default but can be overridden via
         * {@link #timeUnit}.
         * @return the period
         */
        long fixedRate() default -1;

        /**
         * Execute the annotated method with a fixed period between invocations.
         * <p>The time unit is milliseconds by default but can be overridden via
         * {@link #timeUnit}.
         * @return the period as a String value &mdash; for example, a placeholder
         * or a {@link java.time.Duration#parse java.time.Duration} compliant value
         * @since 3.2.2
         */
        String fixedRateString() default "";

        /**
         * Number of units of time to delay before the first execution of a
         * {@link #fixedRate} or {@link #fixedDelay} task.
         * <p>The time unit is milliseconds by default but can be overridden via
         * {@link #timeUnit}.
         * @return the initial delay
         * @since 3.2
         */
        long initialDelay() default -1;

        /**
         * Number of units of time to delay before the first execution of a
         * {@link #fixedRate} or {@link #fixedDelay} task.
         * <p>The time unit is milliseconds by default but can be overridden via
         * {@link #timeUnit}.
         * @return the initial delay as a String value &mdash; for example, a placeholder
         * or a {@link java.time.Duration#parse java.time.Duration} compliant value
         * @since 3.2.2
         */
        String initialDelayString() default "";

        /**
         * The {@link TimeUnit} to use for {@link #fixedDelay}, {@link #fixedDelayString},
         * {@link #fixedRate}, {@link #fixedRateString}, {@link #initialDelay}, and
         * {@link #initialDelayString}.
         * <p>Defaults to {@link TimeUnit#MILLISECONDS}.
         * <p>This attribute is ignored for {@linkplain #cron() cron expressions}
         * and for {@link java.time.Duration} values supplied via {@link #fixedDelayString},
         * {@link #fixedRateString}, or {@link #initialDelayString}.
         * @return the {@code TimeUnit} to use
         * @since 5.3.10
         */
        TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

    }

Once the @Scheduled annotation on a method (the task definition) has been found, processScheduled turns it into a task of the matching type.

    /**
     * Process the given {@code @Scheduled} method declaration on the given bean.
     * @param scheduled the {@code @Scheduled} annotation
     * @param method the method that the annotation has been declared on
     * @param bean the target bean instance
     * @see #createRunnable(Object, Method)
     */
    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            Runnable runnable = createRunnable(bean, method);
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

            // Determine initial delay
            long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
            String initialDelayString = scheduled.initialDelayString();
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }

            // Check cron expression
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            timeZone = TimeZone.getDefault();
                        }
                        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }

            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }

            // Check fixed delay
            long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }

            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }

            // Check fixed rate
            long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }

            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);

            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                regTasks.addAll(tasks);
            }
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }

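ScheduledTaskRegistrar resolves tasks

Take CronTask as an example. If a taskScheduler has already been set, the task is handed to it straight away; otherwise the task is parked in unresolvedTasks until the scheduler is initialized.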

    /**
     * Schedule the specified cron task, either right away if possible
     * or on initialization of the scheduler.
     * @return a handle to the scheduled task, allowing to cancel it
     * (or {@code null} if processing a previously registered task)
     * @since 4.3
     */
    @Nullable
    public ScheduledTask scheduleCronTask(CronTask task) {
        ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
        boolean newTask = false;
        if (scheduledTask == null) {
            scheduledTask = new ScheduledTask(task);
            newTask = true;
        }
        if (this.taskScheduler != null) {
            scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
        }
        else {
            addCronTask(task);
            this.unresolvedTasks.put(task, scheduledTask);
        }
        return (newTask ? scheduledTask : null);
    }
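The "schedule now if possible, otherwise park it" pattern in scheduleCronTask can be reduced to a few lines. The sketch below is a loose analogy rather than Spring's code: DeferredRegistrar is a hypothetical class, a plain java.util.concurrent.Executor stands in for the TaskScheduler, and tasks are executed once instead of being scheduled by a trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical registrar that parks tasks until a scheduler becomes available. */
final class DeferredRegistrar {

    private final List<Runnable> unresolved = new ArrayList<>();
    private Executor scheduler; // null until configured, like taskScheduler in the registrar

    /** Returns true if the task went to the scheduler right away, false if it was parked. */
    boolean register(Runnable task) {
        if (scheduler != null) {
            scheduler.execute(task);
            return true;
        }
        unresolved.add(task);
        return false;
    }

    /** Sets the scheduler and hands over every parked task; returns how many were handed over. */
    int setSchedulerAndFlush(Executor scheduler) {
        this.scheduler = scheduler;
        int flushed = unresolved.size();
        for (Runnable task : unresolved) {
            scheduler.execute(task);
        }
        unresolved.clear();
        return flushed;
    }

    int pending() {
        return unresolved.size();
    }

    public static void main(String[] args) {
        DeferredRegistrar registrar = new DeferredRegistrar();
        registrar.register(() -> System.out.println("parked task now running"));
        System.out.println("pending before scheduler: " + registrar.pending());
        registrar.setSchedulerAndFlush(Runnable::run); // direct executor, for illustration only
        registrar.register(() -> System.out.println("ran immediately"));
    }
}
```

In Spring the parked tasks are picked up later by scheduleTasks(), shown earlier, which runs once the registrar is initialized.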

How TaskScheduler handles a task

When no scheduler has been configured, the default is ConcurrentTaskScheduler, which handles a task as follows.

    @Override
    @Nullable
    public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
        try {
            if (this.enterpriseConcurrentScheduler) {
                return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
            }
            else {
                ErrorHandler errorHandler =
                        (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
                return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
            }
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
        }
    }

EnterpriseConcurrentTriggerScheduler adapts a Spring Trigger to the JSR-236 Trigger standard. It handles a task as follows.

    /**
     * Delegate that adapts a Spring Trigger to a JSR-236 Trigger.
     * Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.
     */
    private class EnterpriseConcurrentTriggerScheduler {

        public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
            ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
            return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
                @Override
                @Nullable
                public Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {
                    return (trigger.nextExecutionTime(le != null ?
                            new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
                            new SimpleTriggerContext()));
                }
                @Override
                public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
                    return false;
                }
            });
        }
    }

If EnterpriseConcurrentTriggerScheduler is not used, ReschedulingRunnable is used instead; underneath, the work is done by a ScheduledExecutorService.

    public ReschedulingRunnable(Runnable delegate, Trigger trigger, Clock clock,
                ScheduledExecutorService executor, ErrorHandler errorHandler) {

        super(delegate, errorHandler);
        this.trigger = trigger;
        this.triggerContext = new SimpleTriggerContext(clock);
        this.executor = executor;
    }

    @Nullable
    public ScheduledFuture<?> schedule() {
        synchronized (this.triggerContextMonitor) {
            this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
            if (this.scheduledExecutionTime == null) {
                return null;
            }
            long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
            this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
            return this;
        }
    }
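The listing shows only the constructor and schedule(); as the class name suggests, the runnable submits itself again after each run. The sketch below illustrates that self-rescheduling idea with the JDK alone. SelfRescheduling is a hypothetical class, and its "trigger" is simplified to a function from the number of completed runs to the next delay in milliseconds, or null to stop. Spring's Trigger instead returns the next execution time from a TriggerContext.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Hypothetical runnable that re-submits itself as a one-shot task after every run. */
final class SelfRescheduling implements Runnable {

    private final Runnable delegate;
    private final IntFunction<Long> nextDelayMs; // simplified trigger: completed runs -> delay, null = stop
    private final ScheduledExecutorService executor;
    private final AtomicInteger completedRuns = new AtomicInteger();
    private final CountDownLatch finished = new CountDownLatch(1);

    SelfRescheduling(Runnable delegate, IntFunction<Long> nextDelayMs, ScheduledExecutorService executor) {
        this.delegate = delegate;
        this.nextDelayMs = nextDelayMs;
        this.executor = executor;
    }

    /** Asks the trigger for the next delay and submits this object once; stops when the trigger returns null. */
    void schedule() {
        Long delay = nextDelayMs.apply(completedRuns.get());
        if (delay == null) {
            finished.countDown();
            return;
        }
        executor.schedule(this, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException ex) {
            System.err.println("task failed: " + ex); // a failure must not break the chain
        }
        completedRuns.incrementAndGet();
        schedule(); // the next run is only planned after this one has finished
    }

    /** Runs a counter task until the trigger stops after maxRuns, then returns the counter value. */
    static int runUntilTriggerStops(int maxRuns, long delayMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        SelfRescheduling task = new SelfRescheduling(
                counter::incrementAndGet,
                runs -> runs < maxRuns ? Long.valueOf(delayMs) : null,
                executor);
        task.schedule();
        try {
            task.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("runs executed: " + runUntilTriggerStops(3, 10));
    }
}
```

Because each run is a separate one-shot submission, the trigger can return a different delay every time, which is what makes cron-style schedules possible on top of a plain ScheduledExecutorService.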

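Sample source code

https://github.com/realpdai/t...

More

Say goodbye to fragmented learning with a no-gimmick, one-stop, systematic path to back-end development: Java Full-Stack Knowledge System https://pdai.tech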