乐趣区

关于java:10分钟搞定-Java-并发队列好吗好的

| 难看请赞,养成习惯

  • 你有一个思维,我有一个思维,咱们替换后,一个人就有两个思维
  • If you can NOT explain it simply, you do NOT understand it well enough

现陆续将 Demo 代码和技术文章整顿在一起 Github 实际精选,不便大家浏览查看,本文同样收录在此,感觉不错,还请 Star


前言

如果依照用处与个性进行粗略的划分,JUC 包中蕴含的工具大体能够分为 6 类:

  1. 执行者与线程池
  2. 并发队列
  3. 同步工具
  4. 并发汇合
  5. 原子变量

在并发系列中,次要解说了 执行者与线程池 同步工具 ,在剖析源码时,或多或少的提及到了「队列」,队列在 JUC 中也是多种多样存在,所以本文就以「远看」视角,帮忙大家疾速理解与辨别这些看似「芜杂」的队列

并发队列

Java 并发队列依照实现形式来进行划分能够分为 2 种:

  1. 阻塞队列
  2. 非阻塞队列

如果你曾经看完并发系列锁的实现,你曾经可能晓得他们实现的区别:

前者就是基于锁实现的,后者则是基于 CAS 非阻塞算法实现的

常见的队列有上面这几种:

霎时懵逼?看到这个没有兽性的图想间接走人?主观先别急,一会就柳暗花明了

当下你兴许有个问题:

为什么会有这么多种队列的存在

锁有应答各种情景的锁,队列也天然有应答各种情景的队列了, 是不是也有点繁多职责准则的意思呢?

所以咱们要理解这些队列到底是怎么设计的?以及用在了哪些地方?

先来看下图

如果你在 IDE 中关上以上非阻塞队列和阻塞队列,查看其实现办法,你就会发现,阻塞队列 非阻塞队列 额定反对两种操作

  1. 阻塞的插入

    当队列满时,队列会阻塞插入元素的线程,直到队列不满

  2. 阻塞的移除

    当队列为空时,获取元素的线程会阻塞,直到队列变为非空

综合阐明入队 / 出队操作,看似芜杂的办法,用一个表格就能概括了

抛出异样

  • 当队列满时,此时如果再向队列中插入元素,会抛出 IllegalStateException(这很好了解)
  • 当队列空时,此时如果再从队列中获取元素,会抛出 NoSuchElementException(这也很好了解)

返回非凡值

  • 当向队列插入元素时,会返回元素是否插入胜利,胜利则返回 true
  • 当从队列移除元素时,如果没有则返回 null

始终阻塞

  • 当队列满时,如果 生产者线程 向队列 put 元素,队列会始终阻塞生产者线程,直到队列可用或者响应中断退出
  • 当队列为空时,如果 消费者线程 从队列外面 take 元素,队列会阻塞消费者线程,直到队列不为空

对于阻塞,咱们其实早在 并发编程之期待告诉机制 就曾经充分说明过了,你还记得上面这张图吗?原理其实是一样一样滴

超时退出

和锁一样,因为有阻塞,为了灵便应用,就肯定反对超时退出,阻塞工夫达到超时工夫,就会间接返回

至于为啥插入和移除这么多种单词示意模式,我也不晓得,为了不便记忆,只须要记住阻塞的办法模式即可:

单词 puttake 字母 t 首位相连,一个放,一个拿

到这里你应该对 Java 并发队列有了个初步的意识了,原来看似芜杂的办法貌似也有了法则。接下来就到了疯狂串知识点的时刻了,借助前序章节的常识,分分钟就了解全副队列了

ArrayBlockingQueue

之前也说过,JDK 中的命名还是很考究滴,一看这名字,底层就是数组实现了,是否有界,那就看在结构的时候是否须要指定 capacity 值了

填鸭式的阐明也容易忘,这些都是哪看到的呢?在所有队列的 Java docs 的第一段,一句话就概括了该队列的次要个性,所以强烈建议大家本人在看源码时,简略 瞄一眼 docs 结尾,心中就有多半个数了

在讲 Java AQS 队列同步器以及 ReentrantLock 的利用 时咱们介绍了偏心锁与非偏心锁的概念,ArrayBlockingQueue 也有同样的概念,看它的构造方法,就有 ReentrantLock 来辅助实现

public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();}

默认状况下,仍旧是不保障线程偏心拜访队列(偏心与否是指阻塞的线程是否依照阻塞的先后顺序拜访队列,先阻塞线拜访,后阻塞后拜访)

到这我也要长期问一个说过屡次的面试送分题了:

为什么默认采纳非偏心锁的形式?它较偏心锁形式有什么益处,又可能带来哪些问题?

