关于java:Spring-Boot-实现定时任务动态管理太爽了

7次阅读

共计 13803 个字符,预计需要花费 35 分钟才能阅读完成。

一、性能阐明

SpringBoot 的定时工作的增强工具,实现对 SpringBoot 原生的定时工作进行动静治理, 齐全兼容原生 @Scheduled 注解, 无需对本来的定时工作进行批改

二、疾速应用

具体的性能曾经封装成 SpringBoot-starter 即插即用:

<dependency>
    <groupId>com.github.guoyixing</groupId>
    <artifactId>spring-boot-starter-super-scheduled</artifactId>
    <version>0.3.1</version>
</dependency>

应用办法和源码:

  • 码云:https://gitee.com/qiaodaimadewangcai/super-scheduled
  • github:https://github.com/guoyixing/super-scheduled

举荐一个开源收费的 Spring Boot 实战我的项目:

https://github.com/javastacks/spring-boot-best-practice

三、实现原理

1、动静治理实现

(1) 配置管理介绍

@Component("superScheduledConfig")
public class SuperScheduledConfig {
    /**
     * 执行定时工作的线程池
     */
    private ThreadPoolTaskScheduler taskScheduler;

    /**
     * 定时工作名称与定时工作回调钩子  的关联关系容器
     */
    private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();

    /**
     * 定时工作名称与定时工作须要执行的逻辑  的关联关系容器
     */
    private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();

    /**
     * 定时工作名称与定时工作的源信息  的关联关系容器
     */
    private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();
 /* 一般的 get/sets 省略 */
}

(2) 应用后处理器拦挡 SpringBoot 本来的定时工作

  • 实现 ApplicationContextAware 接口拿到 SpringBoot 的上下文
  • 实现 BeanPostProcessor 接口,将这个类标记为后处理器,后处理器会在每个 bean 实例化之后执行
  • 应用 @DependsOn 注解强制依赖 SuperScheduledConfig 类,让 SpringBoot 实例化 SuperScheduledPostProcessor 类之前先实例化 SuperScheduledConfig
  • 次要实现逻辑在 postProcessAfterInitialization() 办法中
