乐趣区

关于java:Java并发JUC下

并发容器

JDK 提供的并发容器大部分在 java。util.concurrent 包中。比拟罕用的有:

  • ConcurrentHashMap:线程平安版 HashMap。
  • ConcurrentLinkedQueue:线程平安版 LinkedList。
  • ConcurrentSkipListMap:线程平安版跳表 Map。
  • CopyOnWriteArrayList:线程平安版 List,然而不是通过锁实现。在读多写少的场合性能十分好。
  • LinkedBlockQueue:线程平安的阻塞队列。
  • PriorityBlockingQueue:反对优先级的无界阻塞队列。

ConcurrentHashMap

ConcurrentHashMap 的数据组织和 HashMap 基本相同。通过一个数组来实现 Hash 桶,当没产生 Hash 抵触时,每个 Hash 桶内都保留一个 Key-Value Entry(Node 对象)。对桶内数据的批改都是通过 CAS 操作进行的,因为数组中的元素没法申明为 volatile, 所以从哈希表中读取数据时,应用到了 UNSAFE 的 getObjectVolatile 函数。

/**
 * The bin count threshold for using a tree rather than list for a * bin.  Bins are converted to trees when adding an element to a * bin with at least this many nodes. The value must be greater * than 2, and should be at least 8 to mesh with assumptions in * tree removal about conversion back to plain bins upon * shrinkage. */
static final int TREEIFY_THRESHOLD = 8;

/**
 * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */
transient volatile Node<K,V>[] table;

SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

ConcurrentHashMap 只是用到了对象 hash 码的负数局部,因为它把一些正数的 Hash 码用来形容状态了。比方用 -1 表白以后节点正在迁徙,-2 示意以后节点时一个红黑树的根。-3 示意以后节点是一个保留节点。

/*
 * Encodings for Node hash fields. See above for explanation. */
static final int MOVED     = -1; // hash for forwarding nodes static final int TREEBIN   = -2; // hash for roots of trees static final int RESERVED  = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash 
/**
 * Spreads (XORs) higher bits of hash to lower and also forces top * bit to 0. Because the table uses power-of-two masking, sets of * hashes that vary only in bits above the current mask will * always collide. (Among known examples are sets of Float keys * holding consecutive whole numbers in small tables.)  So we * apply a transform that spreads the impact of higher bits * downward. There is a tradeoff between speed, utility, and * quality of bit-spreading. Because many common sets of hashes * are already reasonably distributed (so don't benefit from * spreading), and because we use trees to handle large sets of * collisions in bins, we just XOR some shifted bits in the * cheapest possible way to reduce systematic lossage, as well as * to incorporate impact of the highest bits that would otherwise * never be used in index calculations because of table bounds. */
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;
}

当产生 Hash 抵触时,先通过链表来保留 Hash 雷同的所有 Key-Value Entry(Node 对象)。从上面 Node 的实现中,咱们能够看到它实际上就是一个链表的实现(蕴含 next 指针)。

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    //...     /**
 * Virtualized support for map.get(); overridden in subclasses. */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

当链表的数量大于 TREEIFY_THRESHOLD(8)时,会用红黑树的 Node 代替链表来保留 Key-Value Entry。红黑树是一个自均衡的二叉树,能以 LogN 的工夫复杂度批改和查找数据。

/**
 * Nodes for use in TreeBins */
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links     TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion     boolean red;

    Node<K,V> find(int h, Object k) {return findTreeNode(h, k, null);
    }

    /**
 * Returns the TreeNode (or null if not found) for the given key * starting at given root. */
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {if (k != null) {
            TreeNode<K,V> p = this;
            do  {
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if (pl == null)
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}

介绍完次要的外部数据结构,咱们来看一看 hash 表的初始化局部。这外面用到了一个 sizeCtl,它初始保留的是 HashMap 初始大小,在实现 hash 表的初始化之后,它保留的是下次进行扩容时的表内数据的数量。在进行初始化时,sizeCtl 还充当了锁的角色,咱们须要通过它来管制进行初始化工作的线程数量,只让一个线程进行初始化,其余线程期待。初始化实现后,sizeCtl 保留了下次进行扩容时,须要的数据数量,计算规定是 0.75 * 以后容量。而当进行扩容时,sizeCtl 又起到了记录并发扩容线程数的作用。

/**
 * Table initialization and resizing control.  When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads).  Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */
private transient volatile int sizeCtl;
/**
 * Initializes table, using the size recorded in sizeCtl. */
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0) // 有其余线程在初始化,间接 yield             Thread.yield(); // lost initialization race; just spin         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 加锁             try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    // 初始化                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 初始化实现后,sizeCtl 保留了下次进行扩容时,须要的数据数量,计算规定是 0.75 * 以后容量                     sc = n - (n >>> 2);
                }
            } finally {sizeCtl = sc;}
            break;
        }
    }
    return tab;
}

