共计 5084 个字符,预计需要花费 13 分钟才能阅读完成。
我曾经对自己的小弟说,如果你实在搞不清楚什么时候用 HashMap,什么时候用 ConcurrentHashMap,那么就用后者,你的代码 bug 会很少。
他问我:ConcurrentHashMap 是什么?-.-
编程不是炫技。大多数情况下,怎么把代码写简单,才是能力。
多线程生来就是复杂的,也是容易出错的。一些难以理解的概念,要规避。本文不讲基础知识,因为你手里就有 jdk 的源码。
线程
Thread
第一类就是 Thread 类。大家都知道有两种实现方式。第一可以继承 Thread 覆盖它的 run 方法;第二种是实现 Runnable 接口,实现它的 run 方法;而第三种创建线程的方法,就是通过线程池。
我们的具体代码实现,就放在 run 方法中。
我们关注两种情况。一个是线程退出条件,一个是异常处理情况。
线程退出
有的 run 方法执行完成后,线程就会退出。但有的 run 方法是永远不会结束的。结束一个线程肯定不是通过 Thread.stop()方法,这个方法已经在 java1.2 版本就废弃了。所以我们大体有两种方式控制线程。
定义退出标志放在 while 中
代码一般长这样。
private volatile boolean flag= true;
public void run() {
while (flag) {
}
}
标志一般使用 volatile 进行修饰,使其读可见,然后通过设置这个值来控制线程的运行,这已经成了约定俗成的套路。
使用 interrupt 方法终止线程
类似这种。
while(!isInterrupted()){……}
对于 InterruptedException,比如 Thread.sleep 所抛出的,我们一般是补获它,然后静悄悄的忽略。中断允许一个可取消任务来清理正在进行的工作,然后通知其他任务它要被取消,最后才终止,在这种情况下,此类异常需要被仔细处理。
interrupt 方法不一定会真正”中断”线程,它只是一种协作机制。interrupt 方法通常不能中断一些处于阻塞状态的 I / O 操作。比如写文件,或者 socket 传输等。这种情况,需要同时调用正在阻塞操作的 close 方法,才能够正常退出。
interrupt 系列使用时候一定要注意,会引入 bug,甚至死锁。
异常处理
java 中会抛出两种异常。一种是必须要捕获的,比如 InterruptedException,否则无法通过编译;另外一种是可以处理也可以不处理的,比如 NullPointerException 等。
在我们的任务运行中,很有可能抛出这两种异常。对于第一种异常,是必须放在 try,catch 中的。但第二种异常如果不去处理的话,会影响任务的正常运行。
有很多同学在处理循环的任务时,没有捕获一些隐式的异常,造成任务在遇到异常的情况下,并不能继续执行下去。如果不能确定异常的种类,可以直接捕获 Exception 或者更通用的 Throwable。
while(!isInterrupted()){
try{
……
}catch(Exception ex){
……
}
}
同步方式
java 中实现同步的方式有很多,大体分为以下几种。
synchronized 关键字
wait、notify 等
Concurrent 包中的 ReentrantLock
volatile 关键字
ThreadLocal 局部变量
生产者、消费者是 wait、notify 最典型的应用场景,这些函数的调用,是必须要放在 synchronized 代码块里才能够正常运行的。它们同信号量一样,大多数情况下属于炫技,对代码的可读性影响较大,不推荐。关于 ObjectMonitor 相关的几个函数,只要搞懂下面的图,就基本 ok 了。
使用 ReentrantLock 最容易发生错误的就是忘记在 finally 代码块里关闭锁。大多数同步场景下,使用 Lock 就足够了,而且它还有读写锁的概念进行粒度上的控制。我们一般都使用非公平锁,让任务自由竞争。非公平锁性能高于公平锁性能,非公平锁能更充分的利用 cpu 的时间片,尽量的减少 cpu 空闲的状态时间。非公平锁还会造成饿死现象:有些任务一直获取不到锁。
synchronized 通过锁升级机制,速度不见得就比 lock 慢。而且,通过 jstack,能够方便的看到其堆栈,使用还是比较广泛。
volatile 总是能保证变量的读可见,但它的目标是基本类型和它锁的基本对象。假如是它修饰的是集合类,比如 Map,那么它保证的读可见是 map 的引用,而不是 map 对象,这点一定要注意。
synchronized 和 volatile 都体现在字节码上(monitorenter、monitorexit),主要是加入了内存屏障。而 Lock,是纯粹的 java api。
ThreadLocal 很方便,每个线程一份数据,也很安全,但要注意内存泄露。假如线程存活时间长,我们要保证每次使用完 ThreadLocal,都调用它的 remove()方法(具体来说是 expungeStaleEntry),来清除数据。
关于 Concurrent 包
concurrent 包是在 AQS 的基础上搭建起来的,AQS 提供了一种实现阻塞锁和一系列依赖 FIFO 等待队列的同步器的框架。
线程池
最全的线程池大概有 7 个参数,想要合理使用线程池,肯定不会不会放过这些参数的优化。
线程池参数
concurrent 包最常用的就是线程池,平常工作建议直接使用线程池,Thread 类就可以降低优先级了。我们常用的主要有 newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool、调度等,使用 Executors 工厂类创建。
newSingleThreadExecutor 可以用于快速创建一个异步线程,非常方便。而 newCachedThreadPool 永远不要用在高并发的线上环境,它用的是无界队列对任务进行缓冲,可能会挤爆你的内存。
我习惯性自定义 ThreadPoolExecutor,也就是参数最全的那个。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
假如我的任务可以预估,corePoolSize,maximumPoolSize 一般都设成一样大的,然后存活时间设的特别的长。可以避免线程频繁创建、关闭的开销。I/ O 密集型和 CPU 密集型的应用线程开的大小是不一样的,一般 I / O 密集型的应用线程就可以开的多一些。
threadFactory 我一般也会定义一个,主要是给线程们起一个名字。这样,在使用 jstack 等一些工具的时候,能够直观的看到我所创建的线程。
监控
高并发下的线程池,最好能够监控起来。可以使用日志、存储等方式保存下来,对后续的问题排查帮助很大。
通常,可以通过继承 ThreadPoolExecutor,覆盖 beforeExecute、afterExecute、terminated 方法,达到对线程行为的控制和监控。
线程池饱和策略
最容易被遗忘的可能就是线程的饱和策略了。也就是线程和缓冲队列的空间全部用完了,新加入的任务将如何处置。jdk 默认实现了 4 种策略,默认实现的是 AbortPolicy,也就是直接抛出异常。下面介绍其他几种。
DiscardPolicy 比 abort 更加激进,直接丢掉任务,连异常信息都没有。
CallerRunsPolicy 由调用的线程来处理这个任务。比如一个 web 应用中,线程池资源占满后,新进的任务将会在 tomcat 线程中运行。这种方式能够延缓部分任务的执行压力,但在更多情况下,会直接阻塞主线程的运行。
DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
很多情况下,这些饱和策略可能并不能满足你的需求,你可以自定义自己的策略,比如将任务持久化到一些存储中。
阻塞队列
阻塞队列会对当前的线程进行阻塞。当队列中有元素后,被阻塞的线程会自动被唤醒,这极大的提高的编码的灵活性,非常方便。在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是 socket 数据的读取、解析,读数据的线程不断将数据放入队列,解析线程不断从队列取数据进行处理。
ArrayBlockingQueue 对访问者的调用默认是不公平的,我们可以通过设置构造方法参数将其改成公平阻塞队列。
LinkedBlockingQueue 队列的默认最大长度为 Integer.MAX_VALUE,这在用做线程池队列的时候,会比较危险。
SynchronousQueue 是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。队列本身不存储任何元素,吞吐量非常高。对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。它更像是一个管道,在一些通讯框架中(比如 rpc),通常用来快速处理某个请求,应用较为广泛。
DelayQueue 是一个支持延时获取元素的无界阻塞队列。放入 DelayQueue 的对象需要实现 Delayed 接口,主要是提供一个延迟的时间,以及用于延迟队列内部比较排序。这种方式通常能够比大多数非阻塞的 while 循环更加节省 cpu 资源。
另外还有 PriorityBlockingQueue 和 LinkedTransferQueue 等,根据字面意思就能猜测它的用途。在线程池的构造参数中,我们使用的队列,一定要注意其特性和边界。比如,即使是最简单的 newFixedThreadPool,在某些场景下,也是不安全的,因为它使用了无界队列。
CountDownLatch
假如有一堆接口 A -Y,每个接口的耗时最大是 200ms,最小是 100ms。
我的一个服务,需要提供一个接口 Z,调用 A - Y 接口对结果进行聚合。接口的调用没有顺序需求,接口 Z 如何在 300ms 内返回这些数据?
此类问题典型的还有赛马问题,只有通过并行计算才能完成问题。归结起来可以分为两类:
实现任务的并行性
开始执行前等待 n 个线程完成任务
在 concurrent 包出现之前,需要手工的编写这些同步过程,非常复杂。现在就可以使用 CountDownLatch 和 CyclicBarrier 进行便捷的编码。
CountDownLatch 是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减 1。当计数器值到达 0 时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。CyclicBarrier 与其类似,可以实现同样的功能。不过在日常的工作中,使用 CountDownLatch 会更频繁一些。
信号量
Semaphore 虽然有一些应用场景,但大部分属于炫技,在编码中应该尽量少用。
信号量可以实现限流的功能,但它只是常用限流方式的一种。其他两种是漏桶算法、令牌桶算法。
hystrix 的熔断功能,也有使用信号量进行资源的控制。
Lock && Condition
在 Java 中,对于 Lock 和 Condition 可以理解为对传统的 synchronized 和 wait/notify 机制的替代。concurrent 包中的许多阻塞队列,就是使用 Condition 实现的。
但这些类和函数对于初中级码农来说,难以理解,容易产生 bug,应该在业务代码中严格禁止。但在网络编程、或者一些框架类工程中,这些功能是必须的,万不可将这部分的工作随便分配给某个小弟。
End
不管是 wait、notify,还是同步关键字或者锁,能不用就不用,因为它们会引发程序的复杂性。最好的方式,是直接使用 concurrent 包所提供的机制,来规避一些编码方面的问题。
concurrent 包中的 CAS 概念,在一定程度上算是无锁的一种实现。更专业的有类似 disruptor 的无锁队列框架,但它依然是建立在 CAS 的编程模型上的。近些年,类似 AKKA 这样的事件驱动模型正在走红,但编程模型简单,不代表实现简单,背后的工作依然需要多线程去协调。
golang 引入协程 (coroutine) 概念以后,对多线程加入了更加轻量级的补充。java 中可以通过 javaagent 技术加载 quasar 补充一些功能,但我觉得你不会为了这丁点效率去牺牲编码的可读性。