@DependsOn({"superScheduledConfig"})
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {protected final Log logger = LogFactory.getLog(getClass());

    private ApplicationContext applicationContext;

    /**
     * 实例化 bean 之前的操作
     * @param bean bean 实例
     * @param beanName bean 的 Name
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return bean;}

    /**
     * 实例化 bean 之后的操作
     * @param bean bean 实例
     * @param beanName bean 的 Name
     */
    @Override
    public Object postProcessAfterInitialization(Object bean,
                                                 String beanName) throws BeansException {
        //1. 获取配置管理器
        SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);

        //2. 获取以后实例化实现的 bean 的所有办法
        Method[] methods = bean.getClass().getDeclaredMethods();
        // 循环解决对每个办法逐个解决
        if (methods.length > 0) {for (Method method : methods) {
             //3. 尝试在该办法上获取 @Scheduled 注解(SpringBoot 的定时工作注解)Scheduled annotation = method.getAnnotation(Scheduled.class);
                // 如果无奈获取到 @Scheduled 注解,就跳过这个办法
                if (annotation == null) {continue;}
                //4. 创立定时工作的源属性
                // 创立定时工作的源属性(用来记录定时工作的配置,初始化的时候记录的是注解上本来的属性)ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
                // 对注解上获取到源属性中的属性进行检测
                if (!scheduledSource.check()) {throw new SuperScheduledException("在" + beanName + "Bean 中" + method.getName() + "办法的注解参数谬误");
                }
                // 生成定时工作的名称(id),应用 beanName+“.”+ 办法名
                String name = beanName + "." + method.getName();
                // 将以 key-value 的模式,将源数据存入配置管理器中,key:定时工作的名称 value:源数据
                superScheduledConfig.addScheduledSource(name, scheduledSource);
                try {
                 //5. 将本来 SpringBoot 的定时工作勾销掉
                    clearOriginalScheduled(annotation);
                } catch (Exception e) {throw new SuperScheduledException("在敞开原始办法" + beanName + method.getName() + "时呈现谬误");
                }
            }
        }
        // 最初 bean 放弃原有返回
        return bean;
    }

    /**
     * 批改注解原先的属性
     * @param annotation 注解实例对象
     * @throws Exception
     */
    private void clearOriginalScheduled(Scheduled annotation) throws Exception {changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
        changeAnnotationValue(annotation, "fixedDelay", -1L);
        changeAnnotationValue(annotation, "fixedDelayString", "");
        changeAnnotationValue(annotation, "fixedRate", -1L);
        changeAnnotationValue(annotation, "fixedRateString", "");
        changeAnnotationValue(annotation, "initialDelay", -1L);
        changeAnnotationValue(annotation, "initialDelayString", "");
    }

    /**
     * 获取 SpringBoot 的上下文
     * @param applicationContext SpringBoot 的上下文
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
}

(3) 应用 ApplicationRunner 初始化自定义的定时工作运行器

  • 实现 ApplicationContextAware 接口拿到 SpringBoot 的上下文
  • 应用 @DependsOn 注解强制依赖 threadPoolTaskScheduler
  • 实现 ApplicationRunner 接口,在所有 bean 初始化完结之后,运行自定义逻辑
  • 次要实现逻辑在 run() 办法中
@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private ApplicationContext applicationContext;

 /**
     * 定时工作配置管理器
     */
    @Autowired
    private SuperScheduledConfig superScheduledConfig;
    /**
     * 定时工作执行线程
     */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Override
    public void run(ApplicationArguments args) {
     //1. 定时工作配置管理器中缓存  定时工作执行线程
        superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
        //2. 获取所有定时工作源数据
        Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
        // 逐个解决定时工作
        for (String name : nameToScheduledSource.keySet()) {
            //3. 获取定时工作源数据
            ScheduledSource scheduledSource = nameToScheduledSource.get(name);
            //4. 获取所有加强类
            String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
            //5. 创立执行控制器
            SuperScheduledRunnable runnable = new SuperScheduledRunnable();
            // 配置执行控制器
            runnable.setMethod(scheduledSource.getMethod());
            runnable.setBean(scheduledSource.getBean());
            //6. 逐个解决加强类(增强器实现原理前面具体分析)List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
            for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
             //7. 将增强器代理成 point
                Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
                // 创立代理
                Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
                proxy.setSuperScheduledName(name);
                //8. 所有的 points 连成起来
                points.add(proxy);
            }
   // 将 point 造成调用链
            runnable.setChain(new Chain(points));
            // 将执行逻辑封装并缓存到定时工作配置管理器中
            superScheduledConfig.addRunnable(name, runnable::invoke);
            try {
             //8. 启动定时工作
                ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler
                        , scheduledSource, runnable::invoke);
                // 将线程回调钩子存到工作配置管理器中
                superScheduledConfig.addScheduledFuture(name, schedule);
                logger.info(df.format(LocalDateTime.now()) + "工作" + name + "曾经启动...");

            } catch (Exception e) {throw new SuperScheduledException("工作" + name + "启动失败,错误信息:" + e.getLocalizedMessage());
            }
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
}

(4) 进行动静治理

@Component
public class SuperScheduledManager {protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private SuperScheduledConfig superScheduledConfig;

    /**
     * 批改 Scheduled 的执行周期
     *
     * @param name scheduled 的名称
     * @param cron cron 表达式
     */
    public void setScheduledCron(String name, String cron) {
        // 终止原先的工作
        cancelScheduled(name);
        // 创立新的工作
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setCron(cron);
        addScheduled(name, scheduledSource);
    }

    /**
     * 批改 Scheduled 的 fixedDelay
     *
     * @param name       scheduled 的名称
     * @param fixedDelay 上一次执行结束工夫点之后多长时间再执行
     */
    public void setScheduledFixedDelay(String name, Long fixedDelay) {
        // 终止原先的工作
        cancelScheduled(name);
        // 创立新的工作
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedDelay(fixedDelay);
        addScheduled(name, scheduledSource);
    }