接下来咱们介绍一下增加数据时的解决逻辑。

  1. 必要时先进行初始化
  2. 如果以后 key 所在槽位为空,通过 CAS 创立初始 Node,其中间接保留了 key value,如果胜利则间接返回
  3. 否则,查看是否正在进行扩容,多线程一起扩容
  4. 走到这,阐明以后 hash 表槽位曾经被占,这时候咱们须要对该槽位保留的 Node 加锁,该 Node 可能是链表的头也可能是红黑树的 ” 树根 ”
  5. 加锁胜利后,要确保锁没加错对象,因为在此之前可能别的线程曾经把这个槽位的 Node 由链表改成了红黑树
  6. 接下来依据 Node 节点的 hash 进行分状况解决,hash 码大于 0 阐明以后是链表

    1.  查看对应的 key 是不是曾经在链表中,则间接批改
    2.  查看到尾结点依然没找到对应的 key,则在尾部增加节点
    
  7. 否则如果 Node 节点是红黑树的树根节点类型,则在红黑树中增加或批改节点,这外面须要对数进行均衡,这里就不开展介绍了,在算法那篇文章中有红黑树的介绍
  8. 增加完数据之后,如果该槽位的 Node 是链表,则查看链表长度,如果链表长度大于等于 8 则适时地将其转换为红黑树
  9. 如果对应的 key 是第一次 put 进 map 中,则批改以后数据数量,并适时地扩容
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果没有初始化,进行初始化         if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果以后 key 所在槽位为空,通过 CAS 创立初始 Node,其中间接保留了 key value         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin         }
        // 扩容中,帮忙一起扩容,多线程扩容         else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 对 Node 加锁             synchronized (f) {if (tabAt(tab, i) == f) {// 确保加锁后,锁没加错对象,因为在此之前可能别的线程曾经把这个槽位的 Node 由链表改成了红黑树                     if (fh >= 0) { // hash 码大于 0 阐明以后是链表                         binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 查看对应的 key 是不是曾经在链表中                                 oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 查看到尾结点依然没找到对应的 key,则在尾部增加节点                                 pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // 如果 Node 节点是红黑树的树根节点类型,则在红黑树中增加节点                         Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {// 如果该槽位的 Node 是链表,则查看链表长度,如果链表长度大于等于 8 则适时地将其转换为红黑树                 if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                // 如果发现笼罩了之前的值,则不进行后续扩容,间接返回后果                     return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

介绍一下扩容逻辑,这里咱们以 addCount 函数为例,它会在每次 putVal 增加了新元素之后调用,其中 x 是减少的元素数量,check 示意是否要进行扩容查看,规定是 check < 0 不进行查看(罕用与移除元素时),check <=1 在没有竞争的时候查看:

  • putVal 因槽位为 null 而新增加元素时(check=0)
  • putVal 时曾经存在元素,而且该元素是链表构造,如果指标 key 是链表的头结点(check=1), 或者链表只有一个元素(check=1),而当头结点不是指标 key 或者链表长度大于 1 时(check>1)
  • putVal 时, 如果对应槽位保留的是红黑树节点,则 check= 2
  • remove 函数移除元素时,check=-1

在 ConcurrentHashMap 中,为了拉满性能,对数据 size 的保护也进行了优化,它的优化策略很像 linux 中多 cpu 联结计数器的思路。ConcurrentHashMap 有一个基计数器 baseCount,所有线程在减少 size 时,先通过 CAS 对 baseCount 进行批改,如果批改失败,它会为以后线程开拓一个服务于以后线程的计数器(以相似于哈希表的模式存储),不过这个计数器也会发生冲突,当发生冲突时,个别采纳扩容和从新 hash 的形式解决,通过种种操作,升高互斥时长。光说的话有点形象,咱们看一下相干代码吧。

  1. 如果线程独享的计数器 hash 表 counterCells 不为空或者通过 CAS 批改 baseCount 失败的话,阐明 baseCount 上呈现了竞争,对 size 的计算须要通过线程独享的计数器来实现
  2. 紧接着,如果 counterCells 为空, 或者 counterCells 大小为 0, 或者以后线程还没有调配 counterCells 槽位,或者从属于以后线程的 counterCell 计数器也发生冲突时,会通过 fullAddCount 进行 counterCells hash 表的创立,或为以后线程调配 counterCells 槽位,或 counterCells 哈希表扩容,或者 rehash 等操作来躲避竞争
  3. 如果存在 baseCount 竞争,并且 check <= 1 则不进行扩容查看
  4. 通过 baseCount 加所有 counterCells 的值统计共计 size
