共计 4605 个字符,预计需要花费 12 分钟才能阅读完成。
springboot 多线程
- 新建 AsyncTaskConfig,开启 @EnableAsync
- 新建 IAsyncService 接口,及其实现类,新建办法,并开启 @Async
- AsyncService,调用多线程办法
AsyncTaskConfig
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor 的解决流程
// 当池子大小小于 corePoolSize,就新建线程,并解决申请
// 当池子大小等于 corePoolSize,把申请放入 workQueue 中,池子里的闲暇线程就去 workQueue 中取工作并解决
// 当 workQueue 放不下工作时,就新建线程入池,并解决申请,如果池子大小撑到了 maximumPoolSize,就用 RejectedExecutionHandler 来做回绝解决
// 当池子的线程数大于 corePoolSize 时,多余的线程会期待 keepAliveTime 长时间,如果无申请可解决就自行销毁
@Override
@Bean
public Executor getAsyncExecutor() {ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
// 设置外围线程数
threadPool.setCorePoolSize(10);
// 设置最大线程数
threadPool.setMaxPoolSize(15);
// 线程池所应用的缓冲队列
threadPool.setQueueCapacity(20);
// 期待工作在关机时实现 -- 表明期待所有线程执行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间(默认为 0,此时立刻进行),并没期待 xx 秒后强制进行
threadPool.setAwaitTerminationSeconds(60);
// 线程名称前缀
threadPool.setThreadNamePrefix("mds-async-task-");
// 初始化线程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return null;}
// @Bean("doSomethingExecutor")
// public Executor doSomethingExecutor() {// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// // 外围线程数:线程池创立时候初始化的线程数
// executor.setCorePoolSize(10);
// // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过外围线程数的线程
// executor.setMaxPoolSize(20);
// // 缓冲队列:用来缓冲执行工作的队列
// executor.setQueueCapacity(500);
// // 容许线程的闲暇工夫 60 秒:当超过了外围线程之外的线程在闲暇工夫达到之后会被销毁
// executor.setKeepAliveSeconds(60);
// // 线程池名的前缀:设置好了之后能够不便咱们定位解决工作所在的线程池
// executor.setThreadNamePrefix("do-something-");
// // 缓冲队列满了之后的回绝策略:由调用线程解决(个别是主线程)// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// executor.initialize();
// return executor;
// }
}
IAsyncService
public interface IAsyncService {Future<Long> saveItemNewInfoAsync(Long shopId) ;
}
@Slf4j
@Service
public class AsyncServiceImpl implements IAsyncService {@Value("${url.}")
private String Url;
@Value("${url..account}")
private String Account;
@Value("${url..appcode}")
private String Apppcode;
@Autowired
private RestTemplate restTemplate;
@Autowired
private ShopMapper ShopMapper;
@Autowired
private ShopNewInfoMapper ShopNewInfoMapper;
@Autowired
private ItemNewInfoMapper ItemNewInfoMapper;
@Async
@Override
public Future<Long> saveItemNewInfoAsync(Long shopId) {
String url=Url+"/item-new-info?account="+Account+"&appcode="+Apppcode;
int pageNo=1;
int pageSize=200;
while (pageNo>0){
String reqUrl=url+"&shopId="+shopId+"&pageNo="+pageNo+"&pageSize="+pageSize;
log.info("开始 2、获取商品昨日最新数据 shopId={}, 以后页码 ={}, 每页数量 ={}",shopId,pageNo,pageSize);
ParameterizedTypeReference<ItemNewInfoDto> reference = new ParameterizedTypeReference<ItemNewInfoDto>() {};
ResponseEntity<ItemNewInfoDto> entity = restTemplate.exchange(reqUrl, HttpMethod.GET,null,reference);
ItemNewInfoDto rsp = entity.getBody();
List<ItemNewInfoWithBLOBs> rspList=new ArrayList<>();
if(rsp.getResult()==null|| CollectionUtils.isEmpty(rsp.getResult().getResultList())){pageNo=0;}else {
pageNo++;
rspList=rsp.getResult().getResultList();
//if(rsp.getResult() instanceof ArrayList<?>){List<Object> ids= rspList.stream().map(t->t.getId()).collect(Collectors.toList());
Mapper.markFail("_item_new_info","id",ids);
ItemNewInfoMapper.insertBatch(rspList);
}
log.info("完结 2、获取商品昨日最新数据 shopId={}, 以后页码 ={}, 每页数量 ={},result={}",shopId,pageNo,pageSize,rspList.size());
}
return new AsyncResult<>(shopId);
}
}
ServiceImpl
@Autowired
private IAsyncService AsyncService;
private final Integer SELECT_LIMIT=1000;
/**
* 2、获取商品昨日最新数据
* @return
*/
@Override
public Integer saveItemNewInfo(){
String url=Url+"/item-new-info?account="+Account+"&appcode="+Apppcode;
int count=1;
while (count>0){List<Shop> findList = this.findShopPage(SELECT_LIMIT,(count-1)*SELECT_LIMIT);
if(findList.size()==0){count=0;}else {count++;}
Integer threadCount=5;
int times = (int) Math.ceil(findList.size() / 5.0);
for(int i=0;i<times;i++){List<Long> shopIds= findList.subList(i*threadCount,Math.min ((i+1)*threadCount,findList.size()))
.stream()
.map(t->t.getShopId()).collect(Collectors.toList());
this.doSaveItemNewInfo(shopIds);
}
}
return 1;
}
@SneakyThrows
private Integer doSaveItemNewInfo(List<Long> shopIds){List<Future> futureList=new ArrayList<>();
for(Long shopId :shopIds){futureList.add(AsyncService.saveItemNewInfoAsync(shopId)) ;
}
// 判断线程池全副执行结束
for (Future<Long> future : futureList) {while (true) {
//CPU 高速轮询:每个 future 都并发轮循,判断实现状态而后获取后果,这一行,是本实现计划的精华所在。即有 10 个 future 在高速轮询,实现一个 future 的获取后果,就敞开一个轮询
if (future.isDone() && !future.isCancelled()) {// 获取 future 胜利实现状态,如果想要限度每个工作的超时工夫,勾销本行的状态判断 +future.get(1000*1, TimeUnit.MILLISECONDS)+catch 超时异样应用即可。Long shopId = future.get();
log.info("2、获取商品昨日最新数据 实现线程工作,shopId=" + future.get());
// futureList.remove(future);
break;// 以后 future 获取后果结束,跳出 while
} else {
// 每次轮询劳动 1 毫秒(CPU 纳秒级),防止 CPU 高速轮循耗空 CPU---》老手别忘记这个
Thread.sleep(1);
}
}
}
return 1;
}
正文完