    /**
     * 批改 Scheduled 的 fixedRate
     *
     * @param name      scheduled 的名称
     * @param fixedRate 上一次开始执行之后多长时间再执行
     */
    public void setScheduledFixedRate(String name, Long fixedRate) {
        // 终止原先的工作
        cancelScheduled(name);
        // 创立新的工作
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedRate(fixedRate);
        addScheduled(name, scheduledSource);
    }

    /**
     * 查问所有启动的 Scheduled
     */
    public List<String> getRunScheduledName() {Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
        return new ArrayList<>(names);
    }

    /**
     * 查问所有的 Scheduled
     */
    public List<String> getAllSuperScheduledName() {Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
        return new ArrayList<>(names);
    }

    /**
     * 终止 Scheduled
     *
     * @param name scheduled 的名称
     */
    public void cancelScheduled(String name) {ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
        scheduledFuture.cancel(true);
        superScheduledConfig.removeScheduledFuture(name);
        logger.info(df.format(LocalDateTime.now()) + "工作" + name + "曾经终止...");
    }

    /**
     * 启动 Scheduled
     *
     * @param name            scheduled 的名称
     * @param scheduledSource 定时工作的源信息
     */
    public void addScheduled(String name, ScheduledSource scheduledSource) {if (getRunScheduledName().contains(name)) {throw new SuperScheduledException("定时工作" + name + "曾经被启动过了");
        }
        if (!scheduledSource.check()) {throw new SuperScheduledException("定时工作" + name + "源数据内容谬误");
        }

        scheduledSource.refreshType();

        Runnable runnable = superScheduledConfig.getRunnable(name);
        ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();

        ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
        logger.info(df.format(LocalDateTime.now()) + "工作" + name + "曾经启动...");

        superScheduledConfig.addScheduledSource(name, scheduledSource);
        superScheduledConfig.addScheduledFuture(name, schedule);
    }

    /**
     * 以 cron 类型启动 Scheduled
     *
     * @param name scheduled 的名称
     * @param cron cron 表达式
     */
    public void addCronScheduled(String name, String cron) {ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setCron(cron);

        addScheduled(name, scheduledSource);
    }

    /**
     * 以 fixedDelay 类型启动 Scheduled
     *
     * @param name         scheduled 的名称
     * @param fixedDelay   上一次执行结束工夫点之后多长时间再执行
     * @param initialDelay 第一次执行的延迟时间
     */
    public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedDelay(fixedDelay);
        if (initialDelay != null && initialDelay.length == 1) {scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
        }

        addScheduled(name, scheduledSource);
    }

    /**
     * 以 fixedRate 类型启动 Scheduled
     *
     * @param name         scheduled 的名称
     * @param fixedRate    上一次开始执行之后多长时间再执行
     * @param initialDelay 第一次执行的延迟时间
     */
    public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedRate(fixedRate);
        if (initialDelay != null && initialDelay.length == 1) {scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
        }

        addScheduled(name, scheduledSource);
    }

    /**
     * 手动执行一次工作
     *
     * @param name scheduled 的名称
     */
    public void runScheduled(String name) {Runnable runnable = superScheduledConfig.getRunnable(name);
        runnable.run();}
}
2、加强接口实现

增强器实现的整体思路与 SpringAop 的思路统一,实现没有 Aop 简单

(1) 加强接口

@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen {
    /**
     * 前置强化办法
     *
     * @param bean   bean 实例(或者是被代理的 bean)* @param method 执行的办法对象
     * @param args   办法参数
     */
    void before(Object bean, Method method, Object[] args);

    /**
     * 后置强化办法
     * 出现异常不会执行
     * 如果未出现异常,在 afterFinally 办法之后执行
     *
     * @param bean   bean 实例(或者是被代理的 bean)* @param method 执行的办法对象
     * @param args   办法参数
     */
    void after(Object bean, Method method, Object[] args);

    /**
     * 异样强化办法
     *
     * @param bean   bean 实例(或者是被代理的 bean)* @param method 执行的办法对象
     * @param args   办法参数
     */
    void exception(Object bean, Method method, Object[] args);

    /**
     * Finally 强化办法,出现异常也会执行
     *
     * @param bean   bean 实例(或者是被代理的 bean)* @param method 执行的办法对象
     * @param args   办法参数
     */
    void afterFinally(Object bean, Method method, Object[] args);
}