private final void addCount(long x, int check) {CounterCell[] as; long b, s;
    // 统计容量 size, 执行加一操作     if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 如果线程独享的计数器 hash 表 counterCells 不为空或者通过 CAS 批改 baseCount 失败的话,阐明 baseCount 上呈现了竞争,对 size 的计算须要通过线程独享的计数器来实现         CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// counterCells 为空, 或者 counterCells 大小为 0, 或者以后线程还没有调配 counterCells 槽位,或者从属于以后线程的 counterCell 计数器也发生冲突时             // 会通过 fullAddCount 进行 counterCells hash 表的创立,或为以后线程调配 counterCells 槽位,或 counterCells 哈希表扩容,或者 rehash 等操作来躲避竞争             fullAddCount(x, uncontended);
            return;
        }
        // 如果存在 baseCount 竞争,并且 check <= 1 则不进行扩容查看         if (check <= 1)
            return;
        // 通过 baseCount 加所有 counterCells 的值统计共计 size         s = sumCount();}
    // ... }

其中 sumCount 比较简单,就是把 baseCount 和所有 counterCells 的值加起来。

final long sumCount() {CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

fullAddCount 的实现很简单,咱们这里制作简略的介绍,不往深挖。

  1. 在每个线程中,会调配一个探针值,这个探针值通过 localInit 进行初始化,我感觉这里大家就把他简略地了解为线程中保留的随机数,它保留在 java.lang.Thread#threadLocalRandomProbe 字段,通过 Contended 注解来解决伪共享问题。
  2. 如果 counterCells hash 表等于空(代码在 fullAddCount 的后半段),就初始化 counterCells hash 表,初始大小是 2,创立好之后,对以后线程对应的槽位进行赋值。所有对 counterCells 的批改都是通过一个 CELLSBUSY 自旋锁进行爱护的
  3. 如果创立 counterCells hash 表的过程也产生了抵触就从新通过 baseCount 进行 size 的更新, 代码在 fullAddCount 的最初几行
  4. 如果 counterCells hash 表不为空,通过后面失去的线程探针值与 counterCells hash 表的容量 -1 相与,失去所属的槽位
  5. 如果所属槽位为空,先加 CELLSBUSY 自旋锁,而后创立 CounterCell 对象并存在对应槽位,如果这一步操作胜利了的话,就返回
  6. 如果上述操作没有胜利,阐明呈现了很重大的抵触,这里先试着对以后线程对应的槽位 CounterCell 进行更新,如果胜利就返回
  7. 如果上述操作都失败,就对 counterCells hash 表进行扩容,扩为原来的 2 倍,而后从新执行上述操作
  8. 如果上述操作全失败,而且扩容的时候还发生冲突,就重置以后线程的探针值,相当于再换一个随机数
// See LongAdder version for explanation private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {// 0 示意未调配探针值         ThreadLocalRandom.localInit();      // force initialization         // 对探针进行初始化         h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty     for (;;) {CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {// 如果 counterCells hash 表不为空,后面失去的线程探针值与 counterCells hash 表的容量 -1 相与,失去所属的槽位,if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) {// Try to attach new Cell                     CounterCell r = new CounterCell(x); // Optimistic create                     if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {// Recheck under lock                             CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {rs[j] = r;
                                created = true;
                            }
                        } finally {cellsBusy = 0;}
                        if (created)
                            break;
                        continue;           // Slot is now non-empty                     }
                }
                collide = false;
            }
            // 如果上述操作没有胜利,阐明呈现了很重大的抵触,else if (!wasUncontended)       // CAS already known to fail                 wasUncontended = true;      // Continue after rehash             else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale             else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                //  如果上述操作都失败,就对 counterCells hash 表进行扩容,扩为原来的 2 倍,而后从新执行上述操作                 try {if (counterCells == as) {// Expand table unless stale                         CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {cellsBusy = 0;}
                collide = false;
                continue;                   // Retry with expanded table             }
            // 如果上述操作全失败,而且扩容的时候还发生冲突,就重置以后线程的探针值,相当于再换一个随机数             h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            // 如果 counterCells hash 表等于空,就初始化 counterCells hash 表,所有对 counterCells 的批改都是通过一个 CELLSBUSY 自旋锁进行爱护的             boolean init = false;
            try {// Initialize table                 if (counterCells == as) {// hash 表的初始大小为 2,将以后线程对应的槽位进行设置                     CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {cellsBusy = 0;}
            if (init)
                break;
        }
        // 创立 counterCells hash 表的过程也产生了抵触就从新通过 baseCount 进行 size 的更新         else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base     }
}

