共计 6031 个字符,预计需要花费 16 分钟才能阅读完成。
Java 并发学习
使用并发的一个重要原因是提高执行效率。由于 I / O 等情况阻塞,单个任务并不能充分利用 CPU 时间。所以在单处理器的机器上也应该使用并发。
为了实现并发,操作系统层面提供了多进程。但是进程的数量和开销都有限制,并且多个进程之间的数据共享比较麻烦。另一种比较轻量的并发实现是使用线程,一个进程可以包含多个线程。线程在进程中没有数量限制, 数据共享相对简单。线程的支持跟语言是有关系的。Java 语言中支持多线程。
Java 中的多线程是抢占式的。这意味着一个任务随时可能中断并切换到其它任务。所以我们需要在代码中足够的谨慎,防范好这种切换带来的副作用。
基础
Runnable 它可以理解成一个任务。它的 run()方法就是任务的逻辑,执行顺序。Thread 它是一个任务的载体,虚拟机通过它来分配任务执行的时间片。Thread 中的 start 方法可以作为一个并发任务的入口。不通过 start 方法来执行任务,那么 run 方法就只是一个普通的方法
线程的状态有四种:
NEW 线程创建的时候短暂的处于这种状态。这种状态下已经可以获得 CPU 时间了,随后可能进入 RUNNABLE,BLOCKED 状态。
RUNNABLE 此状态下只要 CPU 将时间分配给线程,线程中的任务就可以执行。随后可能进入 BLOCKED,DEAD 状态。
BLOCKED 线程可以运行,但是有某个条件阻止着它。当线程处于阻塞状态时,CPU 不会分配时间片给它,直到它重新进入 RUNNABLE 状态。
DEAD 此状态的线程将永远不会获得 CPU 时间片。通常是因为 run()方法返回才会到达此状态。此时任务还是可以被中断的。
Callable 它是一个带返回的异步任务,返回的结果放到一个 Future 对象中。Future 它可以接受 Callable 任务的返回结果。在任务没有返回的时候调用 get 方法会阻塞当前线程。cancel 方法会尝试取消未完成的任务(未执行 -> 直接不执行,已经完成 -> 返回 false, 正在执行 -> 尝试中断)。FutureTask 同时继承了 Runnable, Callable 接口。Java 1.5 之后,不再推荐直接使用 Thread 对象作为任务的入口。推荐使用 Executor 管理 Thread 对象。Executor 是线程与任务之间的的一个中间层,它屏蔽了线程的生命周期,不再需要显式的管理线程。并且 ThreadPoolExecutor 实现了此接口,我们可以通过它来利用线程池的优点。线程池涉及到的类有:Executor, ExecutorService, ThreadExecutorPool, Executors, FixedThreadPool, CachedThreadPool, SingleThreadPool。Executor 只有一个方法,execute 来提交一个任务
ExecutorService 提供了管理异步任务的方法,也可以产生一个 Future 对象来跟踪一个异步任务。
主要的方法如下:
submit 可以提交一个任务
shutdown 可以拒绝接受新任务
shutdownNow 可以拒绝新任务并向正在执行的任务发出中断信号
invokeXXX 批量执行任务
ThreadPoolExecutor 线程池的具体实现类。线程池的好处在于提高效率,能避免频繁申请 / 回收线程带来的开销。
它的使用方法复杂一些,构造线程池的可选参数有:
corePoolSize : int 工作的 Worker 的数量。
maximumPoolSize : int 线程池中持有的 Worker 的最大数量
keepAliveTime : long 当超过 Workder 的数量 corePoolSize 的时候,如果没有新的任务提交,超过 corePoolSize 的 Worker 的最长等待时间。超过这个时间之后,一部分 Worker 将被回收。
unit : TimeUnit keepAliveTime 的单位
workQueue : BlockingQueue 缓存任务的队列, 这个队列只缓存提交的 Runnable 任务。
threadFactory : ThreadFactory 产生线程的“工厂”
handler : RejectedExecutionHandler 当一个任务被提交的时候,如果所有 Worker 都在工作并且超过了缓存队列的容量的时候。会交给这个 Handler 处理。Java 中提供了几种默认的实现,AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy。
这里的 Worker 可以理解为一个线程。这里之前想不通,觉得线程不可能重新利用绑定新任务。看了下源码发现原来确实不是重新绑定任务。每一个 Worker 的核心部分只是一个循环,不断从缓存队列中取任务执行。这样达到了重用的效果。
[Java] 纯文本查看 复制代码
?
final void runWorker(Worker w) {
Runnable task = w.firstTask;
// ...
try {while(task != null || (task=getTask())!=null) {
try{task.run();
} catch(Exception e){ }
// ...
}
} finally {// ...}
// ...
}
Executors 类提供了几种默认线程池的实现方式。
CachedThreadExecutor 工作线程的数量没有上限 (Integer 的最大值), 有需要就创建新线程。
FixedThreadExecutor 预先一次分配固定数量的线程,之后不再需要创建新线程。
SingleThreadExecutor 只有一个线程的线程池。如果提交了多个任务,那么这些人物将排队,每个任务都在上一个人物执行完之后执行。所有任务都是按照它们的提交顺序执行的。
sleep(long) 当前线程 中止 一段时间。它不会释放锁。Java1.5 之后提供了更加灵活的版本。
TimeUnit 可以指定睡眠的时间单位。
优先级 绝大多数情况下我们都应该使用默认的优先级。不同的虚拟机中对应的优先级级别的总数,一般用三个就可以了 MAX_PRIORITY, NORM_PRIORITY, MIN_PRIORITY。
让步 Thread.yield()建议相同优先级的其它线程先运行,但是不保证一定运行其它线程。
后台线程 一个进程中的所有非后台线程都终止的时候整个进程也就终止,同时杀死所有后台线程。与优先级没有什么关系。
join() 线程 A 持有线程 T,当在线程 T 调用 T.join()之后,A 会阻塞,直到 T 的任务结束。可以加一个超时参数,这样在超时之后线程 A 可以放弃等待继续执行任务。
捕获异常 不能跨线程捕获异常。比如说不能在 main 线程中添加 try-catch 块来捕获其它线程中抛出的异常。每一个 Thread 对象都可以设置一个 UncaughtExceptionHandler 对象来处理本线程中抛出的异常。线程池中可以通过参数 ThreadFactory 来为每一个线程设置一个 UncaughtExceptionHandler 对象。
访问共享资源 在处理并发的时候,将变量设置为 private 非常的重要,这可以防止其它线程直接访问变量。
synchronized 修饰方法在不加参数情况下,使用对象本身作为锁。静态方法使用 Class 对象作为锁。同一个任务可以多次获得对象锁。
显式锁 Lock,相比 synchronized 更加灵活。但是需要的代码更多,编写出错的可能性也更高。只有在解决特殊问题或者提高效率的时候才用它。
原子性 原子操作就是永远不会被线程切换中断的操作。很多看似原子的操作都是非原子的,比如说 long,double 是由两个 byte 表示的,它们的所有操作都是非原子的。所以,涉及到并发异常的地方都加上同步吧。除非你对虚拟机十分的了解。
volatile 这个关键字的作用在于防止多线程环境下读取变量的脏数据。这个关键字在 c 语言中也有,作用是相同的。
原子类 AtomicXXX 类,它们能够保证对数据的操作是满足原子性的。这些类可以用来优化多线程的执行效率,减少锁的使用。然而,使用难度还是比较高的。
临界区 synchronized 关键字的用法。不是修饰整个方法,而是修饰一个代码块。它的作用在于尽量利用并发的效率,减少同步控制的区域。
ThreadLocal 这个概念与同步的概念不同。它是给每一个线程都创建一个变量的副本,并保持副本之间相互独立,互不干扰。所以各个线程操作自己的副本,不会产生冲突。
终结任务
这里我讲一下自己当前的理解。
一个线程不是可以随便中断的。即使我们给线程设置了中断状态,它也还是可以获得 CPU 时间片的。只有因为 sleep()方法而阻塞的线程可以立即收到 InterruptedException 异常,所以在 sleep 中断任务的情况下可以直接使用 try-catch 跳出任务。其它情况下,均需要通过判断线程状态来判断是否需要跳出任务 (Thread.interrupted() 方法)。
synchronized 方法修饰的代码不会在收到中断信号后立即中断。ReentrantLock 锁控制的同步代码可以通过 InterruptException 中断。
Thread.interrupted 方法调用一次之后会立即清空中断状态。可以自己用变量保存状态。
线程协作
wait/notifyAll wait/notifyAll 是 Object 类中的方法。调用 wait/notifyAll 方法的对象是互斥对象。因为 Java 中所有的 Object 都可以做互斥量(synchronized 关键字的参数), 所以 wait/notify 方法是在 Object 类中的。wait 与 sleep 不同在于 sleep 方法是 Thread 类中的方法,调用它的时候不会释放锁;wait 方法是 Object 类中的方法,调用它的时候会释放锁。调用 wait 方法之前,当前线程必须持有这段逻辑的锁。否则会抛出异常,不能继续执行。wait 方法可以将当前线程放入等待集合中,并释放当前线程持有的锁。此后,该线程不会接收到 CPU 的调度,并进入休眠状态。有四种情况肯能打破这种状态:
有其它线程在此互斥对象上调用了 notify 方法,并且刚好选中了这个线程被唤醒;
有其它线程在此互斥对象上调用了 notifyAll 方法;
其它线程向此线程发出了中断信号;
等待时间超过了参数设置的时间。
线程一旦被唤醒之后,它会像正常线程一样等待之前持有的所有锁。直到恢复到 wait 方法调用之前的状态。还有一种不常见的情况,spurious wakeup(虚假唤醒)。就是在没有 notify,notifyAll,interrupt 的时候线程自动醒来。查了一些资料并没有弄清楚是为什么。不过为了防止这种现象,我们要在 wait 的条件上加一层循环。当一个线程调用 wait 方法之后,其它线程调用该线程的 interrupt 方法。该线程会唤醒,并尝试恢复之前的状态。当状态恢复之后,该线程会抛出一个异常。notify 唤醒一个等待此对象的线程。notifyAll 唤醒所有等待此对象的线程。
错失的信号
当两个线程使用 notify/wait 或者 notifyAll/wait 进行协作的时候,不恰当的使用它们可能会导致一些信号丢失。例子:
[Java] 纯文本查看 复制代码
?
T1:
synchronized(shareMonitor){
// set up condition for T2
shareMonitor.notify();
}
T2:
while(someCondition){
// Point 1
synchronized(shareMonitor){shareMonitor.wait();
}
}
信号丢失是这样发生的:
当 T2 执行到 Point1 的时候,线程调度器将工作线程从 T2 切换到 T1。T1 完成 T2 条件的设置工作之后,线程调度器将工作线程从 T1 切换回 T2。虽然 T2 线程等待的条件已经满足,但还是会被挂起。
解决的方法比较简单:
[Java] 纯文本查看 复制代码
?
T2:
synchronized(sharedMonitor) {
while(someCondition) {sharedMonitor.wait();
}
}
将竞争条件放到 while 循环的外面即可。在进入 while 循环之后,在没有调用 wait 方法释放锁之前,将不会进入到 T1 线程造成信号丢失。
notify & notifyAll 前面已经提过这两个方法的区别。notify 是随机唤醒一个等待此锁的线程,notifyAll 是唤醒所有等待此锁的线程。Condition 他是 concurrent 类库中显式的挂起 / 唤醒任务的工具。它是真正的锁 (Lock) 对象产生的一个对象。其实用法跟 wait/notify 是一致的。await 挂起任务,signalAll()唤醒任务。生产者消费者队列 Java 中提供了一种非常简便的容器,BlockingQueue。已经帮你写好了阻塞式的队列。除了 BlockingQueue,使用 PipedWriter/PipedReader 也可以方便的在线程之间传递数据。
死锁
死锁有四个必要条件,打破一个即可去除死锁。
四个必要条件:
互斥条件。互斥条件:一个资源每次只能被一个进程使用。
请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
不剥夺条件: 线程已获得的资源,在末使用完之前,不能强行剥夺。
循环等待条件: 若干线程之间形成一种头尾相接的循环等待资源关系。
本来自己翻译,但发现百度上描述的更好一些,直接 copy 到这里来,并把进程换成了线程。
其它工具
CountDownLatch 同步多个任务,强制等待其它任务完成。它有两个重要方法 countDown,await 以及构造时传入的参数 SIZE。当一个线程调用 await 方法的时候会挂起,直到该对象收到 SIZE 次 countDown。一个对象只能使用一次。CyclicBarrier 也是有一个 SIZE 参数。当有 SIZE 个线程调用 await 的时候,全部线程都会被唤醒。可以理解为所有运动员就位后才能起跑,早就位的运动员只能挂起等待。它可以重复利用。DelayQueue 一个无界的 BlockingQueue,用来放置实现了 Delay 接口的对象,在队列中的对象只有在到期之后才能被取走。如果没有任何对象到期,就没有头元素。PriorityBlockingQueue 一种自带优先级的阻塞式队列。ScheduledExecutor 可以把它想象成一种线程池式的 Timer, TimerTask。Semaphore 互斥锁只允许一个线程访问资源,但是 Semaphore 允许 SIZE 个线程同时访问资源。
Exchanger 生产者消费者问题的特殊版。两个线程可以在都‘准备好了’之后交换一个对象的控制权。
ReadWriteLock 读写锁。读 - 读不互斥,读 - 写互斥,写 - 写互斥。
文章摘自:https://juejin.im/entry/5777c…