(2) 代理抽象类

public abstract class Point {
    /**
     * 定时工作名
     */
    private String superScheduledName;

    /**
     * 形象的执行办法,应用代理实现
     * @param runnable 定时工作执行器
     */
    public abstract Object invoke(SuperScheduledRunnable runnable);

    /* 一般的 get/sets 省略 */
}

(3) 调用链类

public class Chain {
    private List<Point> list;
    private int index = -1;
    /**
     * 索引自增 1
     */
    public int incIndex() {return ++index;}

    /**
     * 索引还原
     */
    public void resetIndex() {this.index = -1;}
}

(4) cglib 动静代理实现

应用 cglib 代理增强器,将增强器全副代理成调用链节点 Point

public class RunnableBaseInterceptor implements MethodInterceptor {
    /**
     * 定时工作执行器
     */
    private SuperScheduledRunnable runnable;
    /**
     * 定时工作加强类
     */
    private BaseStrengthen strengthen;

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        Object result;
        // 如果执行的是 invoke()办法
        if ("invoke".equals(method.getName())) {
         // 前置强化办法
            strengthen.before(obj, method, args);
            try {// 调用执行器中的 invoke()办法
                result = runnable.invoke();} catch (Exception e) {
             // 异样强化办法
                strengthen.exception(obj, method, args);
                throw new SuperScheduledException(strengthen.getClass() + "中强化执行时产生谬误", e);
            } finally {
             //Finally 强化办法,出现异常也会执行
                strengthen.afterFinally(obj, method, args);
            }
            // 后置强化办法
            strengthen.after(obj, method, args);

        } else {
         // 间接执行办法
            result = methodProxy.invokeSuper(obj, args);
        }
        return result;
    }

    public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
        this.runnable = runnable;
        if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {this.strengthen = (BaseStrengthen) object;
        } else {throw new SuperScheduledException(object.getClass() + "对象不是 BaseStrengthen 类型");
        }
    }

    public RunnableBaseInterceptor() {}
}

(5) 定时工作执行器实现

public class SuperScheduledRunnable {
    /**
     * 原始的办法
     */
    private Method method;
    /**
     * 办法所在的 bean
     */
    private Object bean;
    /**
     * 增强器的调用链
     */
    private Chain chain;

    public Object invoke() {
        Object result;
        // 索引自增 1
        if (chain.incIndex() == chain.getList().size()) {
            // 调用链中的加强办法曾经全副执行完结
            try {
                // 调用链索引初始化
                chain.resetIndex();
                // 增强器全副执行结束,执行本来的办法
                result = method.invoke(bean);
            } catch (IllegalAccessException | InvocationTargetException e) {throw new SuperScheduledException(e.getLocalizedMessage());
            }
        } else {
            // 获取被代理后的办法增强器
            Point point = chain.getList().get(chain.getIndex());
            // 执行增强器代理
            // 增强器代理中,会回调办法执行器,造成调用链,逐个运行调用链中的增强器
            result = point.invoke(this);
        }
        return result;
    }

    /* 一般的 get/sets 省略 */
}

(6) 增强器代理逻辑

com.gyx.superscheduled.core.SuperScheduledApplicationRunner类中的代码片段

// 创立执行控制器
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
// 用来寄存 增强器的代理对象
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
// 循环所有的增强器的 beanName
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
 // 获取增强器的 bean 对象
    Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
    // 将增强器代理成 Point 节点
    Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
    proxy.setSuperScheduledName(name);
    // 增强器的代理对象缓存到 list 中
    points.add(proxy);
}
// 将增强器代理实例的汇合生成调用链
// 执行控制器中设置调用链
runnable.setChain(new Chain(points));

版权申明:本文为 CSDN 博主「敲代码的旺财」的原创文章,遵循 CC 4.0 BY-SA 版权协定,转载请附上原文出处链接及本申明。
原文链接:https://blog.csdn.net/qq_34886352/article/details/106494637

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿(2022 最新版)

2. 劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4. 别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0