ConcurrentLinkedQueue

Java 提供的线程平安的 Queue 能够分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在理论利用中要依据理论须要选用阻塞队列或者非阻塞队列。阻塞队列能够通过加锁来实现,非阻塞队列能够通过 CAS 操作实现。

ConcurrentLinkedQueue 应用了链表作为其数据结构.外部应用 CAS 来进行链表的保护。ConcurrentLinkedQueue 适宜在对性能要求绝对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的老本较高则适宜应用无锁的 ConcurrentLinkedQueue 来代替。

接下来咱们简略地看一下 ConcurrentLinkedQueue 的实现,在 ConcurrentLinkedQueue 中所有数据通过单向链表存储,同时咱们还会保留该链表的头指针和尾指针。

// 链表中的节点 private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    //... }
/**
 * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail *   to not be reachable from head! */
private transient volatile Node<E> head;

/**
 * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail *   to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */
private transient volatile Node<E> tail;

在对象实例化时,会创立一个虚节点。看到前面你会发现,如果想通过 CAS 保护一个链表,个别都会应用到虚节点。

public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}

介绍完外部数据结构,咱们看一下增删节点的实现形式。先来看一下减少数据的逻辑:

  1. 入队操作是在一个循环中尝试 CAS 操作,首先判断,尾结点 p.next 是不是 null,是的话就通过 CAS 将 null-> newNode,如果 CAS 胜利,阐明该节点就曾经算是退出到队列中了
  • 然而这里并没有间接批改尾结点,因为 ConcurrentLinkedQueue 中 tail 并不一定是实际上的尾结点,在并发很大时,如果所有线程都要去竞争批改尾结点的话,对性能会有影响,所以,当理论的尾结点(代码中的变量 p)不等于 tail 时,才会进行更新。
  • 在 ConcurrentLinkedQueue 中会呈现 Node1(head)->Node2(tail)->null 以及 Node1(head)->Node2(tail)->Node3->null 这样的状况甚至 Node1(head)->Node2(tail)->Node3->Node4 这样的状况,尽管 tail 指针没有间接指向尾结点会导致将新节点退出链表时,须要从 tail 向后查找理论的尾结点,然而这个过程相较于对 tail 节点的竞争来说,影响较小,最终效率也更高
  1. 如果发现以后 p 节点不是实际上的尾结点,会先查看它的 next 指针是否指向本人,在出队函数 poll 中,将一个元素出队后会把它的 next 指针指向本人,所以这一步实际上是判断以后的 p 节点是否曾经出队
  • 如果满足上述情况,咱们须要从新获取 tail 指针,如果发现在上述过程中 tail 指针产生了变动,这阐明期间曾经好有个并发插入过程实现了,咱们间接从最新的 tail 对象开始上述流程即可,所以这里就将 p 赋为最新的 tail 指向的对象,
  • 如果整个过程中 tail 指针都没变,阐明以后的状况相似于 Node1(head,tail)-> Node2->null, 然而在判断 p == q 之前,产生了出队操作,状态变成了 Node1(tail, 曾经出队的对象) Node2(head)->null,这个时候咱们要将 p 设置为 head 而后从 head 开始向后遍历
  1. 最初就是单纯的没有遍历到尾结点的状况了,Node1(head)->Node2(tail,以后 p 变量)->Node3(以后 q 变量)->null
  • 如果发现曾经进行过一次向后遍历的过程,即 p != t,并且 tail 指针产生了变动,咱们就间接应用 tail 指针,不再向后遍历了 p = t(最新的 tail 指针)
  • 如果不满足上述情况,比方还素来没遍历过,或者尽管遍历过然而 tail 指针没变,咱们就持续遍历 p = q(p.next)
