Java-线程池和CountDownLatch结合返回执行结果

45次阅读

共计 2189 个字符,预计需要花费 6 分钟才能阅读完成。

CountDownLatch 和线程池结合,返回执行结果
优雅关闭线程池,参考 https://www.cnblogs.com/goodA…

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TaskWorkerMain {
    static ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(20, 50, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(100));

    public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(3);
        TaskWorker worker1 = new TaskWorker(latch, "1");
        TaskWorker worker2 = new TaskWorker(latch, "2");
        TaskWorker worker3 = new TaskWorker(latch, "3");
        // 利用线程池执行线程
        taskExecutor.execute(worker1);
        taskExecutor.execute(worker2);
        taskExecutor.execute(worker3);
        try {latch.await(5000, TimeUnit.MILLISECONDS);// 设置超时时间为 5 秒
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {if (latch.getCount() == 0f) {System.out.println("worker1:" + worker1.getResultMsg() + "\nworker2:" + worker2.getResultMsg()
                        + "\nworker3:" + worker3.getResultMsg());
            }
            taskExecutor.shutdown(); // Disable new tasks from being submitted
            // 设定最大重试次数
            try {
                // 等待 60 s
                if (!taskExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
                    // 调用 shutdownNow 取消正在执行的任务
                    taskExecutor.shutdownNow();
                    // 再次等待 60 s,如果还未结束,可以再次尝试,或则直接放弃
                    if (!taskExecutor.awaitTermination(60, TimeUnit.SECONDS))
                        System.err.println("线程池任务未正常执行结束");
                }
            } catch (InterruptedException ie) {
                // 重新调用 shutdownNow
                taskExecutor.shutdownNow();}
        }
    }
}

class TaskWorker implements Runnable{

    private CountDownLatch countDownLatch;
    private String requestParam;
    private String resultMsg = "";
    private String resultErrMsg;
    
    public TaskWorker(CountDownLatch countDownLatch, String requestParam) {super();
        this.countDownLatch = countDownLatch;
        this.requestParam = requestParam;
    }

    @Override
    public void run() {
        String result = "";
        try {
            //TODO 逻辑代码
            result = doRealLogic(requestParam);
            
            // 设置返回值
            setResultMsg(result);
        } catch (Exception e) {setResultErrMsg(e.getMessage());
        }finally {countDownLatch.countDown();    
        }
    }

    /**
     * 真实逻辑代码
     * @param requestParam
     * @return
     */
    private String doRealLogic(String requestParam) {return requestParam;}

    public String getResultMsg() {return resultMsg;}

    public void setResultMsg(String resultMsg) {this.resultMsg = resultMsg;}

    public String getResultErrMsg() {return resultErrMsg;}

    public void setResultErrMsg(String resultErrMsg) {this.resultErrMsg = resultErrMsg;}    
}

正文完
 0