共计 12338 个字符,预计需要花费 31 分钟才能阅读完成。
1 线程状态简述
Java 线程在运行的生命周期中可能处于如下 6 种不同的状态,在给定的一个时刻,线程只能处于其中的一个状态。
线程状态 | 阐明 |
---|---|
NEW | 初始状态,线程刚被创立,然而并未启动(还未调用 start 办法)。 |
RUNNABLE | 运行状态,JAVA 线程将操作系统中的就绪(READY)和运行(RUNNING)两种状态抽象地称为“运行中”。 |
BLOCKED | 阻塞状态,示意线程阻塞于锁。 |
WAITING | 期待状态,示意该线程无限期期待另一个线程执行一个特地的动作。 |
TIMED_WAITING | 超时期待状态,不同于 WAITING 的是,它能够在指定工夫主动返回。 |
TERMINATED | 终止状态,示意以后状态曾经执行结束。 |
线程在本身的生命周期中,并不是固定地处于某个状态,而是随着代码的执行在不同的状态之间进行切换。
<img src=”https://img-blog.csdnimg.cn/20200929220833538.png” style=”zoom:80%;” />
在【并发编程根底】线程根底(罕用办法、状态)一文中,次要学习了 wait()、join()和 sleep()等办法,在【并发编程】深刻了解 synchronized 原理一文中,次要探讨了 synchronized 原理。上面就进行 park()/unpark()、wait()/notify()/notifyAll()的学习。
2 wait 和 notify/notifyAll
2.1 源码简析
wait(),notify(),notifyAll()都是 Object 根底类中的办法,所以在任何 Java 对象上都能够应用。
public class Object {// 导致以后线程期待,直到另一个线程调用此对象的 notify()办法或 notifyAll()办法。public final void wait() throws InterruptedException {wait(0);
}
// 导致以后线程期待,直到另一个线程调用此对象的 notify()办法或 notifyAll()办法,或者曾经过了指定的工夫。public final native void wait(long timeout) throws InterruptedException;
// 唤醒正在此对象监视器上期待的单个线程。public final native void notify();
// 唤醒期待此对象监视器的所有线程。public final native void notifyAll();}
关上 objectMonitor.cpp,查看 wait 办法:
...
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
ObjectWaiter node(Self); // 将以后线程封装成 ObjectWatier
node.TState = ObjectWaiter::TS_WAIT ; // 状态改为期待状态
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;// 自旋操作
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ; // 增加到_WaitSet 节点中
...
查看 AddWaiter()办法:
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev == NULL, "node already in list");
assert(node->_next == NULL, "node already in list");
// put node at end of queue (circular doubly linked list)
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ; // 通过双向链表的形式,将 ObjectWaiter 对象增加到_WaitSet 列表中
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
查看 notify 办法源码:
void ObjectMonitor::notify(TRAPS) {CHECK_OWNER();
if (_WaitSet == NULL) {TEVENT (Empty-Notify) ;// _WaitSet=NULL 表明没有期待状态的线程,间接返回。return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;// 获取一个 ObjectWaiter 对象
if (iterator != NULL) {
...
ObjectWaiter * List = _EntryList ;
if (List != NULL) {assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
// 依据不同状态采取不同策略,将从_WaitSet 列表中移出来的 ObjectWaiter 对象退出到_EntryList 列表中。if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) {...} else // append to EntryList
if (Policy == 2) {...} else // prepend to cxq
if (Policy == 3) { // append to cxq
...
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ; // 被唤醒的线程又变成 run 状态。ev->unpark() ;}
}
查看 notifyAll 办法源码:
void ObjectMonitor::notifyAll(TRAPS) {CHECK_OWNER();
ObjectWaiter* iterator;
if (_WaitSet == NULL) {TEVENT (Empty-NotifyAll) ;// _WaitSet=NULL 表明没有期待状态的线程,间接返回。return ;
}
DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
int Tally = 0 ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;
for (;;) {iterator = DequeueWaiter () ;// 循环获取所以 ObjectWaiter 对象
...
ObjectWaiter * List = _EntryList ;
if (List != NULL) {assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
// 依据不同状态采取不同策略,将从_WaitSet 列表中移出来的 ObjectWaiter 对象退出到_EntryList 列表中。if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
...
} else
if (Policy == 2) { // prepend to cxq
...
} else
if (Policy == 3) { // append to cxq
...
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;// 被唤醒的线程又变成 run 状态。OrderAccess::fence() ;
ev->unpark() ;}
...
可见,wait()与 notify()/notifyAll()的实现都跟 Monitor 有很大关联。
- 当多线程拜访一段同步代码块时,这些都线程会被被封装成一个个 ObjectWatier 对象,并被放入 _EntryList 列表中,也就是被放到 Entry Set(入口区)中期待获取锁。
- 如果该线程获取到了锁(acquire),线程就会成为以后锁的 Owner。
- 获取到锁的线程可也以通过调用 wait 办法将锁开释(release),而后该线程对象会被放入_WaitSet 列表中,进入 Wait Set(期待区)进行期待(阻塞 BLOCKED)。
- 当获取到锁的对象调用 notify/notifyAll 办法唤醒期待区被阻塞的线程时,线程从新竞争锁。如果竞争锁胜利,那么线程就进入 RUNNABLE 状态;如果竞争锁失败,这些线程会从新进入到 Entry Set 区再从新去竞争锁。
wait 办法的应用对应上图的第 3 步,也就是说,调用 wait()
、notify()
/notifyAll()
办法的对象必须曾经获取到锁。
如何确保调用对象获取到锁呢?应用 sychronized 关键字呗!所以说这些办法调用也必须产生在 sychronized 润饰的同步代码块内。
2.2 期待 / 告诉机制
(1)什么是期待 / 告诉机制
期待 / 告诉机制是多个线程间的一种 合作 机制。谈到线程咱们常常想到的是线程间的 竞争(race),比方去竞争锁。但这并不是故事的全副,线程间也有合作机制。就好比咱们在公司中与共事关系,可能存在在降职时的竞争,但更多时候是一起单干以实现某些工作。
wait/notify 就是线程间的一种合作机制。
当一个线程调用 wait()/wait(long)办法后,进入 WAITING 状态或者 TIMED_WAITING 状态(阻塞),并开释锁与 CPU 资源。只有其余获取到锁的线程执行完他们的指定代码过后,再通过 notify()办法将其唤醒。如果须要,也能够应用 notifyAll()
来唤醒所有的阻塞线程。
(2)期待 / 告诉应用办法
期待 / 告诉机制就是用于解决线程间通信的问题的,应用到的 3 个办法的含意如下:
- wait:线程不再流动,不再参加调度,开释 它对锁的拥有权。它还要等着别的线程执行一个 特地的动作 ,也即是“ 告诉(notify)”在这个对象上期待的线程从 WAITING 状态中释放出来,从新进入到调度队列(ready queue)中。
- notify:唤醒一个期待以后对象的锁的线程。唤醒在此对象监视器上期待的单个线程。
- notifyAll:唤醒在此对象监视器上期待的所有线程。
留神:
哪怕只告诉了一个期待的线程,被告诉线程也不能立刻复原执行,因为它当初中断的中央是在同步块内,而此刻它曾经不持有锁,所以它须要 再次尝试去获取锁(很可能面临其它线程的竞争),胜利后能力在当初调用 wait 办法之后的中央复原执行。
总结如下:
- 如果能获取锁,线程就从 WAITING/TIMED_WAITING 状态转换为 RUNNABLE 状态;
- 否则,从 Wait Set 区进去,又进入 Entry Set 区,线程就从 WAITING 状态又变成 BLOCKED 状态。
(3)调用 wait 和 notify 办法须要留神的细节
- wait 办法与 notify 办法 必须要由同一个锁对象调用。因为对应的锁对象能够通过 notify 唤醒应用同一个锁对象调用的 wait 办法后的线程。
- wait 办法与 notify 办法是属于 Object 类的办法的。因为锁对象能够是任意对象,而任意对象的所属类都是继承了 Object 类的。
- wait 办法与 notify 办法 必须要在同步代码块或者是同步函数中应用。因为必须要通过锁对象调用这 2 个办法。
上面就通过一个案例来进一步理解期待 / 告诉机制:
public class WaitAndNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws Exception {Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();}
static class Wait implements Runnable {
@Override
public void run() {
// 加锁,领有 lock 的 Monitor
synchronized (lock) {
// 当条件不满足时,持续 wait,同时开释了 lock 的锁
while (flag) {
try {System.out.println(Thread.currentThread().getName() + "flag is true. wait @" +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();} catch (InterruptedException e) {}}
// 当条件满足时,实现工作
System.out.println(Thread.currentThread().getName() + "flag is false. running @" +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}
static class Notify implements Runnable {
@Override
public void run() {
// 加锁,领有 lock 的 Monitor
synchronized (lock) {
// 取得 lock 的锁,而后进行告诉
System.out.println(Thread.currentThread().getName() + "hold lock. notify @" +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
// 再次加锁
synchronized (lock) {System.out.println(Thread.currentThread().getName() + "hold lock. again. sleep @" +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
}
class SleepUtils {public static final void second(long seconds) {
try {TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {}}
}
输入后果可能如下:
WaitThread flag is true. wait @ 00:38:53
NotifyThread hold lock. notify @ 00:38:54
NotifyThread hold lock. again. sleep @ 00:38:59
WaitThread flag is false. running @ 00:39:04
也可能如下:
WaitThread flag is true. wait @ 00:38:53
NotifyThread hold lock. notify @ 00:38:54
WaitThread flag is false. running @ 00:39:04
NotifyThread hold lock. again. sleep @ 00:38:59
之所以呈现这类状况,在于调用 notify()或 notifyAll()办法调用后,waitThread 是否胜利获取到锁。竞争胜利,则进入 RUNNABLE 状态;如果竞争失败,waitThread 会从新进入到 Entry Set 区再从新去竞争锁。也就是说,从 wait()办法返回的前提是取得了调用对象的锁。
从上述细节中能够看到,期待 / 告诉机制依靠于同步机制,其目标就是确保期待线程从 wait()办法返回时可能感知到告诉线程对变量做出的批改。
下图形容了上述示例的过程:
2.3 生产者 / 消费者模式
从下面案例中,能够提炼出期待 / 告诉的经典范式——生产者 / 消费者模式。该范式次要分为两局部,别离针对生产者(告诉方)和消费者(期待方)。
消费者 遵循如下准则:
(1)获取对象的锁。
(2)如果条件不满足,那么调用对象的 wait()办法,被告诉后仍要查看条件。
(3)条件满足则执行对应的逻辑。
对应的伪代码如下。
synchronized (对象) {while (条件) {对象.wait();
}
对应的解决逻辑
}
生产者 遵循如下准则:
(1)取得对象的锁。
(2)扭转条件。
(3)告诉所有期待在对象上的线程。对应的伪代码如下。
对应的伪代码如下。
synchronized (对象) {
扭转的条件
对象.notifyAll();// 或者 对象.notify()
}
代码实例:
首先建了一个简略的 Product
类,用来示意生产和生产的产品。
public class Product {
private String name;
public Product(String name) {this.name = name;}
public String getName() {return name;}
public void setName(String name) {this.name = name;}
}
创立生产者类:
public class Producer implements Runnable {
private Queue<Product> queue;
private int maxCapacity;
public Producer(Queue<Product> queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}
@Override
public void run() {synchronized (queue) {while (queue.size() == maxCapacity) {
try {System.out.println("生产者" + Thread.currentThread().getName() + "Queue 已满,WAITING");
wait();
System.out.println("生产者" + Thread.currentThread().getName() + "退出期待");
} catch (InterruptedException e) {e.printStackTrace();
}
}
if (queue.size() == 0) { // 队列里的产品从无到有,须要告诉在期待的消费者
queue.notifyAll();}
Integer i = new Random().nextInt(50);
queue.offer(new Product("产品" + i.toString()));
System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品" + i.toString());
}
}
}
创立消费者类:
public class Consumer implements Runnable {
private Queue<Product> queue;
private int maxCapacity;
public Consumer(Queue queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}
@Override
public void run() {synchronized (queue) {while (queue.isEmpty()) {
try {System.out.println("消费者" + Thread.currentThread().getName() + "Queue 已空,WAITING");
wait();
System.out.println("消费者" + Thread.currentThread().getName() + "退出期待");
} catch (InterruptedException e) {e.printStackTrace();
}
}
if (queue.size() == maxCapacity) {queue.notifyAll();
}
Product product = queue.poll();
System.out.println("消费者" + Thread.currentThread().getName() + "生产了" + product.getName());
}
}
}
开启多线程:
public class TreadTest {public static void main(String[] args) {Queue<Product> queue = new ArrayDeque<>();
for (int i = 0; i < 10; i++) {new Thread(new Producer((Queue<Product>) queue, 10)).start();
new Thread(new Consumer((Queue) queue, 10)).start();}
}
}
测试后果:
生产者 Thread- 0 生产了产品 35
消费者 Thread- 1 生产了产品 35
生产者 Thread- 2 生产了产品 43
消费者 Thread- 3 生产了产品 43
消费者 Thread-5Queue 已空,WAITING
生产者 Thread- 6 生产了产品 17
生产者 Thread- 8 生产了产品 39
消费者 Thread- 7 生产了产品 17
生产者 Thread-10 生产了产品 17
生产者 Thread-12 生产了产品 3
消费者 Thread-13 生产了产品 39
生产者 Thread-14 生产了产品 10
消费者 Thread-17 生产了产品 17
生产者 Thread-16 生产了产品 8
消费者 Thread-19 生产了产品 3
生产者 Thread- 4 生产了产品 29
消费者 Thread- 9 生产了产品 10
消费者 Thread-11 生产了产品 8
消费者 Thread-15 生产了产品 29
生产者 Thread-18 生产了产品 33
3 park 与 unpark
LockSupport 类是 Java6(JSR166-JUC)引入的一个类,用来 创立锁和其余同步工具类的根本线程阻塞原语 。应用 LockSupport 类中的 park() 和 unpark()办法也能够实现线程的阻塞与唤醒。Park 有停车的意思,假如线程为车辆,那么 park 办法代表着停车,而 unpark 办法则是指车辆启动来到。
public static void park() {UNSAFE.park(false, 0L);
}
public static void park(Object blocker) {Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
public static void unpark(Thread thread) {if (thread != null)
UNSAFE.unpark(thread);
}
归根到底还是调用了 UNSAFE 类中的函数:
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
与 wait/notify 相比,park/unpark 办法更贴近操作系统层面的阻塞与唤醒线程,并不需要获取对象的监视器。
park/unpark 原理可参考 LockSupport 中的 park 与 unpark 原理一文。
须要明确的是,每个 java 线程都有一个 Parker 对象,次要三局部组成 _counter、_cond 和 _mutex。Parker 类是这样定义的:
class Parker : public os::PlatformParker {
private:
// 示意许可
volatile int _counter ;
...
public:
Parker() : PlatformParker() {
// 初始化_counter
_counter = 0 ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
}
Parker 类里的_counter 字段,就是用来记录所谓的“许可 ”的。 当调用 park 时,这个变量置为了 0;当调用 unpark 时,这个变量置为 1 。
park 和 unpark 的灵便之处在于,unpark 函数能够先于 park 调用。比方线程 B 调用 unpark 函数,给线程 A 发了一个“许可”,那么当线程 A 调用 park 时,它发现曾经有“许可”了,那么它会马上再持续运行。
先调用 park 再调用 upark 时:
1. 先调用 park
(1)以后线程调用 Unsafe.park() 办法,查看_counter 状况(为 0),取得 _mutex 互斥锁。
(2)mutex 对象有个期待队列 _cond,线程进入期待队列中阻塞。
(4)设置 _counter = 0。
2. 再调用 upark
(1)调用 Unsafe.unpark 办法,设置 _counter 为 1
(2)唤醒 _cond 条件变量中的 阻塞线程,线程复原运行。
(3)设置 _counter 为 0
先调用 upark 再调用 park 时:
(1)调用 Unsafe.unpark 办法,设置 _counter 为 1。
(2)以后线程调用 Unsafe.park() 办法。
(3)查看 _counter,本状况为 1,这时线程无需阻塞,持续运行。
(4)设置 _counter 为 0。
特地留神的是,LockSupport 是不可重入 的,如果一个线程间断 2 次调用 LockSupport.park(),那么该线程肯定会始终阻塞上来。
public static void main(String[] args) throws Exception {Thread thread = Thread.currentThread();
LockSupport.unpark(thread);
System.out.println("线程处于运行状态");
LockSupport.park();
System.out.println("线程处于阻塞状态");
LockSupport.park();
System.out.println("线程处于阻塞状态");
LockSupport.unpark(thread);
System.out.println("线程处于运行状态???");
}
运行后果如下:
线程处于运行状态
线程处于阻塞状态
可见,在第二次调用 park 后,线程无奈再获取许可呈现了死锁。
参考资料
JAVA 并发编程的艺术
Java 精通并发 - 透过 openjdk 源码剖析 wait 与 notify 办法的本地实现
LockSupport 中的 park 与 unpark 原理
Java 的 LockSupport.park()实现剖析