线程处理二三事一

37次阅读

共计 6751 个字符,预计需要花费 17 分钟才能阅读完成。

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。以上是《阿里巴巴》java 开发手册中关于并发的强制规定,为什么做这个规定?一下是我个人解惑和做出的相关延伸,此篇作为我个人学习笔记的开篇。

一. 为什么要手动创建:
1. 单例线程池

Executor executor = Executors.newSingleThreadExecutor();

2. 可扩容线程池

Executor executor1 = Executors.newCachedThreadPool();

3. 定长线程池

Executor executor2 = Executors.newFixedThreadPool(10);

线程池的工作原理如下:

以上是三种平时常见的线程池,下面看创建源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

都是和 ThreadPoolExecutor 相关,ThreadPoolExecutor 构造函数:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

采用 Executor 创建线程池会出现资源耗尽即 OOM 错误,主要出问题的地方是阻塞队列的问题即 workQueue。
1. 看 newSingleThreadExecutor 创建源码, 线程池核心线程数和最大线程数都是 1, 剩下多余的线程任务全都塞到 LinkedBlockingQueue<Runnable>()中,以下是 LinkedBlockingQueue<Runnable>()无参构造函数源码,

    public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
    }

Integer.MAX_VALUE=2147483647, 根据线程池的工作原理,如果自动创建线程池即表示不限制 LinkedBlockingQueue 的长度,这个阻塞队列在高并发的情况下可能会撑爆线程池。
2. 看 newFixedThreadPool 创建源码,和 1 同理。
3. 看 newCachedThreadPool 源码,此线程池允许最多 Integer.MAX_VALUE 个线程的创建,工作队列是 SynchronousQueue,以下是 SynchronousQueue 创建源码:

    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}

看注释,SynchronousQueue 在这里是一个非公平的栈,这里 SynchronousQueue 的问题不深究,日后会专门学习 jdk 中阻塞队列
newCachedThreadPool 线程池的大小由线程池最大线程数控制,自动创建的线程池很可能创建非常多的线程撑爆内存。
以上简单从源码角度回答问题原因。
自动创建线程池就是直接使用 Executors 创建,弊端以上是简单分析,但是看源码,手动创建线程池就是脱了一层壳直接 new ThreadPoolExecutor 而已,本质一样的。

二.ThreadFactory
看上面 ThreadPoolExecutor 的构造函数源码,我发现了 ThreadFactory。~~~~
我们创建一个 task,交付给 Executor,Executor 会帮我们创建一个线程去执行这个任务。ThreadFactory 就是一个创建线程的工厂。
先看源码

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

/**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

ThreadFactory 是个接口,DefaultThreadFactory 是 JUC 包中对此接口的实现。
先简单看下这个工厂中的 newThread 方法,入参实现了 Runnable 接口,方法里有 Thread 的构造函数,两个 if 判断,默认情况下是非守护线程,线程优先级是 5。线程的名字就是 1 的数据累加,在根据 jvm 是否开启了安全管理器(jvm 默认关闭)来决定线程组。如果我们不自定一个 ThreadFactory,那么无论是采用自动创建线程池 Executor,还是手动创建线程池 ThreadPoolExecutor,都是由 DefaultThreadFactory 来创建线程。
手动创建线程和自动创建线程有什么不一样的吗?最粗略简单的不一样就是线程名字的不一样,不同业务需要的线程池的线程取不同的名字,这样在需要用到线程池的复杂业务情况下,日志会清爽很多,在实际开发中,这个优势很重要。
guava 提供一个 ThreadFactoryBuilder 可以帮助我们实现自己的 ThreadFactory。
ThreadFactoryBuilder 中核心功能源码

private static ThreadFactory build(ThreadFactoryBuilder builder) {
        final String nameFormat = builder.nameFormat;
        final Boolean daemon = builder.daemon;
        final Integer priority = builder.priority;
        final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
        final ThreadFactory backingThreadFactory = builder.backingThreadFactory != null ? builder.backingThreadFactory : Executors.defaultThreadFactory();
        final AtomicLong count = nameFormat != null ? new AtomicLong(0L) : null;
        return new ThreadFactory() {public Thread newThread(Runnable runnable) {Thread thread = backingThreadFactory.newThread(runnable);
                if (nameFormat != null) {thread.setName(ThreadFactoryBuilder.format(nameFormat, count.getAndIncrement()));
                }

                if (daemon != null) {thread.setDaemon(daemon);
                }

                if (priority != null) {thread.setPriority(priority);
                }

                if (uncaughtExceptionHandler != null) {thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
                }

                return thread;
            }
        };
    }

build 设计模式,我们可以自己去定义线程名字,是否为守护线程,线程优先级,线程的异常处理器。如果要选择自定义 ThreadFactory,强烈推荐 ThreadFactoryBuilder。

三.submit 和 execute
task 提交到线程池有两种方式
1.submit 2.execute
demo:

public static void testSubmitAndExecute() throws ExecutionException, InterruptedException {ThreadFactoryBuilder orderThreadFactoryBuilder = new ThreadFactoryBuilder();
        ThreadFactory orderThread = orderThreadFactoryBuilder.setNameFormat("---order---").build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
                orderThread);
        for (int i = 0; i < 5; i++) {
            int fina = i;
            Runnable runnable = () -> {if (fina == 3) {throw new RuntimeException("demo exception");
                } else {System.out.println("runnable---" + fina + "---" + Thread.currentThread().getName());
                    System.out.println();}
            };
//            threadPoolExecutor.submit(runnable);
            threadPoolExecutor.execute(runnable);
//            Future<?> submit = threadPoolExecutor.submit(runnable);
//            System.out.println(submit.get());
        }
    }

如果采用了 execute 执行,那么结果就是
发现当某个线程出现异常的时候,其实并不会影响到其他线程的执行。这里是直接抛出了一个异常。也可以通过 ThreadFactory 来定义 Thread 自己的 UncaughtExceptionHandler 属性,即实现 UncaughtExceptionHandler 接口的类。

改成 submit 形式处理:

            Future<?> submit = threadPoolExecutor.submit(runnable);
            System.out.println(submit.get());


发现 get 会拿到一个返回结果,这里是 null,这表示线程执行成功没有异常,而且会卡住,即当第 i = 3 的线程异常之后,程序就抛出异常停止了。这是和 execute 不一样的地方。

改成 submit 形式处理 2:

            threadPoolExecutor.submit(runnable);


看日志完全看不到异常,明明是有异常抛出的。没有异常打印,但是不影响其他线程的运行。

这里简单记录下实际线程池提交的不同情况。


写到目前为止,从手册中的一条规则开始思考,看源码,简单了解了为什么,主要是跟线程池的工作队列有关,线程池中的线程怎么来的,找到了 ThreadFactory,

1.newCachedThreadPool 的 SynchronousQueue 这个需要学习
2.ThreadGroup 需要学习,System.getSecurityManager()java 安全器需要去了解。
3. 线程池 submit 和 execute 两种方式在源码层析的学习。
4.Thread,runnable,callable 这三种的本质不同和联系。

正文完
 0