晓得了以上内容,联合下面表格中的办法,ArrayBlockingQueue 就能够轻松过关了

和数组绝对的天然是链表了

LinkedBlockingQueue

LinkedBlockingQueue 也算是一个有界阻塞队列,从上面的构造函数中你也能够看出,该队列的默认和最大长度为 Integer.MAX_VALUE,这也就 docs 说 optionally-bounded 的起因了

public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
}

正如 Java 汇合一样,链表模式的队列,其存取效率要比数组模式的队列高。然而在一些并发程序中,数组模式的队列因为具备肯定的可预测性,因而能够在某些场景中取得更高的效率

看到 LinkedBlockingQueue 是不是也有些相熟呢?为什么要应用线程池? 就曾经和它屡次照面了

创立单个线程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

创立固定个数线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

面试送分题又来了

应用 Executors 创立线程池很简略,为什么大厂严格要求禁用这种创立形式呢?

PriorityBlockingQueue

PriorityBlockingQueue 是一个反对优先级的无界的阻塞队列,默认状况下采纳天然程序升序排列,当然也有非默认状况自定义优先级,须要排序,那天然要用到 Comparator 来定义排序规定了

能够定义优先级,天然也就有相应的限度,以及应用的注意事项

  • 依照上图阐明,队列中不容许存在 null 值,也不容许存在不能排序的元素
  • 对于排序值雷同的元素,其序列是不保障的,但你能够持续自定义其余能够辨别进去优先级的值,如果你有严格的优先级辨别,倡议有更欠缺的比拟规定,就像 Java docs 这样

     class FIFOEntry<E extends Comparable<? super E>>
         implements Comparable<FIFOEntry<E>> {static final AtomicLong seq = new AtomicLong(0);
       final long seqNum;
       final E entry;
       public FIFOEntry(E entry) {seqNum = seq.getAndIncrement();
         this.entry = entry;
       }
       public E getEntry() { return entry;}
       public int compareTo(FIFOEntry<E> other) {int res = entry.compareTo(other.entry);
         if (res == 0 && other.entry != this.entry)
           res = (seqNum < other.seqNum ? -1 : 1);
         return res;
       }
     }
  • 队列容量是没有下限的,然而如果插入的元素超过负载,有可能会引起 OutOfMemory 异样(这是必定的),这也是为什么咱们通常所说,队列无界,心中有界
  • PriorityBlockingQueue 也有 put 办法,这是一个阻塞的办法,因为它是无界的,天然不会阻塞,所以就有了上面比拟聪慧的做法

    public void put(E e) {offer(e); // never need to block  请自行对照下面表格
    }
  • 能够给定初始容量,这个容量会依照肯定的算法主动裁减

    // Default array capacity.
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);
    }

    这里默认的容量是 11,因为也是基于数组,那面试送分题又来了

    你通常是怎么定义容器 / 汇合初始容量的?有哪些根据?

DelayQueue

DelayQueue 是一个反对延时获取元素的无界阻塞队列

  • 是否延时必定是和某个工夫 (通常和以后工夫) 进行 比拟
  • 比拟过后还要进行排序,所以也是存在肯定的 优先级

看到这兴许感觉这有点和 PriorityBlockingQueue 很像,没错,DelayQueue 的外部也是应用 PriorityQueue

上图绿色框线也通知你,DelayQueue 队列的元素必须要实现 Depayed 接口:

所以从上图能够看出应用 DelayQueue 非常简单,只须要两步:

实现 getDelay() 办法,返回元素要延时多长时间

public long getDelay(TimeUnit unit) {
      // 最好采纳纳秒模式,这样更准确
    return unit.convert(time - now(), NANOSECONDS);
}

实现 compareTo() 办法,比拟元素程序

public int compareTo(Delayed other) {if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

下面的代码哪来的呢?如果你关上 ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了(ScheduledThreadPoolExecutor 外部就是利用 DelayQueue)

所以综合来说,上面两种状况非常适合应用 DelayQueue

  • 缓存零碎的设计:用 DelayQueue 保留缓存元素的有效期,应用一个线程循环查问 DelayQueue,如果能从 DelayQueue 中获取元素,阐明缓存有效期到了
  • 定时任务调度:用 DelayQueue 保留当天会执行的工作以及工夫,如果能从 DelayQueue 中获取元素,工作就能够开始执行了。比方 TimerQueue 就是这样实现的

SynchronousQueue

这是一个 不存储元素 的阻塞队列,不存储元素还叫队列?

没错,SynchronousQueue 直译过去叫 同步队列,如果在队列外面呆久了应该就算是“异步”了吧

所以应用它,每个 put() 操作必须要期待一个 take() 操作,反之亦然,否则不能持续增加元素

理论中怎么用呢?如果你须要两个线程之间同步共享变量,如果不必 SynchronousQueue 你可能会抉择用 CountDownLatch 来实现,就像这样:

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);



Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();};