public boolean offer(E e) {checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {// p is last node             // 找到了最初一个节点,通过 CAS 将其 next 指向新节点             if (p.casNext(null, newNode)) {// Successful CAS is the linearization point                 // for e to become an element of this queue,                 // and for newNode to become "live".                 // 如果 tail.next 为 null 就不批改 tail,tail.next != null 时才会批改                 // 这里会呈现多个线程同时发现 tail.next != null 的状况,所以 tail 指针和理论的尾结点的间隔不肯定是 1                 if (p != t) // hop two nodes at a time                     casTail(t, newNode);  // Failure is OK. 因为没有要求 tail 指针和理论的尾结点的间隔是 1                 return true;
            }
            // Lost CAS race to another thread; re-read next         }
        else if (p == q)
            // 如果发现以后 p 节点不是实际上的尾结点,会先查看它的 next 指针是否指向本人,在出队函数 poll 中,将一个元素出队后会把它的 next 指针指向本人,所以这一步实际上是判断以后的 p 节点是否曾经出队             // 如果 tail 指针产生了变动,就从最新的 tail 开始遍历             // 否则,从 head 开始遍历,因为这时候 tail 可能指向了一个死掉 (next 指向本人,曾经从队列中移除) 的节点             // We have fallen off list.  If tail is unchanged, it             // will also be off-list, in which case we need to             // jump to head, from which all live nodes are always             // reachable.  Else the new tail is a better bet.             p = (t != (t = tail)) ? t : head;
        else
            // 最初就是单纯的没有遍历到尾结点的状况了             // 如果发现曾经进行过一次向后遍历的过程,并且 tail 指针产生了变动,咱们就间接应用 tail 指针             // 如果还素来没遍历过,或者尽管遍历过然而 tail 指针没变,咱们就持续遍历             // Check for tail updates after two hops.             p = (p != t && t != (t = tail)) ? t : q;
    }
}

最初,咱们介绍一下出队的操作,整个出队过程也是在一个 CAS 循环中进行:

  1. 首先咱们查看头指针的 p(head).item 是不是 null,不是的话才阐明该节点是一个无效节点,因为初始化是创立的虚节点 item 才等于 null,这里通过 item 是不是 null 来判断是不是虚节点也就是说 ConcurrentLinkedQueue 中不能增加值为 null 的节点
  • 找到无效节点后,通过 cas 将 item 改为 null,后续的操作和增加元素时相似,因为 head 指针也是一个竞争点,所以这里并没有间接批改 head 指针,而是发现从 head 至多向后遍历过一次时,才会批改 head 指针,这和 offer 中的形式相似
  • 如果以后线程要负责批改 head 指针,会判断 刚删掉的节点 p 的 next 是不是 null,是的话就让 p 作为 head(此时 p 充当新的虚节点),如果不是的话,就让 p.next 作为 next(此时 head 就是实际上的头结点)
  1. 如果 p 的 item == null 或者 cas 失败(别的线程曾经把 p.item 置为 null),咱们要检查一下 p.next 是不是 null,如果是的话阐明 p 曾经是最初一个节点了,咱们须要返回 null,然而在此之前,咱们无妨把 p 设为新的 head 来缩小其余线程的遍历开销
  2. 查看以后 p 节点的 next 指针是不是指向本人,是的话阐明以后查看的这个节点曾经被别的线程从队列中移除了,那咱们就从新开始执行 poll
  3. 否则,让 p = q(p.next),也就是说这是从 head 开始向后遍历的过程
public E poll() {
    restartFromHead:
    for (;;) {for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            // item != null 阐明该节点是一个无效节点, 通过 CAS 将其 item 改为 null             if (item != null && p.casItem(item, null)) {// CAS 胜利阐明曾经移除一个节点了,后续的操作和增加元素时相似,因为 head 指针也是一个竞争点                 // 所以这里并没有间接批改 head 指针,而是发现从 head 至多向后遍历过一次时,才会批改 head 指针,这和 offer 中的形式相似                 // Successful CAS is the linearization point                 // for item to be removed from this queue.                 if (p != h) // hop two nodes at a time                     // 判断刚删掉的节点 p 的 next 是不是 null,是的话就让 p 作为 head(此时 p 充当新的虚节点),// 如果不是的话,就让 p.next 作为 next(此时 head 就是实际上的头结点)updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {// 阐明 p 曾经是最初一个节点了,咱们须要返回 null                 // 然而在此之前,咱们无妨把 p 设为新的 head 来缩小其余线程的遍历开销                 updateHead(h, p);
                return null;
            }
            else if (p == q)
                // 阐明以后查看的这个节点曾经被别的线程从队列中移除了,那咱们就从新开始执行 poll                 continue restartFromHead;
            else
                // p = q(p.next),也就是说这是从 head 开始向后遍历的过程                 p = q;
        }
    }
}

