并发容器
JDK 提供的并发容器大部分在 java。util.concurrent 包中。比拟罕用的有:
- ConcurrentHashMap:线程平安版 HashMap。
- ConcurrentLinkedQueue:线程平安版 LinkedList。
- ConcurrentSkipListMap:线程平安版跳表 Map。
- CopyOnWriteArrayList:线程平安版 List,然而不是通过锁实现。在读多写少的场合性能十分好。
- LinkedBlockQueue:线程平安的阻塞队列。
- PriorityBlockingQueue:反对优先级的无界阻塞队列。
ConcurrentHashMap
ConcurrentHashMap 的数据组织和 HashMap 基本相同。通过一个数组来实现 Hash 桶,当没产生 Hash 抵触时,每个 Hash 桶内都保留一个 Key-Value Entry(Node 对象)。对桶内数据的批改都是通过 CAS 操作进行的,因为数组中的元素没法申明为 volatile, 所以从哈希表中读取数据时,应用到了 UNSAFE 的 getObjectVolatile
函数。
/**
* The bin count threshold for using a tree rather than list for a * bin. Bins are converted to trees when adding an element to a * bin with at least this many nodes. The value must be greater * than 2, and should be at least 8 to mesh with assumptions in * tree removal about conversion back to plain bins upon * shrinkage. */
static final int TREEIFY_THRESHOLD = 8;
/**
* The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */
transient volatile Node<K,V>[] table;
SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
ConcurrentHashMap 只是用到了对象 hash 码的负数局部,因为它把一些正数的 Hash 码用来形容状态了。比方用 -1 表白以后节点正在迁徙,-2 示意以后节点时一个红黑树的根。-3 示意以后节点是一个保留节点。
/*
* Encodings for Node hash fields. See above for explanation. */
static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/**
* Spreads (XORs) higher bits of hash to lower and also forces top * bit to 0. Because the table uses power-of-two masking, sets of * hashes that vary only in bits above the current mask will * always collide. (Among known examples are sets of Float keys * holding consecutive whole numbers in small tables.) So we * apply a transform that spreads the impact of higher bits * downward. There is a tradeoff between speed, utility, and * quality of bit-spreading. Because many common sets of hashes * are already reasonably distributed (so don't benefit from * spreading), and because we use trees to handle large sets of * collisions in bins, we just XOR some shifted bits in the * cheapest possible way to reduce systematic lossage, as well as * to incorporate impact of the highest bits that would otherwise * never be used in index calculations because of table bounds. */
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;
}
当产生 Hash 抵触时,先通过链表来保留 Hash 雷同的所有 Key-Value Entry(Node 对象)。从上面 Node 的实现中,咱们能够看到它实际上就是一个链表的实现(蕴含 next 指针)。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
//... /**
* Virtualized support for map.get(); overridden in subclasses. */
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
当链表的数量大于 TREEIFY_THRESHOLD(8)时,会用红黑树的 Node 代替链表来保留 Key-Value Entry。红黑树是一个自均衡的二叉树,能以 LogN 的工夫复杂度批改和查找数据。
/**
* Nodes for use in TreeBins */
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red;
Node<K,V> find(int h, Object k) {return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key * starting at given root. */
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
介绍完次要的外部数据结构,咱们来看一看 hash 表的初始化局部。这外面用到了一个 sizeCtl,它初始保留的是 HashMap 初始大小,在实现 hash 表的初始化之后,它保留的是下次进行扩容时的表内数据的数量。在进行初始化时,sizeCtl 还充当了锁的角色,咱们须要通过它来管制进行初始化工作的线程数量,只让一个线程进行初始化,其余线程期待。初始化实现后,sizeCtl 保留了下次进行扩容时,须要的数据数量,计算规定是 0.75 * 以后容量
。而当进行扩容时,sizeCtl 又起到了记录并发扩容线程数的作用。
/**
* Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */
private transient volatile int sizeCtl;
/**
* Initializes table, using the size recorded in sizeCtl. */
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0) // 有其余线程在初始化,间接 yield Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 加锁 try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 初始化 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 初始化实现后,sizeCtl 保留了下次进行扩容时,须要的数据数量,计算规定是 0.75 * 以后容量 sc = n - (n >>> 2);
}
} finally {sizeCtl = sc;}
break;
}
}
return tab;
}
接下来咱们介绍一下增加数据时的解决逻辑。
- 必要时先进行初始化
- 如果以后 key 所在槽位为空,通过 CAS 创立初始 Node,其中间接保留了 key value,如果胜利则间接返回
- 否则,查看是否正在进行扩容,多线程一起扩容
- 走到这,阐明以后 hash 表槽位曾经被占,这时候咱们须要对该槽位保留的 Node 加锁,该 Node 可能是链表的头也可能是红黑树的 ” 树根 ”
- 加锁胜利后,要确保锁没加错对象,因为在此之前可能别的线程曾经把这个槽位的 Node 由链表改成了红黑树
-
接下来依据 Node 节点的 hash 进行分状况解决,hash 码大于 0 阐明以后是链表
1. 查看对应的 key 是不是曾经在链表中,则间接批改 2. 查看到尾结点依然没找到对应的 key,则在尾部增加节点
- 否则如果 Node 节点是红黑树的树根节点类型,则在红黑树中增加或批改节点,这外面须要对数进行均衡,这里就不开展介绍了,在算法那篇文章中有红黑树的介绍
- 增加完数据之后,如果该槽位的 Node 是链表,则查看链表长度,如果链表长度大于等于 8 则适时地将其转换为红黑树
- 如果对应的 key 是第一次 put 进 map 中,则批改以后数据数量,并适时地扩容
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果没有初始化,进行初始化 if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果以后 key 所在槽位为空,通过 CAS 创立初始 Node,其中间接保留了 key value else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin }
// 扩容中,帮忙一起扩容,多线程扩容 else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 对 Node 加锁 synchronized (f) {if (tabAt(tab, i) == f) {// 确保加锁后,锁没加错对象,因为在此之前可能别的线程曾经把这个槽位的 Node 由链表改成了红黑树 if (fh >= 0) { // hash 码大于 0 阐明以后是链表 binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 查看对应的 key 是不是曾经在链表中 oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 查看到尾结点依然没找到对应的 key,则在尾部增加节点 pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 如果 Node 节点是红黑树的树根节点类型,则在红黑树中增加节点 Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {// 如果该槽位的 Node 是链表,则查看链表长度,如果链表长度大于等于 8 则适时地将其转换为红黑树 if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
// 如果发现笼罩了之前的值,则不进行后续扩容,间接返回后果 return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
介绍一下扩容逻辑,这里咱们以 addCount 函数为例,它会在每次 putVal 增加了新元素之后调用,其中 x 是减少的元素数量,check 示意是否要进行扩容查看,规定是 check < 0 不进行查看(罕用与移除元素时),check <=1 在没有竞争的时候查看:
- putVal 因槽位为 null 而新增加元素时(check=0)
- putVal 时曾经存在元素,而且该元素是链表构造,如果指标 key 是链表的头结点(check=1), 或者链表只有一个元素(check=1),而当头结点不是指标 key 或者链表长度大于 1 时(check>1)
- putVal 时, 如果对应槽位保留的是红黑树节点,则 check= 2
- remove 函数移除元素时,check=-1
在 ConcurrentHashMap 中,为了拉满性能,对数据 size 的保护也进行了优化,它的优化策略很像 linux 中多 cpu 联结计数器的思路。ConcurrentHashMap 有一个基计数器 baseCount,所有线程在减少 size 时,先通过 CAS 对 baseCount 进行批改,如果批改失败,它会为以后线程开拓一个服务于以后线程的计数器(以相似于哈希表的模式存储),不过这个计数器也会发生冲突,当发生冲突时,个别采纳扩容和从新 hash 的形式解决,通过种种操作,升高互斥时长。光说的话有点形象,咱们看一下相干代码吧。
- 如果线程独享的计数器 hash 表 counterCells 不为空或者通过 CAS 批改 baseCount 失败的话,阐明 baseCount 上呈现了竞争,对 size 的计算须要通过线程独享的计数器来实现
- 紧接着,如果 counterCells 为空, 或者 counterCells 大小为 0, 或者以后线程还没有调配 counterCells 槽位,或者从属于以后线程的 counterCell 计数器也发生冲突时,会通过 fullAddCount 进行 counterCells hash 表的创立,或为以后线程调配 counterCells 槽位,或 counterCells 哈希表扩容,或者 rehash 等操作来躲避竞争
- 如果存在 baseCount 竞争,并且 check <= 1 则不进行扩容查看
- 通过 baseCount 加所有 counterCells 的值统计共计 size
private final void addCount(long x, int check) {CounterCell[] as; long b, s;
// 统计容量 size, 执行加一操作 if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 如果线程独享的计数器 hash 表 counterCells 不为空或者通过 CAS 批改 baseCount 失败的话,阐明 baseCount 上呈现了竞争,对 size 的计算须要通过线程独享的计数器来实现 CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// counterCells 为空, 或者 counterCells 大小为 0, 或者以后线程还没有调配 counterCells 槽位,或者从属于以后线程的 counterCell 计数器也发生冲突时 // 会通过 fullAddCount 进行 counterCells hash 表的创立,或为以后线程调配 counterCells 槽位,或 counterCells 哈希表扩容,或者 rehash 等操作来躲避竞争 fullAddCount(x, uncontended);
return;
}
// 如果存在 baseCount 竞争,并且 check <= 1 则不进行扩容查看 if (check <= 1)
return;
// 通过 baseCount 加所有 counterCells 的值统计共计 size s = sumCount();}
// ... }
其中 sumCount 比较简单,就是把 baseCount 和所有 counterCells 的值加起来。
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
fullAddCount 的实现很简单,咱们这里制作简略的介绍,不往深挖。
- 在每个线程中,会调配一个探针值,这个探针值通过 localInit 进行初始化,我感觉这里大家就把他简略地了解为线程中保留的随机数,它保留在
java.lang.Thread#threadLocalRandomProbe
字段,通过 Contended 注解来解决伪共享问题。 - 如果 counterCells hash 表等于空(代码在 fullAddCount 的后半段),就初始化 counterCells hash 表,初始大小是 2,创立好之后,对以后线程对应的槽位进行赋值。所有对 counterCells 的批改都是通过一个 CELLSBUSY 自旋锁进行爱护的
- 如果创立 counterCells hash 表的过程也产生了抵触就从新通过 baseCount 进行 size 的更新, 代码在 fullAddCount 的最初几行
- 如果 counterCells hash 表不为空,通过后面失去的线程探针值与 counterCells hash 表的容量 -1 相与,失去所属的槽位
- 如果所属槽位为空,先加 CELLSBUSY 自旋锁,而后创立 CounterCell 对象并存在对应槽位,如果这一步操作胜利了的话,就返回
- 如果上述操作没有胜利,阐明呈现了很重大的抵触,这里先试着对以后线程对应的槽位 CounterCell 进行更新,如果胜利就返回
- 如果上述操作都失败,就对 counterCells hash 表进行扩容,扩为原来的 2 倍,而后从新执行上述操作
- 如果上述操作全失败,而且扩容的时候还发生冲突,就重置以后线程的探针值,相当于再换一个随机数
// See LongAdder version for explanation private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {// 0 示意未调配探针值 ThreadLocalRandom.localInit(); // force initialization // 对探针进行初始化 h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty for (;;) {CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {// 如果 counterCells hash 表不为空,后面失去的线程探针值与 counterCells hash 表的容量 -1 相与,失去所属的槽位,if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) {// Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {// Recheck under lock CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {rs[j] = r;
created = true;
}
} finally {cellsBusy = 0;}
if (created)
break;
continue; // Slot is now non-empty }
}
collide = false;
}
// 如果上述操作没有胜利,阐明呈现了很重大的抵触,else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 如果上述操作都失败,就对 counterCells hash 表进行扩容,扩为原来的 2 倍,而后从新执行上述操作 try {if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {cellsBusy = 0;}
collide = false;
continue; // Retry with expanded table }
// 如果上述操作全失败,而且扩容的时候还发生冲突,就重置以后线程的探针值,相当于再换一个随机数 h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 如果 counterCells hash 表等于空,就初始化 counterCells hash 表,所有对 counterCells 的批改都是通过一个 CELLSBUSY 自旋锁进行爱护的 boolean init = false;
try {// Initialize table if (counterCells == as) {// hash 表的初始大小为 2,将以后线程对应的槽位进行设置 CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {cellsBusy = 0;}
if (init)
break;
}
// 创立 counterCells hash 表的过程也产生了抵触就从新通过 baseCount 进行 size 的更新 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base }
}
ConcurrentLinkedQueue
Java 提供的线程平安的 Queue 能够分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在理论利用中要依据理论须要选用阻塞队列或者非阻塞队列。阻塞队列能够通过加锁来实现,非阻塞队列能够通过 CAS 操作实现。
ConcurrentLinkedQueue 应用了链表作为其数据结构.外部应用 CAS 来进行链表的保护。ConcurrentLinkedQueue 适宜在对性能要求绝对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的老本较高则适宜应用无锁的 ConcurrentLinkedQueue 来代替。
接下来咱们简略地看一下 ConcurrentLinkedQueue 的实现,在 ConcurrentLinkedQueue 中所有数据通过单向链表存储,同时咱们还会保留该链表的头指针和尾指针。
// 链表中的节点 private static class Node<E> {
volatile E item;
volatile Node<E> next;
//... }
/**
* A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */
private transient volatile Node<E> head;
/**
* A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */
private transient volatile Node<E> tail;
在对象实例化时,会创立一个虚节点。看到前面你会发现,如果想通过 CAS 保护一个链表,个别都会应用到虚节点。
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);
}
介绍完外部数据结构,咱们看一下增删节点的实现形式。先来看一下减少数据的逻辑:
- 入队操作是在一个循环中尝试 CAS 操作,首先判断,尾结点 p.next 是不是 null,是的话就通过 CAS 将 null-> newNode,如果 CAS 胜利,阐明该节点就曾经算是退出到队列中了
- 然而这里并没有间接批改尾结点,因为 ConcurrentLinkedQueue 中 tail 并不一定是实际上的尾结点,在并发很大时,如果所有线程都要去竞争批改尾结点的话,对性能会有影响,所以,当理论的尾结点(代码中的变量 p)不等于 tail 时,才会进行更新。
- 在 ConcurrentLinkedQueue 中会呈现 Node1(head)->Node2(tail)->null 以及 Node1(head)->Node2(tail)->Node3->null 这样的状况甚至 Node1(head)->Node2(tail)->Node3->Node4 这样的状况,尽管 tail 指针没有间接指向尾结点会导致将新节点退出链表时,须要从 tail 向后查找理论的尾结点,然而这个过程相较于对 tail 节点的竞争来说,影响较小,最终效率也更高
- 如果发现以后 p 节点不是实际上的尾结点,会先查看它的 next 指针是否指向本人,在出队函数 poll 中,将一个元素出队后会把它的 next 指针指向本人,所以这一步实际上是判断以后的 p 节点是否曾经出队
- 如果满足上述情况,咱们须要从新获取 tail 指针,如果发现在上述过程中 tail 指针产生了变动,这阐明期间曾经好有个并发插入过程实现了,咱们间接从最新的 tail 对象开始上述流程即可,所以这里就将 p 赋为最新的 tail 指向的对象,
- 如果整个过程中 tail 指针都没变,阐明以后的状况相似于 Node1(head,tail)-> Node2->null, 然而在判断
p == q
之前,产生了出队操作,状态变成了 Node1(tail, 曾经出队的对象) Node2(head)->null,这个时候咱们要将 p 设置为 head 而后从 head 开始向后遍历
- 最初就是单纯的没有遍历到尾结点的状况了,Node1(head)->Node2(tail,以后 p 变量)->Node3(以后 q 变量)->null
- 如果发现曾经进行过一次向后遍历的过程,即
p != t
,并且 tail 指针产生了变动,咱们就间接应用 tail 指针,不再向后遍历了 p = t(最新的 tail 指针) - 如果不满足上述情况,比方还素来没遍历过,或者尽管遍历过然而 tail 指针没变,咱们就持续遍历 p = q(p.next)
public boolean offer(E e) {checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {// p is last node // 找到了最初一个节点,通过 CAS 将其 next 指向新节点 if (p.casNext(null, newNode)) {// Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". // 如果 tail.next 为 null 就不批改 tail,tail.next != null 时才会批改 // 这里会呈现多个线程同时发现 tail.next != null 的状况,所以 tail 指针和理论的尾结点的间隔不肯定是 1 if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. 因为没有要求 tail 指针和理论的尾结点的间隔是 1 return true;
}
// Lost CAS race to another thread; re-read next }
else if (p == q)
// 如果发现以后 p 节点不是实际上的尾结点,会先查看它的 next 指针是否指向本人,在出队函数 poll 中,将一个元素出队后会把它的 next 指针指向本人,所以这一步实际上是判断以后的 p 节点是否曾经出队 // 如果 tail 指针产生了变动,就从最新的 tail 开始遍历 // 否则,从 head 开始遍历,因为这时候 tail 可能指向了一个死掉 (next 指向本人,曾经从队列中移除) 的节点 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head;
else
// 最初就是单纯的没有遍历到尾结点的状况了 // 如果发现曾经进行过一次向后遍历的过程,并且 tail 指针产生了变动,咱们就间接应用 tail 指针 // 如果还素来没遍历过,或者尽管遍历过然而 tail 指针没变,咱们就持续遍历 // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q;
}
}
最初,咱们介绍一下出队的操作,整个出队过程也是在一个 CAS 循环中进行:
- 首先咱们查看头指针的 p(head).item 是不是 null,不是的话才阐明该节点是一个无效节点,因为初始化是创立的虚节点 item 才等于 null,这里通过 item 是不是 null 来判断是不是虚节点也就是说 ConcurrentLinkedQueue 中不能增加值为 null 的节点
- 找到无效节点后,通过 cas 将 item 改为 null,后续的操作和增加元素时相似,因为 head 指针也是一个竞争点,所以这里并没有间接批改 head 指针,而是发现从 head 至多向后遍历过一次时,才会批改 head 指针,这和 offer 中的形式相似
- 如果以后线程要负责批改 head 指针,会判断 刚删掉的节点 p 的 next 是不是 null,是的话就让 p 作为 head(此时 p 充当新的虚节点),如果不是的话,就让 p.next 作为 next(此时 head 就是实际上的头结点)
- 如果 p 的 item == null 或者 cas 失败(别的线程曾经把 p.item 置为 null),咱们要检查一下 p.next 是不是 null,如果是的话阐明 p 曾经是最初一个节点了,咱们须要返回 null,然而在此之前,咱们无妨把 p 设为新的 head 来缩小其余线程的遍历开销
- 查看以后 p 节点的 next 指针是不是指向本人,是的话阐明以后查看的这个节点曾经被别的线程从队列中移除了,那咱们就从新开始执行 poll
- 否则,让 p = q(p.next),也就是说这是从 head 开始向后遍历的过程
public E poll() {
restartFromHead:
for (;;) {for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// item != null 阐明该节点是一个无效节点, 通过 CAS 将其 item 改为 null if (item != null && p.casItem(item, null)) {// CAS 胜利阐明曾经移除一个节点了,后续的操作和增加元素时相似,因为 head 指针也是一个竞争点 // 所以这里并没有间接批改 head 指针,而是发现从 head 至多向后遍历过一次时,才会批改 head 指针,这和 offer 中的形式相似 // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time // 判断刚删掉的节点 p 的 next 是不是 null,是的话就让 p 作为 head(此时 p 充当新的虚节点),// 如果不是的话,就让 p.next 作为 next(此时 head 就是实际上的头结点)updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {// 阐明 p 曾经是最初一个节点了,咱们须要返回 null // 然而在此之前,咱们无妨把 p 设为新的 head 来缩小其余线程的遍历开销 updateHead(h, p);
return null;
}
else if (p == q)
// 阐明以后查看的这个节点曾经被别的线程从队列中移除了,那咱们就从新开始执行 poll continue restartFromHead;
else
// p = q(p.next),也就是说这是从 head 开始向后遍历的过程 p = q;
}
}
}
updateHead 的过程中先会查看是不是真的有必要重置 head 指针,有必要的话在通过 CAS 批改 head 指针,如果 CAS 失败了也不妨,毕竟咱们不要求 head 肯定指向理论的头结点,poll 中的遍历过程能 cover 这种状况。如果 CAS 胜利,会将删掉的 head 指针指向本人。
/**
* Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */
final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p))
h.lazySetNext(h);
}
void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);
}
这里大家可能会有疑难,为什么要 lazySet
next 指针呢?要想了解这个问题,咱们须要先了解 putOrderedObject
和 putObjectVolatile
的区别。因为 Node 中的 next 属性是用 volatile 润饰的,而 volatile 有什么特点呢?一个是避免指令重拍,一个是将其余 CPU cache 中的相干数据有效化,迫使这些 CPU 从新从主存中拉取最新数据。这是通过 Fence(内存屏障) 实现的,在 linux x86 架构中个别是 lock; addl $0,0(%%esp).
, 这里的 lock 是一个指令前缀, 它蕴含了 storeload 内存屏障的语义, 前面的 addl $0,0(%%esp) 是一个空指令, 因为 lock 前缀不能独立存在(它不是一条残缺的指令), 所以在应用它的时候个别会在前面跟一条什么都不做的指令。
而 putObjectVolatile
函数等效于申明一个 volatile 变量,而后间接对该变量进行批改。也就是说,无论是 putObjectVolatile
还是对 volatile 变量的间接批改,都依赖与 StoreLoad barriers
,这里 StoreLoad barriers
就是说如果指令的程序是 Store1; StoreLoad; Load2
,就须要确保 Store1 保留的数据在 Load2 拜访数据之前,肯定要可能对所有线程可见。对于内存屏障的解释,能够参考这篇手册, 其中介绍了各个内存屏障的要求,以及在不同架构上的实现形式。
而 putOrderedObject
函数呢,只须要保障以后 cpu 内指令是有序的,不会呈现非法的内存拜访即可,这也就是说,putOrderedObject 没有多解决期间的可见性保障,也就不会有多余的开销。在咱们 ConcurrentLinkedQueue 的场景中,最终将 next 指针指向本人并不需要这么高的可见性需求,而且 next 是润饰为 volatile 的,所以,咱们须要显式地调用 putOrderedObject
能力达到“去 volatile 个性”的成果,从而晋升效率。
对于它们的实现,能够参考如下代码,能够看到 ordered_store 最初插入了一个 Op_MemBarCPUOrder 内存屏障,而 putObjectVolatile
对应了 inline_unsafe_access
中的 is_volatile=true && is_store == true 的逻辑,也就是插入了 Op_MemBarVolatile 内存屏障。
bool LibraryCallKit::inline_unsafe_ordered_store(BasicType type) {// This is another variant of inline_unsafe_access, differing in // that it always issues store-store ("release") barrier and ensures // store-atomicity (which only matters for "long").
// ... if (type == T_OBJECT) // reference stores need a store barrier. store = store_oop_to_unknown(control(), base, adr, adr_type, val, type);
else {store = store_to_memory(control(), adr, val, type, adr_type, require_atomic_access);
}
insert_mem_bar(Op_MemBarCPUOrder);
return true;
}
bool LibraryCallKit::inline_unsafe_access(bool is_native_ptr, bool is_store, BasicType type, bool is_volatile) {
// ....
if (is_volatile) {if (!is_store)
insert_mem_bar(Op_MemBarAcquire);
else
insert_mem_bar(Op_MemBarVolatile);
}
if (need_mem_bar) insert_mem_bar(Op_MemBarCPUOrder);
return true;
}
再来看看 memnode.hpp 中对这两种内存屏障的解释。MemBarVolatileNode 须要保障多 CPU 的可见性,MemBarCPUOrderNode 只须要保障单 CPU 程序即可,而且 CPU 曾经做了所有的排序工作,咱们毋庸多做。
// Ordering between a volatile store and a following volatile load. // Requires multi-CPU visibility class MemBarVolatileNode: public MemBarNode {
public:
MemBarVolatileNode(Compile* C, int alias_idx, Node* precedent)
: MemBarNode(C, alias_idx, precedent) {}
virtual int Opcode() const;};
// Ordering within the same CPU. Used to order unsafe memory references // inside the compiler when we lack alias info. Not needed "outside" the // compiler because the CPU does all the ordering for us. class MemBarCPUOrderNode: public MemBarNode {
public:
MemBarCPUOrderNode(Compile* C, int alias_idx, Node* precedent)
: MemBarNode(C, alias_idx, precedent) {}
virtual int Opcode() const;
virtual uint ideal_reg() const { return 0;} // not matched in the AD file };
ConcurrentSkipListMap
对于一个单链表,即便链表是有序的,如果咱们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率天然就会很低。而跳表是在这个单链表的根底上同时保护了多个链表,并且链表是分层的。
最低层的链表保护了跳表内所有的元素,每下面一层链表都是上面一层的子集。
跳表内的所有链表的元素都是排序的。查找时,能够从顶级链表开始找。一旦发现被查找的元素大于以后链表中的取值,就会转入下一层链表持续找。这也就是说在查找过程中,搜寻是跳跃式的。如上图所示,在跳表中查找元素 18。
查找 18 的时候原来须要遍历 12 次,当初只须要 7 次即可。针对链表长度比拟大的时候,构建索引查找效率的晋升就会非常明显。
应用跳表实现 Map 和应用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保留元素的程序,而跳表内所有的元素都是排序的。因而在对跳表进行遍历时,你会失去一个有序的后果。
在 JDK 的 ConcurrentSkipListMap 实现中,没有应用到锁,而是通过 CAS 来进行数据的批改,当插入数据时,通过 CAS 批改最上层列表的内容,而后再逐层向上保护各级列表(各层列表的批改都是通过 CAS 实现),这两个过程是独立的,因为下层列表保护的数据少也只会影响查找数据的速度,而不会影响到数据的准确性,因为 增加与查找数据都以最上层列表内容为准。
CopyOnWriteArrayList
CopyOnWriteArrayList 用于读场景远多于写场景的状况,它可能让读与读之间不互斥,读与写也不互斥,只有写与写之间才会互斥。它的思路也很简略,外部通过一个数组来保护数据,失常读数据时间接通过索引从数组中提取数据。
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {return (E) a[index];
}
/**
* {@inheritDoc} * * @throws IndexOutOfBoundsException {@inheritDoc} */
public E get(int index) {return get(getArray(), index);
}
/**
* Gets the array. Non-private so as to also be accessible * from CopyOnWriteArraySet class. */
final Object[] getArray() {return array;}
而写数据时,须要将整个数组都复制一遍,而后在新数组的开端增加最新的数据。最初替换掉原来的数组,这样原来的数组就会被回收。很显然,这种实现形式在减小竞争的同时,承当了数据空间 * 2 的压力。
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();
/**
* Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {lock.unlock();
}
}
/**
* Sets the array. */
final void setArray(Object[] a) {array = a;}
LinkedBlockQueue
LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,能够当做无界队列也能够当做有界队列来应用,满足 FIFO 的个性,为了避免 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创立 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer. MAX_VALUE。那么什么是阻塞队列呢?咱们晓得队列有入队出队两个操作,所谓阻塞队列,就是说如果队列已满时,能够阻塞入队操作,而如果队列为空时,能够阻塞出队操作。
为了实现阻塞成果并保障线程平安,它的外部用到了两个锁和两个 Condition。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
在进行数据出队时,先要取得 takeLock,而后查看以后队列容量是否为 0,如果队列容量为 0,则在 notEmpty 上期待,否则间接执行出队操作。最初判断一下,是不是执行出队操作之前,队列曾经达到最大容量,如果是的话,就唤醒期待中的入队操作。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 获取锁 final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {// 如果队列为空,就期待 while (count.get() == 0) {notEmpty.await();
}
// 否则,执行出队操作,并批改 size x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 如果队列不为空,则唤醒下一个期待中的出队操作 notEmpty.signal();} finally {takeLock.unlock();
}
// 如果之前队列满了,则唤醒期待中的入队操作 if (c == capacity)
signalNotFull();
return x;
}
/**
* Signals a waiting put. Called only from take/poll. */
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {notFull.signal();
} finally {putLock.unlock();
}
}
入队操作和出队操作正好相同,同样先获取锁,不过这里用的是 putLock,查看以后队列是否已满,是的话就在 notFull 上期待,否则执行入队操作并批改 size,如果之前的队列长度为 0,那么就有可能有一些出队操作被阻塞了,所以咱们这里须要唤醒所有在 notEmpty 上期待的线程。
/**
* Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 先获取锁 putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */
while (count.get() == capacity) {notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();} finally {putLock.unlock();
}
if (c == 0)
signalNotEmpty();}
/**
* Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {notEmpty.signal();
} finally {takeLock.unlock();
}
}
LinkedBlockingQueue 除了阻塞版的入队出队操作外,当然也有不阻塞的接口,不过这些接口比较简单,基本上就是在上述根底勾销 await 和 signal 逻辑,这里就不再赘述了。
PriorityBlockingQueue
PriorityBlockingQueue 是一个带排序功能的阻塞队列,因为它是一个队列,没必要保障整个队列的外部程序,只须要保障出队时依照排序后果出即可,所以其外部应用了二分堆得模式实现,同时,PriorityBlockingQueue 也是线程平安的,外部通过一个锁来管制堆数据的保护。
PriorityBlockingQueue 的堆数据都保留在如下的 queue 数组中,堆的根节点是 queue[0]
, 就像如下正文所说的,咱们能够依据一个节点的下标 n 疾速计算出它的两个子节点的下标,即 queue[2*n+1]
和 queue[2*(n+1)]
。这是用数组来形容二分堆(树)的常见办法。
/**
* Priority queue represented as a balanced binary heap: the two * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The * priority queue is ordered by comparator, or by the elements' * natural ordering, if comparator is null: For each node n in the * heap and each descendant d of n, n <= d. The element with the * lowest value is in queue[0], assuming the queue is nonempty. */
private transient Object[] queue;
因为保护二分堆时,咱们不须要保障整个堆内所有元素有序,只须要保障父子节点之间有序即可,所以当咱们要插入一个元素时,间接将其插入到堆尾,而后通过其与父节点的关系,进行适当地父子节点换位,就能保障堆的性质。
/**
* Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never return {@code false}. * * @param e the element to add * @return {@code true} (as specified by {@link Queue#offer}) * @throws ClassCastException if the specified element cannot be compared * with elements currently in the priority queue according to the * priority queue's ordering * @throws NullPointerException if the specified element is null */
public boolean offer(E e) {if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 最新节点的下标(n)等于 size,确保容量没问题 while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 通过比拟函数,进行父子节点换位 siftUpComparable(n, e, array);
else
// 通过比拟函数,进行父子节点换位 siftUpUsingComparator(n, e, array, cmp);
// 批改 size 并唤醒期待中的出队操作 size = n + 1;
notEmpty.signal();} finally {lock.unlock();
}
return true;
}
/**
* Inserts item x at position k, maintaining heap invariant by * promoting x up the tree until it is greater than or equal to * its parent, or is the root. * * To simplify and speed up coercions and comparisons. the * Comparable and Comparator versions are separated into different * methods that are otherwise identical. (Similarly for siftDown.) * These methods are static, with heap state as arguments, to * simplify use in light of possible comparator exceptions. * * @param k the position to fill * @param x the item to insert * @param array the heap array */
private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {// 计算父节点下标 int parent = (k - 1) >>> 1;
Object e = array[parent];
// 和父节点比拟,因为默认是升序(最小堆),所以当新增加的节点大于父节点时就停下来了 if (key.compareTo((T) e) >= 0)
break;
// 新增加的节点小于父节点,那么父子节点换位,而后反复上述过程 array[k] = e;
k = parent;
}
array[k] = key;
}
当从 PriorityBlockingQueue 中执行出队操作时,间接提取下标 0 元素,而后用 queue 中的最初一个元素,接替 0 号元素的地位,自上而下地修改堆中元素的地位关系,使其满足堆的性质。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
// 加锁并循环出队,如果没有数据就在 notEmpty condition 上期待 try {while ( (result = dequeue()) == null)
notEmpty.await();} finally {lock.unlock();
}
return result;
}
/**
* Mechanics for poll(). Call only while holding lock. */
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {Object[] array = queue;
// 出队 0 号元素 E result = (E) array[0];
// 接替者是 queue 的开端元素 E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 接替者下移 siftDownComparable(0, x, array, n);
else
// 接替者下移 siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
从下移节点的实现中,咱们能够看到,它先会比拟两个子节点,选取最小的节点最为下移的指标节点。而后通过比拟器比拟以后节点 x 和指标节点是否满足堆的性质,如果不满足则替换节点地位,并反复上述过程。
/**
* Inserts item x at position k, maintaining heap invariant by * demoting x down the tree repeatedly until it is less than or * equal to its children or is a leaf. * * @param k the position to fill * @param x the item to insert * @param array the heap array * @param n heap size */
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf while (k < half) {int child = (k << 1) + 1; // assume left child is least Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
// 比拟左右两个节点,以最小节点作为下移的指标节点 c = array[child = right];
if (key.compareTo((T) c) <= 0)
// 父节点最小,进行 break;
// 子节点小,交换子节点和父节点,并反复上述过程 array[k] = c;
k = child;
}
array[k] = key;
}
}