这两天应用Scheduled注解来解决定时问题的时候,发现不能失常应用。所以就有了这一篇博客
@Scheduled(initialDelay = 2000,fixedDelay = 1000)private void test(){ System.out.println(Math.random());}
单从源码的doc文件中能够看到这么一段
You can add the `@Scheduled` annotation to a method, along with trigger metadata. Forexample, the following method is invoked every five seconds with a fixed delay,meaning that the period is measured from the completion time of each precedinginvocation:[source,java,indent=0][subs="verbatim,quotes"]---- @Scheduled(fixedDelay=5000) public void doSomething() { // something that should run periodically }----
将源码放进我的环境运行,发现并不能失效。那就只能先看看源码来看看它到底是怎么失效的
注解Scheduled的源码
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Repeatable(Schedules.class)public @interface Scheduled { String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED; String cron() default ""; String zone() default ""; long fixedDelay() default -1; String fixedDelayString() default ""; long fixedRate() default -1; String fixedRateString() default ""; long initialDelay() default -1; String initialDelayString() default "";}
而后,动静加载的代码在 ScheduledAnnotationBeanPostProcessor 的 postProcessAfterInitialization 办法中
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
首先,通过AopProxyUtils.ultimateTargetClass获取传入的Bean的最终类(是哪个类),而后判断以后类有没有在this.nonAnnotatedClasses中,如果没有在,则持续应用AnnotationUtils.isCandidateClass判断以后类是不是一个非抽象类或者接口,如果都满足,则调用
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); });
获取到某个办法的所有Scheduled。也就是说,一个办法,是能够同时被屡次定义周期化的。也就是这样
@Scheduled(fixedDelay = 5000)@Schedules({@Scheduled(fixedDelay = 5000),@Scheduled(fixedDelay = 3000)})public void test(){ logger.info("123");}
持续剖析源码,咱们能够发现,在失去targetClass(指标类)的所有带有@Scheduled或者@Schedules注解的办法并放到annotatedMethods中后,如果annotatedMethods的大小为0,则将以后指标targetClass放到this.nonAnnotatedClasses中,标记这个类中没有被相干注解润饰,不便新的调用方进行判断。如果annotatedMethods的大小不为空,则
nnotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
将独自解决每个周期性工作。上面来看看到底是怎么解决的
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { // 能够看见,scheduled method的运行必须在Bean环境中,所以用@Schedules或者@Scheduled的办法必须在一个bean类外面 try { Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay 定义开始工夫 long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); // initialDelay 和 initialDelayString 只能同时定义一个 if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { // 调用 this.embeddedValueResolver.resolveStringValue 解析cron cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { // 如果在initialDelay定义的状况下,cron是不失效的 Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; // String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED; // public static final String CRON_DISABLED = "-"; // 如果cron不等于 '-' if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; // 解析timeZone if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } // 应用 new CronTrigger(cron, timeZone) 创立定时触发器 // 应用 new CronTask(runnable, new CronTrigger(cron, timeZone)) 创立一个定时工作,定时触发器会触发runnable // 调用 this.registrar.scheduleCronTask 注册工作到以后环境中 // tasks是一个汇合,防止反复注册雷同的工作 tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } // Check fixed delay long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { // 如果当前任务没有被退出到tasks Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; // 应用 new FixedDelayTask(runnable, fixedDelay, initialDelay) 来注册提早工作 tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { // 如果没有传fixedDelay,然而传了fixedDelayString,能够应用它的值 if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // Check fixed rate // 如果下面的都没满足,则判断fixedDate和fixedDateString的值 long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks // 同步的注册工作 退出缓存,方便使用 synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); }}
上面为某个类外面的办法是创立Runnabled的办法,传入办法和指标类就能够失去
protected Runnable createRunnable(Object target, Method method) { Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod);}
上面是scheduleCronTask办法的定义,能够看见这里会对task去重
@Nullablepublic ScheduledTask scheduleCronTask(CronTask task) { ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } if (this.taskScheduler != null) { scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else { addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null);}
通过processScheduled办法会将某个被@Scheduled或者@Schedules注解润饰的办法注册进全局的scheduledTask环境中。
也就是说,办法postProcessAfterInitialization会将整个bean中的所有被@Scheduled或者@Schedules注解润饰的办法都注册进全局定时执行环境。
哪些地方调用了postProcessAfterInitialization
眼帘移到抽象类AbstractAutowireCapableBeanFactory的applyBeanPostProcessorsAfterInitialization办法中,这里是源码中惟一调用postProcessAfterInitialization的中央,也就是说,所有的周期工作都是在这里被注入到环境中的(其实不只是被@Scheduled或者@Schedules润饰的周期性工作)
@Overridepublic Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName) throws BeansException { Object result = existingBean; for (BeanPostProcessor processor : getBeanPostProcessors()) { Object current = processor.postProcessAfterInitialization(result, beanName); if (current == null) { return result; } result = current; } return result;}
那么既然来了,咱们还是剖析一下,这里到底做了什么。
首先,通过调用getBeanPostProcessors()获取到了所有的BeanPostProcessor,这个类能够了解为是各种Bean的加载器。而咱们的ScheduledAnnotationBeanPostProcessor就是其中之一。依据调用栈能够发现,最终追溯到了AbstractBeanFactory.beanPostProcessors(List\<BeanPostProcessor>类型),上面两个办法是增加函数
@Overridepublic void addBeanPostProcessor(BeanPostProcessor beanPostProcessor) { Assert.notNull(beanPostProcessor, "BeanPostProcessor must not be null"); // Remove from old position, if any this.beanPostProcessors.remove(beanPostProcessor); // Add to end of list this.beanPostProcessors.add(beanPostProcessor);}public void addBeanPostProcessors(Collection<? extends BeanPostProcessor> beanPostProcessors) { this.beanPostProcessors.removeAll(beanPostProcessors); this.beanPostProcessors.addAll(beanPostProcessors);}
能够看进去,beanPostProcessors中没有反复的Processors。咱们把下面那个函数定义为办法一,前面那个办法定义为办法二,办法二在上面这个办法中被调用了,整个调用栈如下:
private static void registerBeanPostProcessors( ConfigurableListableBeanFactory beanFactory, List<BeanPostProcessor> postProcessors) { if (beanFactory instanceof AbstractBeanFactory) { // Bulk addition is more efficient against our CopyOnWriteArrayList there ((AbstractBeanFactory) beanFactory).addBeanPostProcessors(postProcessors); } else { for (BeanPostProcessor postProcessor : postProcessors) { beanFactory.addBeanPostProcessor(postProcessor); } }}
这四次调用都呈现在同一个函数registerBeanPostProcessors中,那么咱们能够假如,这里的调用程序,就是Bean加载的先后顺序(做java开发的应该都晓得,如果代码写得不当,定义了谬误的Bean加载程序回导致注入无奈实现,从而造成代码无奈运行的问题)。那么,Bean的注册程序就是
priorityOrderedPostProcessors > orderedPostProcessors > nonOrderedPostProcessors > internalPostProcessors
registerBeanPostProcessors的源码剖析与题目没有什么关系,这里就不做剖析了。留着下次剖析Bean的时候再仔细分析。
从调用方剖析的路走不通,咱们能够尝试从最远头登程
咱们都晓得,应用@Scheduled或者@Schedules之前,必须要在全局加上@EnableScheduling的注解。那么咱们就能够从这个注解动手进行剖析。
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Import(SchedulingConfiguration.class)@Documentedpublic @interface EnableScheduling {}
惋惜的是,源码中并没有对注解@EnableScheduling进行解析的代码。可是这是为什么呢?咱们留神到,润饰这个注解的有一个咱们素来没有见过的注解@Import,会不会是@Import
其中,@Import的源码如下
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface Import { Class<?>[] value();}
其中,定义@Import注解行为的源码在类ConfigurationClassParser的collectImports办法中,来看看吧
private void collectImports(SourceClass sourceClass, Set<SourceClass> imports, Set<SourceClass> visited) throws IOException { if (visited.add(sourceClass)) { for (SourceClass annotation : sourceClass.getAnnotations()) { String annName = annotation.getMetadata().getClassName(); if (!annName.equals(Import.class.getName())) { collectImports(annotation, imports, visited); } } imports.addAll(sourceClass.getAnnotationAttributes(Import.class.getName(), "value")); }}
这个函数是一个递归函数,会一直地查找某个注解以及润饰它的注解所有被Import注解导入的配置文件。这个函数的调用栈如下
private Set<SourceClass> getImports(SourceClass sourceClass) throws IOException { Set<SourceClass> imports = new LinkedHashSet<>(); Set<SourceClass> visited = new LinkedHashSet<>(); collectImports(sourceClass, imports, visited); return imports;}// getImports在ConfigurationClassParser的doProcessConfigurationClass办法中被调用processImports(configClass, sourceClass, getImports(sourceClass), filter, true);// doProcessConfigurationClass在ConfigurationClassParser的processConfigurationClass办法中被调用do { sourceClass = doProcessConfigurationClass(configClass, sourceClass, filter);}while (sourceClass != null);this.configurationClasses.put(configClass, configClass);
因为调用栈切实是太深,最初会到FrameworkServlet的refresh()办法上,临时我只能下一个论断就是,在Application的主类下面润饰的注解并不会独自写反射办法来实现,而是会通过spring提供的对立解决形式进行解决。因为在整个spring框架源码中都没有找到对该注解进行反射操作的内容。
总结
通过这一篇文章,咱们从源码中学习了@Scheduled 和 @Schedules 这两个注解的,他们是如何解析参数,如何退出工夫触发器,不过目前还欠缺工夫触发器到底是如何工作的这部分的内容,后续我会补上。
另外,咱们也首次理解了,这种注解是如何被spring框架调用到的,晓得了BeanFactory,也晓得了ConfigurationClassParser,这给咱们接下来全面钻研Spring容器这一块提供了契机。
最初的最初,我代码外面的问题就是没有在主类外面加@EnableScheduling注解
炒鸡辣鸡原创文章,转载请注明起源