updateHead 的过程中先会查看是不是真的有必要重置 head 指针,有必要的话在通过 CAS 批改 head 指针,如果 CAS 失败了也不妨,毕竟咱们不要求 head 肯定指向理论的头结点,poll 中的遍历过程能 cover 这种状况。如果 CAS 胜利,会将删掉的 head 指针指向本人。

/**
 * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */
final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);
}

这里大家可能会有疑难,为什么要 lazySet next 指针呢?要想了解这个问题,咱们须要先了解 putOrderedObjectputObjectVolatile 的区别。因为 Node 中的 next 属性是用 volatile 润饰的,而 volatile 有什么特点呢?一个是避免指令重拍,一个是将其余 CPU cache 中的相干数据有效化,迫使这些 CPU 从新从主存中拉取最新数据。这是通过 Fence(内存屏障) 实现的,在 linux x86 架构中个别是 lock; addl $0,0(%%esp). , 这里的 lock 是一个指令前缀, 它蕴含了 storeload 内存屏障的语义, 前面的 addl $0,0(%%esp) 是一个空指令, 因为 lock 前缀不能独立存在(它不是一条残缺的指令), 所以在应用它的时候个别会在前面跟一条什么都不做的指令。

putObjectVolatile 函数等效于申明一个 volatile 变量,而后间接对该变量进行批改。也就是说,无论是 putObjectVolatile 还是对 volatile 变量的间接批改,都依赖与 StoreLoad barriers,这里 StoreLoad barriers 就是说如果指令的程序是 Store1; StoreLoad; Load2,就须要确保 Store1 保留的数据在 Load2 拜访数据之前,肯定要可能对所有线程可见。对于内存屏障的解释,能够参考这篇手册, 其中介绍了各个内存屏障的要求,以及在不同架构上的实现形式。

putOrderedObject 函数呢,只须要保障以后 cpu 内指令是有序的,不会呈现非法的内存拜访即可,这也就是说,putOrderedObject 没有多解决期间的可见性保障,也就不会有多余的开销。在咱们 ConcurrentLinkedQueue 的场景中,最终将 next 指针指向本人并不需要这么高的可见性需求,而且 next 是润饰为 volatile 的,所以,咱们须要显式地调用 putOrderedObject 能力达到“去 volatile 个性”的成果,从而晋升效率。

对于它们的实现,能够参考如下代码,能够看到 ordered_store 最初插入了一个 Op_MemBarCPUOrder 内存屏障,而 putObjectVolatile 对应了 inline_unsafe_access 中的 is_volatile=true && is_store == true 的逻辑,也就是插入了 Op_MemBarVolatile 内存屏障。

bool LibraryCallKit::inline_unsafe_ordered_store(BasicType type) {// This is another variant of inline_unsafe_access, differing in   // that it always issues store-store ("release") barrier and ensures   // store-atomicity (which only matters for "long"). 
  // ...   if (type == T_OBJECT) // reference stores need a store barrier.     store = store_oop_to_unknown(control(), base, adr, adr_type, val, type);
  else {store = store_to_memory(control(), adr, val, type, adr_type, require_atomic_access);
  }
  insert_mem_bar(Op_MemBarCPUOrder);
  return true;
}

bool LibraryCallKit::inline_unsafe_access(bool is_native_ptr, bool is_store, BasicType type, bool is_volatile) {
  // .... 
  if (is_volatile) {if (!is_store)
      insert_mem_bar(Op_MemBarAcquire);
    else
      insert_mem_bar(Op_MemBarVolatile);
  }

  if (need_mem_bar) insert_mem_bar(Op_MemBarCPUOrder);

  return true;
} 

再来看看 memnode.hpp 中对这两种内存屏障的解释。MemBarVolatileNode 须要保障多 CPU 的可见性,MemBarCPUOrderNode 只须要保障单 CPU 程序即可,而且 CPU 曾经做了所有的排序工作,咱们毋庸多做。

// Ordering between a volatile store and a following volatile load. // Requires multi-CPU visibility class MemBarVolatileNode: public MemBarNode {
public:
  MemBarVolatileNode(Compile* C, int alias_idx, Node* precedent)
    : MemBarNode(C, alias_idx, precedent) {}
  virtual int Opcode() const;};

// Ordering within the same CPU.  Used to order unsafe memory references // inside the compiler when we lack alias info.  Not needed "outside" the // compiler because the CPU does all the ordering for us. class MemBarCPUOrderNode: public MemBarNode {
public:
  MemBarCPUOrderNode(Compile* C, int alias_idx, Node* precedent)
    : MemBarNode(C, alias_idx, precedent) {}
  virtual int Opcode() const;
  virtual uint ideal_reg() const { return 0;} // not matched in the AD file };