Runnable consumer = () -> {
    try {countDownLatch.await();
        Integer consumedElement = sharedState.get();} catch (InterruptedException ex) {ex.printStackTrace();
    }
};

这点小事就用计数器来实现,显然很不适合,用 SynchronousQueue 革新一下,感觉霎时就不一样了

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {queue.put(producedElement);
    } catch (InterruptedException ex) {ex.printStackTrace();
    }
};

Runnable consumer = () -> {
    try {Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {ex.printStackTrace();
    }
};

其实 Executors.newCachedThreadPool() 办法外面应用的就是 SynchronousQueue

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

看到后面 LinkedBlockingQueue 用在 newSingleThreadExecutornewFixedThreadPool 上,而newCachedThreadPool 却用 SynchronousQueue,这是为什么呢?

因为单线程池和固定线程池中,线程数量是无限的,因而提交的工作须要在 LinkedBlockingQueue 队列中期待空余的线程;

而缓存线程池中,线程数量简直有限(下限为Integer.MAX_VALUE),因而提交的工作只须要在SynchronousQueue 队列中同步移交给空余线程即可, 所以有时也会说 SynchronousQueue 的吞吐量要高于 LinkedBlockingQueueArrayBlockingQueue

LinkedTransferQueue

简略来说,TransferQueue 提供了一个场合,生产者线程应用 transfer 办法传入一些对象并阻塞,直至这些对象被消费者线程全副取出。

你有没有感觉,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue

但 LinkedTransferQueue 相比其余阻塞队列多了三个办法

  • transfer(E e)

    如果以后有消费者正在期待生产元素,transfer 办法就能够间接将生产者传入的元素立即 transfer (传输) 给消费者;如果没有消费者期待生产元素,那么 transfer 办法会把元素放到队列的 tail(尾部)

    节点,始终阻塞,直到该元素被消费者生产才返回

  • tryTransfer(E e)

    tryTransfer,很显然是一种尝试,如果没有消费者期待生产元素,则马上返回 false,程序不会阻塞

  • tryTransfer(E e, long timeout, TimeUnit unit)

    带有超时限度,尝试将生产者传入的元素 transfer 给消费者,如果超时工夫到,还没有消费者生产元素,则返回 false

你瞧,所有阻塞的办法都是一个套路:

  1. 阻塞形式
  2. 带有 try 的非阻塞形式
  3. 带有 try 和超时工夫的非阻塞形式

看到这你兴许感觉 LinkedTransferQueue 没啥特点,其实它和其余阻塞队列的差异还挺大的:

BlockingQueue 是如果队列满了,线程才会阻塞;然而 TransferQueue 是如果没有生产元素,则会阻塞(transfer 办法)

这也就应了 Doug Lea 说的那句话:

LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in“fair”mode), and unbounded
LinkedBlockingQueues. And it’s made better by allowing you to mix and
match those features as well as take advantage of higher-performance i
mplementation techniques.

简略翻译:

LinkedTransferQueueConcurrentLinkedQueue, SynchronousQueue (在偏心模式下), 无界的LinkedBlockingQueues 等的超集; 容许你混合应用阻塞队列的多种个性

所以,在适合的场景中,请尽量应用LinkedTransferQueue

下面都看的是单向队列 FIFO,接下来咱们看看双向队列

LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表构造组成的双向阻塞队列,但凡后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/, 刚接触它时我认为是这个冰激凌的发音

所谓双向队列值得就是能够从队列的两端插入和移除元素。所以:

双向队列因为多了一个操作队列的入口,在多线程同时入队是,也就会缩小一半的竞争

队列有头,有尾,因而它又比其余阻塞队列多了几个非凡的办法

  • addFirst
  • addLast
  • xxxxFirst
  • xxxxLast
  • … …

这么一看,双向阻塞队列的确很高效,

那双向阻塞队列利用在什么中央了呢?

不晓得你是否听过“工作窃取 ”模式,看似不太厚道的一种办法,实则是高效利用线程的好方法。下一篇文章,咱们就来看看 ForkJoinPool 是如何利用“ 工作窃取”模式的

总结

到这对于 Java 队列(其实次要介绍了阻塞队列)就疾速的辨别完了,将看似芜杂的办法做了分类整理,不便疾速了解其用处,同时也阐明了这些队列的理论用处。置信你带着更高的视角来浏览源码会更加轻松,最初也心愿大家认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就能够搞定了

参考

  1. Java 并发编程的艺术
  2. Java 并发编程之美
  3. https://zhuanlan.zhihu.com/p/…

日拱一兵 | 原创

退出移动版