1 线程状态简述

Java线程在运行的生命周期中可能处于如下6种不同的状态,在给定的一个时刻,线程只能处于其中的一个状态。

线程状态阐明
NEW初始状态,线程刚被创立,然而并未启动(还未调用start办法)。
RUNNABLE运行状态,JAVA线程将操作系统中的就绪(READY)和运行(RUNNING)两种状态抽象地称为“运行中”。
BLOCKED阻塞状态,示意线程阻塞于锁。
WAITING期待状态,示意该线程无限期期待另一个线程执行一个特地的动作。
TIMED_WAITING超时期待状态,不同于WAITING的是,它能够在指定工夫主动返回。
TERMINATED终止状态,示意以后状态曾经执行结束。

线程在本身的生命周期中,并不是固定地处于某个状态,而是随着代码的执行在不同的状态之间进行切换。

<img src="https://img-blog.csdnimg.cn/20200929220833538.png" style="zoom:80%;" />

在【并发编程根底】线程根底(罕用办法、状态)一文中,次要学习了wait()、join()和sleep()等办法,在【并发编程】深刻了解synchronized原理一文中,次要探讨了synchronized原理。上面就进行park()/unpark()、wait()/notify()/notifyAll()的学习。

2 wait和notify/notifyAll

2.1 源码简析

wait( ),notify( ),notifyAll( )都是Object根底类中的办法,所以在任何 Java 对象上都能够应用。

public class Object {    // 导致以后线程期待,直到另一个线程调用此对象的notify()办法或notifyAll()办法。    public final void wait() throws InterruptedException {        wait(0);    }    // 导致以后线程期待,直到另一个线程调用此对象的notify()办法或notifyAll()办法,或者曾经过了指定的工夫。    public final native void wait(long timeout) throws InterruptedException;    //  唤醒正在此对象监视器上期待的单个线程。    public final native void notify();    //  唤醒期待此对象监视器的所有线程。    public final native void notifyAll();}

关上objectMonitor.cpp,查看wait办法:

     ...   // create a node to be put into the queue   // Critically, after we reset() the event but prior to park(), we must check   // for a pending interrupt.   ObjectWaiter node(Self);                   // 将以后线程封装成ObjectWatier   node.TState = ObjectWaiter::TS_WAIT ; // 状态改为期待状态   Self->_ParkEvent->reset() ;   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag   // Enter the waiting queue, which is a circular doubly linked list in this case   // but it could be a priority queue or any data structure.   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only   // by the the owner of the monitor *except* in the case where park()   // returns because of a timeout of interrupt.  Contention is exceptionally rare   // so we use a simple spin-lock instead of a heavier-weight blocking lock.   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;// 自旋操作   AddWaiter (&node) ;   Thread::SpinRelease (&_WaitSetLock) ;                 // 增加到_WaitSet节点中    ...

查看AddWaiter()办法:

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {  assert(node != NULL, "should not dequeue NULL node");  assert(node->_prev == NULL, "node already in list");  assert(node->_next == NULL, "node already in list");  // put node at end of queue (circular doubly linked list)  if (_WaitSet == NULL) {    _WaitSet = node;    node->_prev = node;    node->_next = node;  } else {    ObjectWaiter* head = _WaitSet ; // 通过双向链表的形式,将ObjectWaiter对象增加到_WaitSet列表中    ObjectWaiter* tail = head->_prev;    assert(tail->_next == head, "invariant check");    tail->_next = node;    head->_prev = node;    node->_next = head;    node->_prev = tail;  }}

查看notify办法源码:

void ObjectMonitor::notify(TRAPS) {  CHECK_OWNER();  if (_WaitSet == NULL) {     TEVENT (Empty-Notify) ;// _WaitSet=NULL表明没有期待状态的线程,间接返回。     return ;  }  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);  int Policy = Knob_MoveNotifyee ;  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;  ObjectWaiter * iterator = DequeueWaiter() ;// 获取一个ObjectWaiter对象  if (iterator != NULL) {      ...     ObjectWaiter * List = _EntryList ;     if (List != NULL) {        assert (List->_prev == NULL, "invariant") ;        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;        assert (List != iterator, "invariant") ;     }     // 依据不同状态采取不同策略,将从_WaitSet列表中移出来的ObjectWaiter对象退出到_EntryList列表中。     if (Policy == 0) {       // prepend to EntryList         if (List == NULL) {             iterator->_next = iterator->_prev = NULL ;             _EntryList = iterator ;         } else {             List->_prev = iterator ;             iterator->_next = List ;             iterator->_prev = NULL ;             _EntryList = iterator ;        }     } else     if (Policy == 1) {...} else     // append to EntryList     if (Policy == 2) {...} else     // prepend to cxq     if (Policy == 3) {                 // append to cxq         ...          } else {        ParkEvent * ev = iterator->_event ;        iterator->TState = ObjectWaiter::TS_RUN ;        OrderAccess::fence() ;         // 被唤醒的线程又变成run状态。        ev->unpark() ;     }}

查看notifyAll办法源码:

void ObjectMonitor::notifyAll(TRAPS) {  CHECK_OWNER();  ObjectWaiter* iterator;  if (_WaitSet == NULL) {      TEVENT (Empty-NotifyAll) ;// _WaitSet=NULL表明没有期待状态的线程,间接返回。      return ;  }  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);  int Policy = Knob_MoveNotifyee ;  int Tally = 0 ;  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;  for (;;) {     iterator = DequeueWaiter () ;// 循环获取所以ObjectWaiter对象       ...     ObjectWaiter * List = _EntryList ;     if (List != NULL) {        assert (List->_prev == NULL, "invariant") ;        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;        assert (List != iterator, "invariant") ;     }      // 依据不同状态采取不同策略,将从_WaitSet列表中移出来的ObjectWaiter对象退出到_EntryList列表中。         if (Policy == 0) {       // prepend to EntryList         if (List == NULL) {             iterator->_next = iterator->_prev = NULL ;             _EntryList = iterator ;         } else {             List->_prev = iterator ;             iterator->_next = List ;             iterator->_prev = NULL ;             _EntryList = iterator ;        }     } else     if (Policy == 1) {      // append to EntryList        ...     } else     if (Policy == 2) {      // prepend to cxq        ...     } else     if (Policy == 3) {      // append to cxq        ...     } else {        ParkEvent * ev = iterator->_event ;        iterator->TState = ObjectWaiter::TS_RUN ;// 被唤醒的线程又变成run状态。        OrderAccess::fence() ;        ev->unpark() ;     }      ...

可见,wait()与notify()/notifyAll()的实现都跟Monitor有很大关联。

  • 当多线程拜访一段同步代码块时,这些都线程会被被封装成一个个ObjectWatier对象,并被放入 _EntryList列表中,也就是被放到 Entry Set(入口区) 中期待获取锁。
  • 如果该线程获取到了锁(acquire),线程就会成为以后锁的 Owner。
  • 获取到锁的线程可也以通过调用 wait 办法将锁开释(release),而后该线程对象会被放入_WaitSet列表中,进入Wait Set (期待区)进行期待(阻塞BLOCKED)。
  • 当获取到锁的对象调用notify/notifyAll办法唤醒期待区被阻塞的线程时,线程从新竞争锁。如果竞争锁胜利,那么线程就进入RUNNABLE状态;如果竞争锁失败,这些线程会从新进入到Entry Set区再从新去竞争锁。

wait办法的应用对应上图的第3步,也就是说,调用wait()notify()/notifyAll()办法的对象必须曾经获取到锁

如何确保调用对象获取到锁呢?应用sychronized关键字呗!所以说这些办法调用也必须产生在sychronized润饰的同步代码块内

2.2 期待/告诉机制

(1)什么是期待/告诉机制

期待/告诉机制是多个线程间的一种合作机制。谈到线程咱们常常想到的是线程间的竞争(race),比方去竞争锁。但这并不是故事的全副,线程间也有合作机制。就好比咱们在公司中与共事关系,可能存在在降职时的竞争,但更多时候是一起单干以实现某些工作。

wait/notify 就是线程间的一种合作机制。

当一个线程调用wait()/wait(long)办法后,进入WAITING状态或者TIMED_WAITING状态(阻塞),并开释锁与CPU资源。只有其余获取到锁的线程执行完他们的指定代码过后,再通过notify()办法将其唤醒。 如果须要,也能够应用 notifyAll()来唤醒所有的阻塞线程。

(2)期待/告诉应用办法

期待/告诉机制就是用于解决线程间通信的问题的,应用到的3个办法的含意如下:

  1. wait:线程不再流动,不再参加调度,开释它对锁的拥有权。它还要等着别的线程执行一个特地的动作,也即是“告诉(notify)”在这个对象上期待的线程从WAITING状态中释放出来,从新进入到调度队列(ready queue)中。
  2. notify:唤醒一个期待以后对象的锁的线程。唤醒在此对象监视器上期待的单个线程。
  3. notifyAll:唤醒在此对象监视器上期待的所有线程。

留神:

哪怕只告诉了一个期待的线程,被告诉线程也不能立刻复原执行,因为它当初中断的中央是在同步块内,而此刻它曾经不持有锁,所以它须要再次尝试去获取锁(很可能面临其它线程的竞争),胜利后能力在当初调用 wait 办法之后的中央复原执行。

总结如下:

  • 如果能获取锁,线程就从 WAITING/TIMED_WAITING 状态转换为RUNNABLE 状态;
  • 否则,从 Wait Set 区进去,又进入 Entry Set区,线程就从 WAITING 状态又变成 BLOCKED 状态。

(3)调用wait和notify办法须要留神的细节

  • wait办法与notify办法必须要由同一个锁对象调用。因为对应的锁对象能够通过notify唤醒应用同一个锁对象调用的wait办法后的线程。
  • wait办法与notify办法是属于Object类的办法的。因为锁对象能够是任意对象,而任意对象的所属类都是继承了Object类的。
  • wait办法与notify办法必须要在同步代码块或者是同步函数中应用。因为必须要通过锁对象调用这2个办法。

上面就通过一个案例来进一步理解期待/告诉机制:

public class WaitAndNotify {    static boolean flag = true;    static Object lock = new Object();    public static void main(String[] args) throws Exception {        Thread waitThread = new Thread(new Wait(), "WaitThread");        waitThread.start();        TimeUnit.SECONDS.sleep(1);        Thread notifyThread = new Thread(new Notify(), "NotifyThread");        notifyThread.start();    }    static class Wait implements Runnable {        @Override        public void run() {            // 加锁,领有lock的Monitor            synchronized (lock) {                // 当条件不满足时,持续wait,同时开释了lock的锁                while (flag) {                    try {                        System.out.println(Thread.currentThread().getName() + " flag is true. wait @ " +                                new SimpleDateFormat("HH:mm:ss").format(new Date()));                        lock.wait();                    } catch (InterruptedException e) {                    }                }                // 当条件满足时,实现工作                System.out.println(Thread.currentThread().getName() + " flag is false. running @ " +                        new SimpleDateFormat("HH:mm:ss").format(new Date()));            }        }    }    static class Notify implements Runnable {        @Override        public void run() {            // 加锁,领有lock的Monitor            synchronized (lock) {                // 取得lock的锁,而后进行告诉                System.out.println(Thread.currentThread().getName() + " hold lock. notify @ " +                        new SimpleDateFormat("HH:mm:ss").format(new Date()));                lock.notifyAll();                flag = false;                SleepUtils.second(5);            }            // 再次加锁            synchronized (lock) {                System.out.println(Thread.currentThread().getName() + " hold lock. again. sleep @ " +                        new SimpleDateFormat("HH:mm:ss").format(new Date()));                SleepUtils.second(5);            }        }    }}class SleepUtils {    public static final void second(long seconds) {        try {            TimeUnit.SECONDS.sleep(seconds);        } catch (InterruptedException e) {        }    }}

输入后果可能如下:

WaitThread flag is true. wait @ 00:38:53NotifyThread hold lock. notify @ 00:38:54NotifyThread hold lock. again. sleep @ 00:38:59WaitThread flag is false. running @ 00:39:04

也可能如下:

WaitThread flag is true. wait @ 00:38:53NotifyThread hold lock. notify @ 00:38:54WaitThread flag is false. running @ 00:39:04NotifyThread hold lock. again. sleep @ 00:38:59

之所以呈现这类状况,在于调用notify()或notifyAll()办法调用后,waitThread是否胜利获取到锁。竞争胜利,则进入RUNNABLE状态;如果竞争失败,waitThread会从新进入到Entry Set区再从新去竞争锁。也就是说,从wait()办法返回的前提是取得了调用对象的锁

从上述细节中能够看到,期待/告诉机制依靠于同步机制,其目标就是确保期待线程从wait()办法返回时可能感知到告诉线程对变量做出的批改。

下图形容了上述示例的过程:

2.3 生产者/消费者模式

从下面案例中,能够提炼出期待/告诉的经典范式——生产者/消费者模式。该范式次要分为两局部,别离针对生产者(告诉方)和消费者(期待方)。

消费者遵循如下准则:

(1)获取对象的锁。

(2)如果条件不满足,那么调用对象的wait()办法,被告诉后仍要查看条件。

(3)条件满足则执行对应的逻辑。

对应的伪代码如下。

 synchronized (对象) {     while (条件) {         对象.wait();     }     对应的解决逻辑 }

生产者遵循如下准则:

(1)取得对象的锁。

(2)扭转条件。

(3)告诉所有期待在对象上的线程。对应的伪代码如下。

对应的伪代码如下。

 synchronized (对象) {    扭转的条件    对象.notifyAll();//或者 对象.notify() }

代码实例

首先建了一个简略的 Product 类,用来示意生产和生产的产品。

public class Product {    private String name;    public Product(String name) {        this.name = name;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }}

创立生产者类:

public class Producer implements Runnable {    private Queue<Product> queue;    private int maxCapacity;    public Producer(Queue<Product> queue, int maxCapacity) {        this.queue = queue;        this.maxCapacity = maxCapacity;    }    @Override    public void run() {        synchronized (queue) {            while (queue.size() == maxCapacity) {                try {                    System.out.println("生产者" + Thread.currentThread().getName() + "Queue 已满,WAITING");                    wait();                    System.out.println("生产者" + Thread.currentThread().getName() + "退出期待");                } catch (InterruptedException e) {                    e.printStackTrace();                }            }            if (queue.size() == 0) { //队列里的产品从无到有,须要告诉在期待的消费者                queue.notifyAll();            }            Integer i = new Random().nextInt(50);            queue.offer(new Product("产品" + i.toString()));            System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品" + i.toString());        }    }}

创立消费者类:

public class Consumer implements Runnable {    private Queue<Product> queue;    private int maxCapacity;    public Consumer(Queue queue, int maxCapacity) {        this.queue = queue;        this.maxCapacity = maxCapacity;    }    @Override    public void run() {        synchronized (queue) {            while (queue.isEmpty()) {                try {                    System.out.println("消费者" + Thread.currentThread().getName() + "Queue已空,WAITING");                    wait();                    System.out.println("消费者" + Thread.currentThread().getName() + "退出期待");                } catch (InterruptedException e) {                    e.printStackTrace();                }            }            if (queue.size() == maxCapacity) {                queue.notifyAll();            }            Product product = queue.poll();            System.out.println("消费者" + Thread.currentThread().getName() + "生产了" + product.getName());        }    }}

开启多线程:

public class TreadTest {    public static void main(String[] args) {        Queue<Product> queue = new ArrayDeque<>();        for (int i = 0; i < 10; i++) {            new Thread(new Producer((Queue<Product>) queue, 10)).start();            new Thread(new Consumer((Queue) queue, 10)).start();        }    }}

测试后果:

生产者Thread-0生产了产品35消费者Thread-1生产了产品35生产者Thread-2生产了产品43消费者Thread-3生产了产品43消费者Thread-5Queue已空,WAITING生产者Thread-6生产了产品17生产者Thread-8生产了产品39消费者Thread-7生产了产品17生产者Thread-10生产了产品17生产者Thread-12生产了产品3消费者Thread-13生产了产品39生产者Thread-14生产了产品10消费者Thread-17生产了产品17生产者Thread-16生产了产品8消费者Thread-19生产了产品3生产者Thread-4生产了产品29消费者Thread-9生产了产品10消费者Thread-11生产了产品8消费者Thread-15生产了产品29生产者Thread-18生产了产品33

3 park与unpark

LockSupport类是Java6(JSR166-JUC)引入的一个类,用来创立锁和其余同步工具类的根本线程阻塞原语。应用LockSupport类中的park()和unpark()办法也能够实现线程的阻塞与唤醒。Park有停车的意思,假如线程为车辆,那么park办法代表着停车,而unpark办法则是指车辆启动来到。

    public static void park() {        UNSAFE.park(false, 0L);    }    public static void park(Object blocker) {        Thread t = Thread.currentThread();        setBlocker(t, blocker);        UNSAFE.park(false, 0L);        setBlocker(t, null);    }    public static void unpark(Thread thread) {        if (thread != null)            UNSAFE.unpark(thread);    }

归根到底还是调用了UNSAFE类中的函数:

    public native void unpark(Object var1);    public native void park(boolean var1, long var2);

与 wait/notify 相比,park/unpark 办法更贴近操作系统层面的阻塞与唤醒线程,并不需要获取对象的监视器

park/unpark 原理可参考LockSupport中的park与unpark原理一文。

须要明确的是,每个java线程都有一个Parker对象,次要三局部组成 _counter、 _cond和 _mutex。Parker类是这样定义的:

class Parker : public os::PlatformParker {private:  //示意许可  volatile int _counter ;  ...public:  Parker() : PlatformParker() {    //初始化_counter    _counter       = 0 ; ...public:  void park(bool isAbsolute, jlong time);  void unpark();  ...}class PlatformParker : public CHeapObj<mtInternal> {  protected:    pthread_mutex_t _mutex [1] ;    pthread_cond_t  _cond  [1] ;    ...}

Parker类里的_counter字段,就是用来记录所谓的“许可”的。当调用park时,这个变量置为了0;当调用unpark时,这个变量置为1

park和unpark的灵便之处在于,unpark函数能够先于park调用。比方线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现曾经有“许可”了,那么它会马上再持续运行。

先调用park再调用upark时

1.先调用park

(1)以后线程调用 Unsafe.park() 办法,查看_counter状况(为0),取得 _mutex 互斥锁。

(2)mutex对象有个期待队列 _cond,线程进入期待队列中阻塞。

(4)设置 _counter = 0。

2.再调用upark

(1)调用 Unsafe.unpark办法,设置 _counter 为 1

(2)唤醒 _cond 条件变量中的 阻塞线程,线程复原运行。

(3)设置 _counter 为 0

先调用upark再调用park时

(1)调用 Unsafe.unpark办法,设置 _counter 为 1。

(2)以后线程调用 Unsafe.park() 办法。

(3)查看 _counter ,本状况为 1,这时线程无需阻塞,持续运行。

(4)设置 _counter 为 0。

特地留神的是,LockSupport是不可重入的,如果一个线程间断2次调用LockSupport.park(),那么该线程肯定会始终阻塞上来。

    public static void main(String[] args) throws Exception {        Thread thread = Thread.currentThread();        LockSupport.unpark(thread);        System.out.println("线程处于运行状态");        LockSupport.park();        System.out.println("线程处于阻塞状态");        LockSupport.park();        System.out.println("线程处于阻塞状态");        LockSupport.unpark(thread);        System.out.println("线程处于运行状态???");    }

运行后果如下:

线程处于运行状态线程处于阻塞状态

可见,在第二次调用park后,线程无奈再获取许可呈现了死锁。

参考资料

JAVA并发编程的艺术

Java精通并发-透过openjdk源码剖析wait与notify办法的本地实现

LockSupport中的park与unpark原理

Java的LockSupport.park()实现剖析