ConcurrentSkipListMap

对于一个单链表,即便链表是有序的,如果咱们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率天然就会很低。而跳表是在这个单链表的根底上同时保护了多个链表,并且链表是分层的。

最低层的链表保护了跳表内所有的元素,每下面一层链表都是上面一层的子集。

跳表内的所有链表的元素都是排序的。查找时,能够从顶级链表开始找。一旦发现被查找的元素大于以后链表中的取值,就会转入下一层链表持续找。这也就是说在查找过程中,搜寻是跳跃式的。如上图所示,在跳表中查找元素 18。

查找 18 的时候原来须要遍历 12 次,当初只须要 7 次即可。针对链表长度比拟大的时候,构建索引查找效率的晋升就会非常明显。

应用跳表实现 Map 和应用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保留元素的程序,而跳表内所有的元素都是排序的。因而在对跳表进行遍历时,你会失去一个有序的后果。

在 JDK 的 ConcurrentSkipListMap 实现中,没有应用到锁,而是通过 CAS 来进行数据的批改,当插入数据时,通过 CAS 批改最上层列表的内容,而后再逐层向上保护各级列表(各层列表的批改都是通过 CAS 实现),这两个过程是独立的,因为下层列表保护的数据少也只会影响查找数据的速度,而不会影响到数据的准确性,因为 增加与查找数据都以最上层列表内容为准

CopyOnWriteArrayList

CopyOnWriteArrayList 用于读场景远多于写场景的状况,它可能让读与读之间不互斥,读与写也不互斥,只有写与写之间才会互斥。它的思路也很简略,外部通过一个数组来保护数据,失常读数据时间接通过索引从数组中提取数据。

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;

@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {return (E) a[index];
}

/**
 * {@inheritDoc} * * @throws IndexOutOfBoundsException {@inheritDoc} */
public E get(int index) {return get(getArray(), index);
}

/**
 * Gets the array.  Non-private so as to also be accessible * from CopyOnWriteArraySet class. */
final Object[] getArray() {return array;}

而写数据时,须要将整个数组都复制一遍,而后在新数组的开端增加最新的数据。最初替换掉原来的数组,这样原来的数组就会被回收。很显然,这种实现形式在减小竞争的同时,承当了数据空间 * 2 的压力。

/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

/**
 * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {lock.unlock();
    }
}

/**
 * Sets the array. */
final void setArray(Object[] a) {array = a;}

LinkedBlockQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,能够当做无界队列也能够当做有界队列来应用,满足 FIFO 的个性,为了避免 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创立 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer. MAX_VALUE。那么什么是阻塞队列呢?咱们晓得队列有入队出队两个操作,所谓阻塞队列,就是说如果队列已满时,能够阻塞入队操作,而如果队列为空时,能够阻塞出队操作。

为了实现阻塞成果并保障线程平安,它的外部用到了两个锁和两个 Condition。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

在进行数据出队时,先要取得 takeLock,而后查看以后队列容量是否为 0,如果队列容量为 0,则在 notEmpty 上期待,否则间接执行出队操作。最初判断一下,是不是执行出队操作之前,队列曾经达到最大容量,如果是的话,就唤醒期待中的入队操作。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    // 获取锁     final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {// 如果队列为空,就期待         while (count.get() == 0) {notEmpty.await();
        }
        // 否则,执行出队操作,并批改 size         x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            // 如果队列不为空,则唤醒下一个期待中的出队操作             notEmpty.signal();} finally {takeLock.unlock();
    }
    // 如果之前队列满了,则唤醒期待中的入队操作     if (c == capacity)
        signalNotFull();
    return x;
}

/**
 * Signals a waiting put. Called only from take/poll. */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {notFull.signal();
    } finally {putLock.unlock();
    }
}

入队操作和出队操作正好相同,同样先获取锁,不过这里用的是 putLock,查看以后队列是否已满,是的话就在 notFull 上期待,否则执行入队操作并批改 size,如果之前的队列长度为 0,那么就有可能有一些出队操作被阻塞了,所以咱们这里须要唤醒所有在 notEmpty 上期待的线程。

/**
 * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var     // holding count negative to indicate failure unless set.     int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 先获取锁     putLock.lockInterruptibly();
    try {
        /*
 * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */
        while (count.get() == capacity) {notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();} finally {putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();}

