关于多线程:springboot多线程

springboot多线程

  • 新建AsyncTaskConfig,开启@EnableAsync
  • 新建IAsyncService接口,及其实现类,新建办法,并开启 @Async
  • AsyncService,调用多线程办法

AsyncTaskConfig

@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的解决流程
    // 当池子大小小于corePoolSize,就新建线程,并解决申请
    // 当池子大小等于corePoolSize,把申请放入workQueue中,池子里的闲暇线程就去workQueue中取工作并解决
    // 当workQueue放不下工作时,就新建线程入池,并解决申请,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做回绝解决
    // 当池子的线程数大于corePoolSize时,多余的线程会期待keepAliveTime长时间,如果无申请可解决就自行销毁

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //设置外围线程数
        threadPool.setCorePoolSize(10);
        //设置最大线程数
        threadPool.setMaxPoolSize(15);
        //线程池所应用的缓冲队列
        threadPool.setQueueCapacity(20);
        //期待工作在关机时实现--表明期待所有线程执行完
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // 等待时间 (默认为0,此时立刻进行),并没期待xx秒后强制进行
        threadPool.setAwaitTerminationSeconds(60);
        //  线程名称前缀
        threadPool.setThreadNamePrefix("mds-async-task-");
        // 初始化线程
        threadPool.initialize();
        return threadPool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }

//    @Bean("doSomethingExecutor")
//    public Executor doSomethingExecutor() {
//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//        // 外围线程数:线程池创立时候初始化的线程数
//        executor.setCorePoolSize(10);
//        // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过外围线程数的线程
//        executor.setMaxPoolSize(20);
//        // 缓冲队列:用来缓冲执行工作的队列
//        executor.setQueueCapacity(500);
//        // 容许线程的闲暇工夫60秒:当超过了外围线程之外的线程在闲暇工夫达到之后会被销毁
//        executor.setKeepAliveSeconds(60);
//        // 线程池名的前缀:设置好了之后能够不便咱们定位解决工作所在的线程池
//        executor.setThreadNamePrefix("do-something-");
//        // 缓冲队列满了之后的回绝策略:由调用线程解决(个别是主线程)
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//        executor.initialize();
//        return executor;
//    }

}

IAsyncService

public interface IAsyncService {
    Future<Long> saveItemNewInfoAsync(Long shopId) ;
}

@Slf4j
@Service
public class AsyncServiceImpl implements IAsyncService {

    @Value("${url.}")
    private String Url;
    @Value("${url..account}")
    private String Account;
    @Value("${url..appcode}")
    private String Apppcode;
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private ShopMapper ShopMapper;
    @Autowired
    private ShopNewInfoMapper ShopNewInfoMapper;
    @Autowired
    private ItemNewInfoMapper ItemNewInfoMapper;
  
    @Async
    @Override
    public Future<Long> saveItemNewInfoAsync(Long shopId) {
        String url=Url+"/item-new-info?account="+Account+"&appcode="+Apppcode;
        int pageNo=1;
        int pageSize=200;
        while (pageNo>0){
            String reqUrl=url+"&shopId="+shopId+"&pageNo="+pageNo+"&pageSize="+pageSize;
            log.info("开始 2、获取商品昨日最新数据 shopId={},以后页码={},每页数量={}",shopId,pageNo,pageSize);
            ParameterizedTypeReference<ItemNewInfoDto> reference = new ParameterizedTypeReference<ItemNewInfoDto>() {};
            ResponseEntity<ItemNewInfoDto> entity = restTemplate.exchange(reqUrl, HttpMethod.GET,null,reference);
            ItemNewInfoDto rsp = entity.getBody();
            List<ItemNewInfoWithBLOBs> rspList=new ArrayList<>();
            if( rsp.getResult()==null|| CollectionUtils.isEmpty(rsp.getResult().getResultList())){
                pageNo=0;
            }else {
                pageNo++;

                rspList=rsp.getResult().getResultList();
                //if(rsp.getResult() instanceof  ArrayList<?>){
                List<Object> ids= rspList.stream().map(t->t.getId()).collect(Collectors.toList());
                Mapper.markFail("_item_new_info","id",ids);
                ItemNewInfoMapper.insertBatch(rspList);
            }
            log.info("完结 2、获取商品昨日最新数据 shopId={},以后页码={},每页数量={},result={}",shopId,pageNo,pageSize,rspList.size());
        }
        return new AsyncResult<>(shopId);
    }
}

ServiceImpl

 @Autowired
 private IAsyncService  AsyncService;
 private final Integer  SELECT_LIMIT=1000;
        
/**
     * 2、获取商品昨日最新数据
     * @return
     */
    @Override
    public Integer saveItemNewInfo(){
        String url=Url+"/item-new-info?account="+Account+"&appcode="+Apppcode;
        int count=1;
        while (count>0){
            List<Shop> findList = this.findShopPage(SELECT_LIMIT,(count-1)*SELECT_LIMIT);
            if(findList.size()==0){
                count=0;
            }else {
                count++;
            }
            Integer threadCount=5;
            int times = (int) Math.ceil(findList.size() / 5.0);
            for(int i=0;i<times;i++){
                List<Long> shopIds= findList.subList(i*threadCount,Math.min ((i+1)*threadCount,findList.size()))
                        .stream()
                        .map(t->t.getShopId()).collect(Collectors.toList());
                this.doSaveItemNewInfo(shopIds);
            }
        }
        return 1;
    }


    @SneakyThrows
    private Integer doSaveItemNewInfo(List<Long> shopIds){
        List<Future> futureList=new ArrayList<>();
        for(Long shopId :shopIds){
            futureList.add(AsyncService.saveItemNewInfoAsync(shopId)) ;
        }
        //判断线程池全副执行结束
        for (Future<Long> future : futureList) {
            while (true) {
                //CPU高速轮询:每个future都并发轮循,判断实现状态而后获取后果,这一行,是本实现计划的精华所在。即有10个future在高速轮询,实现一个future的获取后果,就敞开一个轮询
                if (future.isDone() && !future.isCancelled()) {
                    //获取future胜利实现状态,如果想要限度每个工作的超时工夫,勾销本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异样应用即可。
                    Long shopId = future.get();
                    log.info("2、获取商品昨日最新数据 实现线程工作 ,shopId=" + future.get());
//                    futureList.remove(future);
                    break;//以后future获取后果结束,跳出while
                } else {
                    //每次轮询劳动1毫秒(CPU纳秒级),防止CPU高速轮循耗空CPU---》老手别忘记这个
                    Thread.sleep(1);
                }
            }
        }
        return  1;
    }

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理