这两天应用 Scheduled 注解来解决定时问题的时候,发现不能失常应用。所以就有了这一篇博客
@Scheduled(initialDelay = 2000,fixedDelay = 1000)
private void test(){System.out.println(Math.random());
}
单从源码的 doc 文件中能够看到这么一段
You can add the `@Scheduled` annotation to a method, along with trigger metadata. For
example, the following method is invoked every five seconds with a fixed delay,
meaning that the period is measured from the completion time of each preceding
invocation:
[source,java,indent=0]
[subs="verbatim,quotes"]
----
@Scheduled(fixedDelay=5000)
public void doSomething() {// something that should run periodically}
----
将源码放进我的环境运行,发现并不能失效。那就只能先看看源码来看看它到底是怎么失效的
注解 Scheduled 的源码
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
String cron() default "";
String zone() default "";
long fixedDelay() default -1;
String fixedDelayString() default "";
long fixedRate() default -1;
String fixedRateString() default "";
long initialDelay() default -1;
String initialDelayString() default "";}
而后,动静加载的代码在 ScheduledAnnotationBeanPostProcessor 的 postProcessAfterInitialization 办法中
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {logger.trace("No @Scheduled annotations found on bean class:" + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {logger.trace(annotatedMethods.size() + "@Scheduled methods processed on bean'" + beanName +
"':" + annotatedMethods);
}
}
}
return bean;
}
首先,通过 AopProxyUtils.ultimateTargetClass 获取传入的 Bean 的最终类(是哪个类),而后判断以后类有没有在 this.nonAnnotatedClasses 中,如果没有在,则持续应用 AnnotationUtils.isCandidateClass 判断以后类是不是一个非抽象类或者接口,如果都满足,则调用
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
获取到某个办法的所有 Scheduled。也就是说,一个办法,是能够同时被屡次定义周期化的。也就是这样
@Scheduled(fixedDelay = 5000)
@Schedules({@Scheduled(fixedDelay = 5000),@Scheduled(fixedDelay = 3000)})
public void test(){logger.info("123");
}
持续剖析源码,咱们能够发现,在失去 targetClass(指标类)的所有带有 @Scheduled 或者 @Schedules 注解的办法并放到 annotatedMethods 中后,如果 annotatedMethods 的大小为 0,则将以后指标 targetClass 放到 this.nonAnnotatedClasses 中,标记这个类中没有被相干注解润饰,不便新的调用方进行判断。如果 annotatedMethods 的大小不为空,则
nnotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
将独自解决每个周期性工作。上面来看看到底是怎么解决的
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// 能够看见,scheduled method 的运行必须在 Bean 环境中,所以用 @Schedules 或者 @Scheduled 的办法必须在一个 bean 类外面
try {Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the'cron','fixedDelay(String)', or'fixedRate(String)'attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay 定义开始工夫
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
// initialDelay 和 initialDelayString 只能同时定义一个
if (StringUtils.hasText(initialDelayString)) {Assert.isTrue(initialDelay < 0, "Specify'initialDelay'or'initialDelayString', not both");
if (this.embeddedValueResolver != null) {initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
// 调用 this.embeddedValueResolver.resolveStringValue 解析 cron
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
// 如果在 initialDelay 定义的状况下,cron 是不失效的
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
// String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
// public static final String CRON_DISABLED = "-";
// 如果 cron 不等于 '-'
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
// 解析 timeZone
if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);
}
else {timeZone = TimeZone.getDefault();
}
// 应用 new CronTrigger(cron, timeZone) 创立定时触发器
// 应用 new CronTask(runnable, new CronTrigger(cron, timeZone)) 创立一个定时工作,定时触发器会触发 runnable
// 调用 this.registrar.scheduleCronTask 注册工作到以后环境中
// tasks 是一个汇合,防止反复注册雷同的工作
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {initialDelay = 0;}
// Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
// 如果当前任务没有被退出到 tasks
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
// 应用 new FixedDelayTask(runnable, fixedDelay, initialDelay) 来注册提早工作
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
// 如果没有传 fixedDelay,然而传了 fixedDelayString,能够应用它的值
if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// Check fixed rate
// 如果下面的都没满足,则判断 fixedDate 和 fixedDateString 的值
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
// 同步的注册工作 退出缓存,方便使用
synchronized (this.scheduledTasks) {Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Encountered invalid @Scheduled method'" + method.getName() + "':" + ex.getMessage());
}
}
上面为某个类外面的办法是创立 Runnabled 的办法,传入办法和指标类就能够失去
protected Runnable createRunnable(Object target, Method method) {Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
}
上面是 scheduleCronTask 办法的定义,能够看见这里会对 task 去重
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
通过 processScheduled 办法会将某个被 @Scheduled 或者 @Schedules 注解润饰的办法注册进全局的 scheduledTask 环境中。
也就是说,办法 postProcessAfterInitialization 会将整个 bean 中的所有被 @Scheduled 或者 @Schedules 注解润饰的办法都注册进全局定时执行环境。
哪些地方调用了 postProcessAfterInitialization
眼帘移到抽象类 AbstractAutowireCapableBeanFactory 的 applyBeanPostProcessorsAfterInitialization 办法中,这里是源码中惟一调用 postProcessAfterInitialization 的中央,也就是说,所有的周期工作都是在这里被注入到环境中的(其实不只是被 @Scheduled 或者 @Schedules 润饰的周期性工作)
@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
throws BeansException {
Object result = existingBean;
for (BeanPostProcessor processor : getBeanPostProcessors()) {Object current = processor.postProcessAfterInitialization(result, beanName);
if (current == null) {return result;}
result = current;
}
return result;
}
那么既然来了,咱们还是剖析一下,这里到底做了什么。
首先,通过调用 getBeanPostProcessors() 获取到了所有的 BeanPostProcessor,这个类能够了解为是各种 Bean 的加载器。而咱们的 ScheduledAnnotationBeanPostProcessor 就是其中之一。依据调用栈能够发现,最终追溯到了 AbstractBeanFactory.beanPostProcessors(List\<BeanPostProcessor> 类型),上面两个办法是增加函数
@Override
public void addBeanPostProcessor(BeanPostProcessor beanPostProcessor) {Assert.notNull(beanPostProcessor, "BeanPostProcessor must not be null");
// Remove from old position, if any
this.beanPostProcessors.remove(beanPostProcessor);
// Add to end of list
this.beanPostProcessors.add(beanPostProcessor);
}
public void addBeanPostProcessors(Collection<? extends BeanPostProcessor> beanPostProcessors) {this.beanPostProcessors.removeAll(beanPostProcessors);
this.beanPostProcessors.addAll(beanPostProcessors);
}
能够看进去,beanPostProcessors 中没有反复的 Processors。咱们把下面那个函数定义为办法一,前面那个办法定义为办法二,办法二在上面这个办法中被调用了,整个调用栈如下:
private static void registerBeanPostProcessors(ConfigurableListableBeanFactory beanFactory, List<BeanPostProcessor> postProcessors) {if (beanFactory instanceof AbstractBeanFactory) {
// Bulk addition is more efficient against our CopyOnWriteArrayList there
((AbstractBeanFactory) beanFactory).addBeanPostProcessors(postProcessors);
}
else {for (BeanPostProcessor postProcessor : postProcessors) {beanFactory.addBeanPostProcessor(postProcessor);
}
}
}
这四次调用都呈现在同一个函数 registerBeanPostProcessors 中,那么咱们能够假如,这里的调用程序,就是 Bean 加载的先后顺序(做 java 开发的应该都晓得,如果代码写得不当,定义了谬误的 Bean 加载程序回导致注入无奈实现,从而造成代码无奈运行的问题)。那么,Bean 的注册程序就是
priorityOrderedPostProcessors > orderedPostProcessors > nonOrderedPostProcessors > internalPostProcessors
registerBeanPostProcessors 的源码剖析与题目没有什么关系,这里就不做剖析了。留着下次剖析 Bean 的时候再仔细分析。
从调用方剖析的路走不通,咱们能够尝试从最远头登程
咱们都晓得,应用 @Scheduled 或者 @Schedules 之前,必须要在全局加上 @EnableScheduling 的注解。那么咱们就能够从这个注解动手进行剖析。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {}
惋惜的是,源码中并没有对注解 @EnableScheduling 进行解析的代码。可是这是为什么呢?咱们留神到,润饰这个注解的有一个咱们素来没有见过的注解 @Import,会不会是 @Import
其中,@Import 的源码如下
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Import {Class<?>[] value();}
其中,定义 @Import 注解行为的源码在类 ConfigurationClassParser 的 collectImports 办法中,来看看吧
private void collectImports(SourceClass sourceClass, Set<SourceClass> imports, Set<SourceClass> visited)
throws IOException {if (visited.add(sourceClass)) {for (SourceClass annotation : sourceClass.getAnnotations()) {String annName = annotation.getMetadata().getClassName();
if (!annName.equals(Import.class.getName())) {collectImports(annotation, imports, visited);
}
}
imports.addAll(sourceClass.getAnnotationAttributes(Import.class.getName(), "value"));
}
}
这个函数是一个递归函数,会一直地查找某个注解以及润饰它的注解所有被 Import 注解导入的配置文件。这个函数的调用栈如下
private Set<SourceClass> getImports(SourceClass sourceClass) throws IOException {Set<SourceClass> imports = new LinkedHashSet<>();
Set<SourceClass> visited = new LinkedHashSet<>();
collectImports(sourceClass, imports, visited);
return imports;
}
// getImports 在 ConfigurationClassParser 的 doProcessConfigurationClass 办法中被调用
processImports(configClass, sourceClass, getImports(sourceClass), filter, true);
// doProcessConfigurationClass 在 ConfigurationClassParser 的 processConfigurationClass 办法中被调用
do {sourceClass = doProcessConfigurationClass(configClass, sourceClass, filter);
}
while (sourceClass != null);
this.configurationClasses.put(configClass, configClass);
因为调用栈切实是太深,最初会到 FrameworkServlet 的 refresh() 办法上,临时我只能下一个论断就是,在 Application 的主类下面润饰的注解并不会独自写反射办法来实现,而是会通过 spring 提供的对立解决形式进行解决。因为在整个 spring 框架源码中都没有找到对该注解进行反射操作的内容。
总结
通过这一篇文章,咱们从源码中学习了 @Scheduled 和 @Schedules 这两个注解的,他们是如何解析参数,如何退出工夫触发器,不过目前还欠缺工夫触发器到底是如何工作的这部分的内容,后续我会补上。
另外,咱们也首次理解了,这种注解是如何被 spring 框架调用到的,晓得了 BeanFactory,也晓得了 ConfigurationClassParser,这给咱们接下来全面钻研 Spring 容器这一块提供了契机。
最初的最初,我代码外面的问题就是没有在主类外面加 @EnableScheduling 注解
炒鸡辣鸡原创文章,转载请注明起源