/**
 * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {notEmpty.signal();
    } finally {takeLock.unlock();
    }
}

LinkedBlockingQueue 除了阻塞版的入队出队操作外,当然也有不阻塞的接口,不过这些接口比较简单,基本上就是在上述根底勾销 await 和 signal 逻辑,这里就不再赘述了。

PriorityBlockingQueue

PriorityBlockingQueue 是一个带排序功能的阻塞队列,因为它是一个队列,没必要保障整个队列的外部程序,只须要保障出队时依照排序后果出即可,所以其外部应用了二分堆得模式实现,同时,PriorityBlockingQueue 也是线程平安的,外部通过一个锁来管制堆数据的保护。

PriorityBlockingQueue 的堆数据都保留在如下的 queue 数组中,堆的根节点是 queue[0] , 就像如下正文所说的,咱们能够依据一个节点的下标 n 疾速计算出它的两个子节点的下标,即 queue[2*n+1]queue[2*(n+1)]。这是用数组来形容二分堆(树)的常见办法。

/**
 * Priority queue represented as a balanced binary heap: the two * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The * priority queue is ordered by comparator, or by the elements' * natural ordering, if comparator is null: For each node n in the * heap and each descendant d of n, n <= d.  The element with the * lowest value is in queue[0], assuming the queue is nonempty. */
private transient Object[] queue;

因为保护二分堆时,咱们不须要保障整个堆内所有元素有序,只须要保障父子节点之间有序即可,所以当咱们要插入一个元素时,间接将其插入到堆尾,而后通过其与父节点的关系,进行适当地父子节点换位,就能保障堆的性质。

/**
 * Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never return {@code false}. * * @param e the element to add * @return {@code true} (as specified by {@link Queue#offer}) * @throws ClassCastException if the specified element cannot be compared *         with elements currently in the priority queue according to the *         priority queue's ordering * @throws NullPointerException if the specified element is null */
public boolean offer(E e) {if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 最新节点的下标(n)等于 size,确保容量没问题     while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 通过比拟函数,进行父子节点换位             siftUpComparable(n, e, array);
        else
            // 通过比拟函数,进行父子节点换位             siftUpUsingComparator(n, e, array, cmp);
        // 批改 size 并唤醒期待中的出队操作         size = n + 1;
        notEmpty.signal();} finally {lock.unlock();
    }
    return true;
}

/**
 * Inserts item x at position k, maintaining heap invariant by * promoting x up the tree until it is greater than or equal to * its parent, or is the root. * * To simplify and speed up coercions and comparisons. the * Comparable and Comparator versions are separated into different * methods that are otherwise identical. (Similarly for siftDown.) * These methods are static, with heap state as arguments, to * simplify use in light of possible comparator exceptions. * * @param k the position to fill * @param x the item to insert * @param array the heap array */
private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {// 计算父节点下标         int parent = (k - 1) >>> 1;
        Object e = array[parent];
        // 和父节点比拟,因为默认是升序(最小堆),所以当新增加的节点大于父节点时就停下来了         if (key.compareTo((T) e) >= 0)
            break;
        // 新增加的节点小于父节点,那么父子节点换位,而后反复上述过程         array[k] = e;
        k = parent;
    }
    array[k] = key;
}

当从 PriorityBlockingQueue 中执行出队操作时,间接提取下标 0 元素,而后用 queue 中的最初一个元素,接替 0 号元素的地位,自上而下地修改堆中元素的地位关系,使其满足堆的性质。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    // 加锁并循环出队,如果没有数据就在 notEmpty condition 上期待     try {while ( (result = dequeue()) == null)
            notEmpty.await();} finally {lock.unlock();
    }
    return result;
}

/**
 * Mechanics for poll().  Call only while holding lock. */
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {Object[] array = queue;
        // 出队 0 号元素         E result = (E) array[0];
        // 接替者是 queue 的开端元素         E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            // 接替者下移             siftDownComparable(0, x, array, n);
        else
            // 接替者下移             siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

从下移节点的实现中,咱们能够看到,它先会比拟两个子节点,选取最小的节点最为下移的指标节点。而后通过比拟器比拟以后节点 x 和指标节点是否满足堆的性质,如果不满足则替换节点地位,并反复上述过程。

/**
 * Inserts item x at position k, maintaining heap invariant by * demoting x down the tree repeatedly until it is less than or * equal to its children or is a leaf. * * @param k the position to fill * @param x the item to insert * @param array the heap array * @param n heap size */
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf         while (k < half) {int child = (k << 1) + 1; // assume left child is least             Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                // 比拟左右两个节点,以最小节点作为下移的指标节点                 c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                // 父节点最小,进行                 break;
            // 子节点小,交换子节点和父节点,并反复上述过程             array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}
退出移动版