xxl-job官网
https://www.xuxueli.com/xxl-job/
先启动 xxl-job 项目中的 xxl-job-admin 模块(调度中心),后续通过它提供的接口进行调用
引入依赖
<!-- Job scheduling: xxl-job executor core library -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.0</version>
</dependency>
配置信息(application.properties)
### Scheduling center (xxl-job-admin) root address [optional]: separate multiple addresses with commas
### when the scheduling center is deployed as a cluster. The executor uses this address for
### "executor heartbeat registration" and "task result callback"; leave empty to disable auto-registration.
xxl.job.admin.addresses=http://127.0.0.1:8883/xxl-job-admin

### Executor communication token [optional]: enabled when non-empty.
xxl.job.accessToken=default_token

### Executor AppName [optional]: grouping key for executor heartbeat registration;
### leave empty to disable auto-registration.
xxl.job.executor.appname=xxl-job-executor-sample

### Executor registration address [optional]: used as the registration address when set; when empty the
### embedded server's "IP:PORT" is used instead. This makes dynamic IPs and dynamically mapped ports of
### containerised executors easier to support.
xxl.job.executor.address=

### Executor IP [optional]: empty means the IP is detected automatically; set it manually on multi-NIC hosts.
### The IP is not bound to a host, it is only used for communication; the address information is used for
### "executor registration" and for "the scheduling center requesting and triggering tasks".
xxl.job.executor.ip=

### Executor port [optional]: a value <= 0 means the port is chosen automatically; the default is 9999.
### When several executors run on one machine, give each of them a different port.
xxl.job.executor.port=9999

### Disk path for executor run logs [optional]: read/write permission on the path is required;
### leave empty to use the default path.
xxl.job.executor.logpath=/Users/linyanxia/Downloads/common/log/jobhandler

### Executor log retention in days [optional]: expired logs are cleaned up automatically; the limit only
### takes effect when the value is >= 3, otherwise (e.g. -1) automatic cleanup is disabled.
xxl.job.executor.logretentiondays=-1
配置类(XxlJobConfiguration)
/**
 * Spring configuration that builds the embedded xxl-job executor from the
 * {@code xxl.job.*} properties.
 */
