springboot多线程
- 新建AsyncTaskConfig,开启@EnableAsync
- 新建IAsyncService接口,及其实现类,新建办法,并开启 @Async
- AsyncService,调用多线程办法
AsyncTaskConfig
@Configuration@EnableAsyncpublic class AsyncTaskConfig implements AsyncConfigurer { // ThredPoolTaskExcutor的解决流程 // 当池子大小小于corePoolSize,就新建线程,并解决申请 // 当池子大小等于corePoolSize,把申请放入workQueue中,池子里的闲暇线程就去workQueue中取工作并解决 // 当workQueue放不下工作时,就新建线程入池,并解决申请,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做回绝解决 // 当池子的线程数大于corePoolSize时,多余的线程会期待keepAliveTime长时间,如果无申请可解决就自行销毁 @Override @Bean public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); //设置外围线程数 threadPool.setCorePoolSize(10); //设置最大线程数 threadPool.setMaxPoolSize(15); //线程池所应用的缓冲队列 threadPool.setQueueCapacity(20); //期待工作在关机时实现--表明期待所有线程执行完 threadPool.setWaitForTasksToCompleteOnShutdown(true); // 等待时间 (默认为0,此时立刻进行),并没期待xx秒后强制进行 threadPool.setAwaitTerminationSeconds(60); // 线程名称前缀 threadPool.setThreadNamePrefix("mds-async-task-"); // 初始化线程 threadPool.initialize(); return threadPool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; }// @Bean("doSomethingExecutor")// public Executor doSomethingExecutor() {// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// // 外围线程数:线程池创立时候初始化的线程数// executor.setCorePoolSize(10);// // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过外围线程数的线程// executor.setMaxPoolSize(20);// // 缓冲队列:用来缓冲执行工作的队列// executor.setQueueCapacity(500);// // 容许线程的闲暇工夫60秒:当超过了外围线程之外的线程在闲暇工夫达到之后会被销毁// executor.setKeepAliveSeconds(60);// // 线程池名的前缀:设置好了之后能够不便咱们定位解决工作所在的线程池// executor.setThreadNamePrefix("do-something-");// // 缓冲队列满了之后的回绝策略:由调用线程解决(个别是主线程)// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// executor.initialize();// return executor;// }}
IAsyncService
public interface IAsyncService { Future<Long> saveItemNewInfoAsync(Long shopId) ;}@Slf4j@Servicepublic class AsyncServiceImpl implements IAsyncService { @Value("${url.}") private String Url; @Value("${url..account}") private String Account; @Value("${url..appcode}") private String Apppcode; @Autowired private RestTemplate restTemplate; @Autowired private ShopMapper ShopMapper; @Autowired private ShopNewInfoMapper ShopNewInfoMapper; @Autowired private ItemNewInfoMapper ItemNewInfoMapper; @Async @Override public Future<Long> saveItemNewInfoAsync(Long shopId) { String url=Url+"/item-new-info?account="+Account+"&appcode="+Apppcode; int pageNo=1; int pageSize=200; while (pageNo>0){ String reqUrl=url+"&shopId="+shopId+"&pageNo="+pageNo+"&pageSize="+pageSize; log.info("开始 2、获取商品昨日最新数据 shopId={},以后页码={},每页数量={}",shopId,pageNo,pageSize); ParameterizedTypeReference<ItemNewInfoDto> reference = new ParameterizedTypeReference<ItemNewInfoDto>() {}; ResponseEntity<ItemNewInfoDto> entity = restTemplate.exchange(reqUrl, HttpMethod.GET,null,reference); ItemNewInfoDto rsp = entity.getBody(); List<ItemNewInfoWithBLOBs> rspList=new ArrayList<>(); if( rsp.getResult()==null|| CollectionUtils.isEmpty(rsp.getResult().getResultList())){ pageNo=0; }else { pageNo++; rspList=rsp.getResult().getResultList(); //if(rsp.getResult() instanceof ArrayList<?>){ List<Object> ids= rspList.stream().map(t->t.getId()).collect(Collectors.toList()); Mapper.markFail("_item_new_info","id",ids); ItemNewInfoMapper.insertBatch(rspList); } log.info("完结 2、获取商品昨日最新数据 shopId={},以后页码={},每页数量={},result={}",shopId,pageNo,pageSize,rspList.size()); } return new AsyncResult<>(shopId); }}
ServiceImpl
@Autowired private IAsyncService AsyncService; private final Integer SELECT_LIMIT=1000; /** * 2、获取商品昨日最新数据 * @return */ @Override public Integer saveItemNewInfo(){ String url=Url+"/item-new-info?account="+Account+"&appcode="+Apppcode; int count=1; while (count>0){ List<Shop> findList = this.findShopPage(SELECT_LIMIT,(count-1)*SELECT_LIMIT); if(findList.size()==0){ count=0; }else { count++; } Integer threadCount=5; int times = (int) Math.ceil(findList.size() / 5.0); for(int i=0;i<times;i++){ List<Long> shopIds= findList.subList(i*threadCount,Math.min ((i+1)*threadCount,findList.size())) .stream() .map(t->t.getShopId()).collect(Collectors.toList()); this.doSaveItemNewInfo(shopIds); } } return 1; } @SneakyThrows private Integer doSaveItemNewInfo(List<Long> shopIds){ List<Future> futureList=new ArrayList<>(); for(Long shopId :shopIds){ futureList.add(AsyncService.saveItemNewInfoAsync(shopId)) ; } //判断线程池全副执行结束 for (Future<Long> future : futureList) { while (true) { //CPU高速轮询:每个future都并发轮循,判断实现状态而后获取后果,这一行,是本实现计划的精华所在。即有10个future在高速轮询,实现一个future的获取后果,就敞开一个轮询 if (future.isDone() && !future.isCancelled()) { //获取future胜利实现状态,如果想要限度每个工作的超时工夫,勾销本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异样应用即可。 Long shopId = future.get(); log.info("2、获取商品昨日最新数据 实现线程工作 ,shopId=" + future.get());// futureList.remove(future); break;//以后future获取后果结束,跳出while } else { //每次轮询劳动1毫秒(CPU纳秒级),防止CPU高速轮循耗空CPU---》老手别忘记这个 Thread.sleep(1); } } } return 1; }