@Slf4j
@Configuration
public class XxlJobConfiguration {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    /**
     * Explicit registration address of this executor. The property is declared in the
     * application properties but was never handed to the executor; the empty default keeps
     * configurations that omit the key working.
     */
    @Value("${xxl.job.executor.address:}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    /**
     * Creates the executor bean and copies every configured property onto it.
     *
     * @return the configured {@link XxlJobSpringExecutor}
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}
调用xxl-job-admin模块的接口
@Component@Slf4jpublic class XxlJobUtil {// public static Logger logger = LoggerFactory.getLogger(ApiUtil.class); private static String cookie=""; private static String prePath = "/jobinfo"; private static String xxlJobAdminHost; public static JSONObject getPageList(Integer jobId,Integer pageSize,Integer pageNum) throws Exception { //jobGroup: 2 //triggerStatus: -1 //jobDesc: //executorHandler: //author: //start: 0 //length: 10 String path = prePath + "/pageList"; Map<String,Object> formMap = new HashMap<>(); formMap.put("jobGroup",1); formMap.put("triggerStatus",-1); formMap.put("start",(pageNum-1)*pageSize); formMap.put("start",(pageNum-1)*pageSize); formMap.put("length",pageSize); formMap.put("jobId",jobId); return doPost(xxlJobAdminHost,path,formMap); }// /findById public static JSONObject getJobById(Integer jobId) throws Exception { //jobGroup: 2 //triggerStatus: -1 //jobDesc: //executorHandler: //author: //start: 0 //length: 10 String path = prePath + "/findById"; Map<String,Object> formMap = new HashMap<>(); formMap.put("jobId",jobId); return doPost(xxlJobAdminHost,path,formMap); } /** * 新增/编辑工作 * @param url * @param requestInfo */ public static JSONObject addJob(String url,Map requestInfo) throws Exception { String path = "/jobinfo/add"; return doPost(url,path,requestInfo); } /** * 编辑工作 * /xxl-job-admin/jobinfo/update */ public static JSONObject editJobSchedule(Integer jobId,String schedule) throws Exception { JSONObject jsonObject = XxlJobUtil.getJobById(jobId); if (!jsonObject.getString("code").equals("200")){ throw new RuntimeException("没有该调度工作"); } Map<String,Object> formMap = new HashMap<>();// formMap.put("id",jobId); Iterator iter = jsonObject.getJSONObject("content").entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next();// System.out.println(entry.getKey().toString());// System.out.println(entry.getValue().toString()); if (entry.getKey().toString().contains("scheduleConf")){ 
formMap.put(entry.getKey().toString(),schedule); }else { if (entry.getKey().toString().contains("Time")||entry.getKey().toString().contains("time")){ continue; } formMap.put(entry.getKey().toString(),entry.getValue()); } } formMap.put("cronGen_display",schedule); formMap.put("schedule_conf_CRON",schedule);// System.out.println(jsonObject); //jobGroup: 1 //jobDesc: TB_SCYX_GCZB_TOP10_FACT //author: admin //alarmEmail: //scheduleType: CRON //scheduleConf: 1 17 11 ? * 3//= //cronGen_display: 1 17 11 ? * 3//= // schedule_conf_CRON: 1 17 11 ? * 3//= //schedule_conf_FIX_RATE://= //schedule_conf_FIX_DELAY: //executorHandler: testjob //executorParam: TB_SCYX_GCZB_TOP10_FACT //executorRouteStrategy: ROUND //childJobId: //misfireStrategy: DO_NOTHING //executorBlockStrategy: SERIAL_EXECUTION //executorTimeout: 0 //executorFailRetryCount: 0 //id: 26 String path = "/jobinfo/update"; JSONObject resultJson = doPost(xxlJobAdminHost,path,formMap); if (!resultJson.getString("code").equals("200")){ throw new Exception("更新任务调度工夫失败"); } return resultJson; } /** * 删除工作 * @param id * @return */ public static JSONObject deleteJob(int id) throws Exception { String path = "/jobinfo/remove"; Map<String,Object> formMap = new HashMap<>(); formMap.put("id",id); return doPost(xxlJobAdminHost,path,formMap); } /** * 单次执行工作 * @param url * @param id * @param executorParam * @return */ public static JSONObject triggerJob(String url,Integer id,String executorParam) throws Exception { String path = "/jobinfo/trigger"; Map<String,Object> formMap = new HashMap<>(); formMap.put("id",id); formMap.put("executorParam",executorParam); return doPost(url,path,formMap); } /** * 开始工作 * @param url * @param id * @return */ public static JSONObject startJob(String url,Integer id) throws Exception { String path = "/jobinfo/start"; Map<String,Object> formMap = new HashMap<>(); formMap.put("id",id); return doPost(url,path,formMap); } /** * 进行工作 * @param url * @param id * @return */ public static JSONObject 
stopJob(String url, Integer id) throws Exception { String path = "/jobinfo/stop"; Map<String,Object> formMap = new HashMap<>(); formMap.put("id",id); return doPost(url,path,formMap); } public static JSONObject getLogPage(Integer jobId,String filterTime,Integer pageNum,Integer pageSize) throws Exception { String path = "/joblog/pageList"; //jobGroup: 1 //jobId: 0 //logStatus: -1 //filterTime: 2021-09-01 00:00:00 - 2021-09-30 23:59:59 //start: 0 //length: 10 Map<String,Object> formMap = new HashMap<>(); formMap.put("jobId",jobId); formMap.put("jobGroup",1); formMap.put("logStatus",-1); formMap.put("start",pageNum); formMap.put("length",pageSize); formMap.put("filterTime",filterTime); return doPost(xxlJobAdminHost,path,formMap); } public static JSONObject getLogDetailCat(Integer logId,String executorAddress,Long triggerTime,Integer fromLineNum) throws Exception { String path = "/joblog/logDetailCat"; //executorAddress: http://192.168.14.207:9946/ //triggerTime: 1633679504000 //logId: 1157 //fromLineNum: 159 Map<String,Object> formMap = new HashMap<>(); formMap.put("executorAddress",executorAddress); formMap.put("triggerTime",triggerTime); formMap.put("logId",logId); formMap.put("fromLineNum",fromLineNum); return doPost(xxlJobAdminHost,path,formMap); } public static JSONObject doPost(String url,String path,Map<String,Object> formMap) throws Exception { String targetUrl = url + path; HttpResponse response = HttpRequest.post(targetUrl) .header("cookie", getCookie()) .form(formMap) .execute(); JSONObject result = JSONObject.parseObject(response.body()); // && result.getString("code").equals("200") if (response.getStatus() == 200){ return JSONObject.parseObject(response.body()); }else { log.info("申请失败,path:{}\n "+response.body(),path); throw new RuntimeException("申请失败\n "+response.body()); } } public static JSONObject doGet(String url,String path) { String targetUrl = url + path; JSONObject result = JSONObject.parseObject(HttpRequest.get(targetUrl) .header("cookie", 
getCookie()).execute().body()); return result; } public static String login(String url, String userName, String password) { String path = "/login"; String targetUrl = url + path; Map<String,Object> loginMap = new HashMap<>(); loginMap.put("userName",userName); loginMap.put("password",password); HttpResponse response = HttpRequest.post(targetUrl) .form(loginMap).execute(); if (response.getStatus() == 200) { List<HttpCookie> cookies = response.getCookies(); StringBuffer tmpcookies = new StringBuffer(); for (HttpCookie c : cookies) { tmpcookies.append(c.toString() + ";"); } cookie = tmpcookies.toString(); } else { cookie = ""; } return cookie; } public static String getCookie(){// System.out.println("getcookie:"+xxlJobAdminHost); if (StringUtils.isNotBlank(cookie)){ return cookie; }else { return login(xxlJobAdminHost,"admin","123456"); } } @Value("${xxl.job.admin.addresses}") public void setXxlJobAdminHost(String xxlJobAdminHost) { XxlJobUtil.xxlJobAdminHost = xxlJobAdminHost; }}
添加调度任务
// ---- Create a scheduled job ----
// Form fields expected by /jobinfo/add; the schedule expression is sent under all three keys.
Map<String, Object> requestInfo = new HashMap<>();
requestInfo.put("jobGroup", 1);
requestInfo.put("jobDesc", collectorTask.getTaskName());
requestInfo.put("executorRouteStrategy", "ROUND");
requestInfo.put("scheduleType", "CRON");
requestInfo.put("cronGen_display", collectorTask.getDispatchTime());
requestInfo.put("scheduleConf", collectorTask.getDispatchTime());
requestInfo.put("schedule_conf_CRON", collectorTask.getDispatchTime());
requestInfo.put("glueType", "BEAN");
requestInfo.put("executorHandler", "startTasks");
requestInfo.put("executorBlockStrategy", "SERIAL_EXECUTION");
requestInfo.put("misfireStrategy", "DO_NOTHING");
requestInfo.put("executorTimeout", 0);
requestInfo.put("executorFailRetryCount", 0);
requestInfo.put("author", "admin");
requestInfo.put("alarmEmail", "");
requestInfo.put("executorParam", id);
requestInfo.put("glueRemark", "GLUE代码初始化");

com.alibaba.fastjson.JSONObject response = XxlJobUtil.addJob(addresses, requestInfo);
log.info("新增xxlJob工作 {}", response);
// Constant-first comparison: a response without a "code" field counts as a failure instead of an NPE.
if (response == null || !"200".equals(response.getString("code"))) {
    throw new Exception("新增任务调度失败");
}
// "content" carries the id of the created job; keep it so the job can be managed later.
collectorTask.setXxlJobId(response.getInteger("content"));

// ---- Delete a scheduled job ----
XxlJobUtil.deleteJob(task_id.getXxlJobId());

// ---- Change the schedule of a job ----
XxlJobUtil.editJobSchedule(collectorTask.getXxlJobId(), collectorTask.getDispatchTime());
调度任务
@RestController@RequestMapping("/apiSyncTask")public class ApiSyncTaskController { @Value("${xxl.job.admin.addresses}") private String xxlJobAdminHost; @Autowired(required = false) private CollectorTaskMapper collectorTaskMapper;// 单次启动工作 @GetMapping("/trigger") public R triggerTask(Integer id, String executorParam) { try { System.out.println(xxlJobAdminHost); return R.ok(XxlJobUtil.triggerJob(xxlJobAdminHost, id, executorParam)); } catch (Exception e) { e.printStackTrace(); return R.failed(e.getMessage()); } }// 启动工作 @GetMapping("/start") public R startTask(Integer id) { try { JSONObject jsonObject = XxlJobUtil.startJob(xxlJobAdminHost, id); if (jsonObject.getString("code").equals("200")) { CollectorTask task = new CollectorTask(); task.setStates(1); collectorTaskMapper.update(task,new QueryWrapper<CollectorTask>().eq("XXL_JOB_ID",id)); return R.ok(jsonObject); } return R.failed(jsonObject); } catch (Exception e) { e.printStackTrace(); return R.failed(e.getMessage()); } }// 进行工作 @GetMapping("/stop") public R stopTask(Integer id) { try { JSONObject jsonObject = XxlJobUtil.stopJob(xxlJobAdminHost, id); if (jsonObject.getString("code").equals("200")) { CollectorTask task = new CollectorTask(); task.setStates(0); collectorTaskMapper.update(task,new QueryWrapper<CollectorTask>().eq("XXL_JOB_ID",id)); return R.ok(jsonObject); } else { return R.failed(jsonObject); } } catch (Exception e) { e.printStackTrace(); return R.failed(e.getMessage()); } }// 获取工作列表 @GetMapping("/getJobPage") public R getJobPage(Integer pageSize,Integer pageNum,Integer jobId){ try { return R.ok(XxlJobUtil.getPageList(jobId,pageSize,pageNum)); } catch (Exception e) { e.printStackTrace(); return R.failed(e.getMessage()); } }// 调度工作记录// 获取工作日志列表 @GetMapping("/getLogPage") public R getLogPage(Integer pageSize,Integer pageNum,Integer jobId,String filterTime){ try { return R.ok(XxlJobUtil.getLogPage(jobId, filterTime, (pageNum-1)*pageSize, pageSize)); } catch (Exception e) { e.printStackTrace(); return 
R.failed(e.getMessage()); } }// 获取工作 @GetMapping("/getJob") public R getLogPage(Integer jobId){ try { return R.ok(XxlJobUtil.getJobById(jobId)); } catch (Exception e) { e.printStackTrace(); return R.failed(e.getMessage()); } }// 获取工作日志详情 @GetMapping("/getLogDetail") public R getLogDetail(Integer logId,String executorAddress,Long triggerTime,Integer fromLineNum){ try { return R.ok(XxlJobUtil.getLogDetailCat(logId, executorAddress, triggerTime, fromLineNum)); } catch (Exception e) { e.printStackTrace(); return R.failed(e.getMessage()); } }}