现如今想浏览 HashMap 源码实际上比较简单,因为网上一大堆博客去剖析 HashMap 和 ConcurrentHashMap。而本文是全网首篇详细分析 CopyOnWriteStateTable 源码的博客,浏览简单汇合类源码的过程是相当有挑战的,笔者在刚开始浏览也遇到很多疑难,最初一一解决了。本文有一万两千多字加不少的配图,实属不易。具体浏览完本文,无论是针对面试还是开阔视野肯定会对大家有帮忙的。
具体浏览完本文,无论是针对面试还是开阔视野肯定会对大家有帮忙的。
申明:笔者的源码剖析都是基于 flink-1.9.0 release 分支,其实浏览源码不必十分在意版本的问题,各版本的次要流程根本都是相似的。如果相熟了某个版本的源码,之后新版本有变动,咱们重点看一下变动之处即可。
本文次要讲述 Flink 中 CopyOnWriteStateTable 相干的常识,当应用 MemoryStateBackend 和 FsStateBackend 时,默认状况下会将状态数据保留到 CopyOnWriteStateTable 中。CopyOnWriteStateTable 中保留多个 KeyGroup 的状态,每个 KeyGroup 对应一个 CopyOnWriteStateMap。
CopyOnWriteStateMap 是一个相似于 HashMap 的构造,但反对了两个十分有意思的性能:
- 1、hash 构造为了保障读写数据的高性能,都须要有扩容策略,CopyOnWriteStateMap 的扩容策略是一个渐进式 rehash 的策略,即:不是一下子将数据全迁徙的新的 hash 表,而是缓缓去迁徙数据到新的 hash 表中。
- 2、Checkpoint 时 CopyOnWriteStateMap 反对异步快照,即:Checkpoint 时能够在做快照的同时,依然对 CopyOnWriteStateMap 中数据进行批改。问题来了:数据批改了,怎么保障快照数据的准确性呢?
理解 Redis 的同学应该晓得 Redis 也是一个大的 hash 构造,扩容策略也是渐进式 rehash。Redis 的 RDB 在长久化数据的过程中同时也是对外服务的,对外服务意味着数据可能被批改,那么 RDB 如何保障长久化好的数据肯定是正确的呢?
举个例子:17 点 00 分 00 秒 RDB 开始长久化数据,过了 1 秒 Redis 中某条数据被批改了,过了一分钟 RDB 才长久化完结。RDB 预期的长久化后果应该是 17 点 00 分 00 秒那一刻 Redis 的残缺快照,请问长久化过程中那些批改操作是否会影响 Redis 的快照。答:当然能够做到不影响。
Flink 在 Checkpoint 时的快照与 Redis 相似,都是想在快照时仍然对外提供服务,缩小服务进展工夫。Flink 具体如何实现上述性能的呢?带着问题具体浏览下文。
1.StateTable 简介
MemoryStateBackend 和 FsStateBackend 的 KeyedStateBackend 都应用 HeapKeyedStateBackend 存储数据,HeapKeyedStateBackend 持有 Map<String, StateTable<K, ?, ?>> registeredKVStates 来存储 StateName 与具体 State 的映射关系。registeredKVStates 的 key 就是 StateName,value 为具体的 State 数据。具体 State 的数据存储在 StateTable 中。
StateTable 有两个实现:CopyOnWriteStateTable 和 NestedMapsStateTable。
- CopyOnWriteStateTable 属于 Flink 本人定制化的数据结构,Checkpoint 时反对异步 Snapshot。
- NestedMapsStateTable 间接嵌套 Java 的两层 HashMap 来存储数据,Checkpoint 时须要同步快照。
上面具体介绍 CopyOnWriteStateTable。
2.CopyOnWriteStateTable
StateTable 中持有 StateMap[] keyGroupedStateMaps 真正的存储数据。StateTable 会为每个 KeyGroup 的数据初始化一个 StateMap 来对 KeyGroup 做数据隔离。对状态进行操作时,StateTable 会先依据 key 计算对应的 KeyGroup,拿到相应的 StateMap,能力对状态进行操作。
CopyOnWriteStateTable 中应用 CopyOnWriteStateMap 存储数据,这里次要介绍 CopyOnWriteStateMap 的实现。CopyOnWriteStateMap 中就是一个数组 + 链表形成的 hash 表。
CopyOnWriteStateMap 中元素类型都是是:StateMapEntry。hash 表的第一层先是一个 StateMapEntry 类型的数组,即:StateMapEntry[]。在 StateMapEntry 类中有个 StateMapEntry next 指针形成链表。
CopyOnWriteStateMap 相比一般的 hash 表,有以下几点须要重点关注:
- CopyOnWriteStateMap 的扩容策略是渐进式 rehash,而不是一下子扩容完
- 为了反对异步的 Snapshot,须要将 Snapshot 时 StateMap 的快照保留下来,具体的保留策略怎么实现的?
- 为了反对 CopyOnWrite 性能,所以在批改数据时,要进行一系列 copy 的操作,不能批改原始数据,否则会影响 Snapshot。
- Snapshot 异步快照流程及 Snapshot 实现时,如何 release 掉旧版本数据?
3.CopyOnWriteStateMap 的渐进式 rehash 策略
渐进式 rehash 策略示意 CopyOnWriteStateMap 中以后有一个 hash 表对外服务,然而以后 hash 表中元素太多须要扩容了,须要将数据迁徙到一个容量更大的 hash 表中。
Java 的 HashMap 在扩容时会一下子将旧 hash 表中所有数据都挪动到大 hash 表中,这样的策略存在的问题是如果 HashMap 以后存储了 1 G 的数据,那么霎时须要将 1 G 的数据迁徙完,可能会比拟耗时。而 CopyOnWriteStateMap 在扩容时,不会一下子将数据全副迁徙完,而是在每次操作 CopyOnWriteStateMap 时,缓缓去迁徙数据到大的 hash 表中。
例如:能够在每次 get、put 操作时,迁徙 4 条数据到大 hash 表中,这样通过一段时间的 get 和 put 操作,所有的数据就能迁徙实现。所以渐进式 rehash 策略,会分很屡次将所有的数据迁徙到新的 hash 表中。
3.1 扩容简述
在内存中有两个 hash 表,一个是 primaryTable 作为主桶,一个是 rehashTable 作为扩容期间用的桶。初始阶段只有 primaryTable,当 primaryTable 中元素个数大于设定的阈值时,就要开始扩容。
扩容过程:申请一个相比 primaryTable 容量大一倍的 hash 表保留到 rehashTable 中,缓缓地将 primaryTable 中的元素迁徙到 rehashTable 中。对应到源码中:putEntry 办法中判断 size() > threshold 时,会调用 doubleCapacity 办法申请新的 hash 表赋值给 rehashTable。
如下图所示 primaryTable 中桶的个数为 4,rehashTable 中桶的个数为 8。
扩容时 primaryTable 中 0 地位上的元素会迁徙到 rehashTable 的 0 和 4 地位上,同理 primaryTable 中 1 地位上的元素会迁徙到 rehashTable 的 1 和 5 地位上。
3.2 抉择 Table 的策略
假如 primaryTable 中 0 桶的数据曾经迁徙到 rehashTable 桶了,那么之后无论是 put 还是 get 操作 0 桶的数据,那么都会去操作 rehashTable。而 1、2、3 桶还未迁徙,所以 1、2、3 桶还须要操作 primaryTable 桶。对应到源码中会有一个选桶的操作,抉择到底应用 primaryTable 还是 rehashTable。
源码实现如下所示:
// 抉择以后元素到底应用 primaryTable 还是 incrementalRehashTable
private StateMapEntry<K, N, S>[] selectActiveTable(int hashCode) {
// 计算 hashCode 应该被分到 primaryTable 的哪个桶中
int curIndex = hashCode & (primaryTable.length - 1);
// 大于等于 rehashIndex 的桶还未迁徙,应该去 primaryTable 中去查找。// 小于 rehashIndex 的桶曾经迁徙实现,应该去 incrementalRehashTable 中去查找。return curIndex >= rehashIndex ? primaryTable : incrementalRehashTable;
}
首先通过 int curIndex = hashCode & (primaryTable.length – 1); 计算以后 hashCode 应该分到 primaryTable 的哪个桶中。
rehashIndex 用来标记以后 rehash 迁徙的进度,即:rehashIndex 之前的数据曾经从 primaryTable 迁徙到 rehashTable 桶中。假如 rehashIndex = 1,示意 primaryTable 1 桶之前的数据全副迁徙实现了,即:0 桶数据全副迁徙完了。
策略:大于等于 rehashIndex 的桶还未迁徙,应该去 primaryTable 中去查找。小于 rehashIndex 的桶曾经迁徙实现,应该去 incrementalRehashTable 中去查找。
3.3 迁徙过程
每次有 get、put、containsKey、remove 操作时,都会调用 computeHashForOperationAndDoIncrementalRehash 办法触发迁徙操作。
computeHashForOperationAndDoIncrementalRehash 办法作用:
- 检测是否处于 rehash 中,如果正在 rehash 就会调用 incrementalRehash 迁徙一波数据
- 计算 key 和 namespace 对应的 hashCode
重点关注 incrementalRehash 办法实现:
private void incrementalRehash() {StateMapEntry<K, N, S>[] oldMap = primaryTable;
StateMapEntry<K, N, S>[] newMap = incrementalRehashTable;
int oldCapacity = oldMap.length;
int newMask = newMap.length - 1;
int requiredVersion = highestRequiredSnapshotVersion;
int rhIdx = rehashIndex;
// 记录本次迁徙了几个元素
int transferred = 0;
// 每次至多迁徙 MIN_TRANSFERRED_PER_INCREMENTAL_REHASH 个元素到新桶、// MIN_TRANSFERRED_PER_INCREMENTAL_REHASH 默认为 4
while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
// 遍历 oldMap 的第 rhIdx 个桶
StateMapEntry<K, N, S> e = oldMap[rhIdx];
// 每次 e 都指向 e.next,e 不为空,示意以后桶中还有元素未遍历,须要持续遍历
// 每次迁徙必须保障,整个桶被迁徙完,不能是某个桶迁徙到一半
while (e != null) {
// 遇到版本比 highestRequiredSnapshotVersion 小的元素,则 copy 一份
if (e.entryVersion < requiredVersion) {e = new StateMapEntry<>(e, stateMapVersion);
}
// 保留下一个要迁徙的节点节点到 n
StateMapEntry<K, N, S> n = e.next;
// 迁徙以后元素 e 到新的 table 中,插入到链表头部
int pos = e.hash & newMask;
e.next = newMap[pos];
newMap[pos] = e;
// e 指向下一个要迁徙的节点
e = n;
// 迁徙元素数 +1
++transferred;
}
oldMap[rhIdx] = null;
// rhIdx 之前的桶曾经迁徙完,rhIdx == oldCapacity 就示意迁徙实现了
// 做一些初始化操作
if (++rhIdx == oldCapacity) {
XXX
return;
}
}
// primaryTableSize 中减去 transferred,减少 transferred
primaryTableSize -= transferred;
incrementalRehashTableSize += transferred;
rehashIndex = rhIdx;
}
incrementalRehash 办法中第一层 while 循环用于管制每次迁徙的最小元素个数。而后遍历 oldMap 的第 rhIdx 个桶,e 指向以后遍历的元素,每次 e 都指向 e.next,e 不为空,示意以后桶中还有元素未遍历,须要持续遍历。每次迁徙必须保障,整个桶被迁徙完,不能是某个桶迁徙到一半。
迁徙过程中,将以后元素 e 从新计算 hash 值,插入到 newMap 相应桶的头部(头插法)。其中 e.entryVersion < requiredVersion 时,须要创立一个新的 Entry,这里是为了反对 CopyOnWrite 性能,上面会介绍。
4.StateMap 的 Snapshot 策略
StateMap 的 Snapshot 策略是指:为了反对异步的 Snapshot,须要将 Snapshot 时 StateMap 的快照保留下来。
传统的办法就是将 StateMap 的全量数据在内存中深拷贝一份,而后拷贝的这一份数据去缓缓做快照,原始的数据能够对外服务。然而深拷贝须要拷贝所有的实在数据,所以效率会非常低。为了提高效率,Flink 只是对数据进行了浅拷贝。
4.1 浅拷贝原理剖析
浅拷贝就是只拷贝援用,不拷贝数据。
如果 StateMap 没有处于扩容中,Snapshot 流程绝对比较简单,创立一个新的 snapshotData,间接将 primaryTable 的数据拷贝到 snapshotData 中即可。
如图所示,对于浅拷贝能够了解为两个 Table 的 0 号桶中都援用的同一个链表,也就是将 snapshotData 指向图中的 Entry a 即可。其余桶的浅拷贝也是相似,就不一一画图了。
如果 StateMap 以后处于扩容中,Snapshot 流程绝对比拟繁琐,创立一个新的 snapshotData,须要将 primaryTable 和 rehashTable 的数据都拷贝到 snapshotData 中。
如图所示,将原始两个 Table 数据拷贝到 snapshotData 中,然而 snapshotData 数组的长度并不是 primaryTable 的长度 + rehashTable 的长度。而是别离计算 primaryTable 和 rehashTable 中有几个桶中有数据。例如上图案例所示,primaryTable 中有 3 个桶中有元素,rehashTable 中有 2 个桶中有元素,所以 snapshotData 的桶数量为 5 即可,没必要 4 + 8 = 12 个桶。
上图中也是省略了 Entry,Entry 援用的浅拷贝与之前没有扩容的状况相似。
4.2 浅拷贝源码详解
首先调用 CopyOnWriteStateTable 的 stateSnapshot 办法对整个 StateTable 进行快照。stateSnapshot 办法会创立 CopyOnWriteStateTableSnapshot,CopyOnWriteStateTableSnapshot 的结构器中会调用 CopyOnWriteStateTable 的 getStateMapSnapshotList 办法。
getStateMapSnapshotList 办法源码如下所示:
List<CopyOnWriteStateMapSnapshot<K, N, S>> getStateMapSnapshotList() {
List<CopyOnWriteStateMapSnapshot<K, N, S>> snapshotList =
new ArrayList<>(keyGroupedStateMaps.length);
// 调用所有 CopyOnWriteStateMap 的 stateSnapshot 办法
// 生成 CopyOnWriteStateMapSnapshot 保留到 list 中
for (int i = 0; i < keyGroupedStateMaps.length; i++) {
CopyOnWriteStateMap<K, N, S> stateMap =
(CopyOnWriteStateMap<K, N, S>) keyGroupedStateMaps[i];
snapshotList.add(stateMap.stateSnapshot());
}
return snapshotList;
}
CopyOnWriteStateTable 中为每个 KeyGroup 保护了一个 StateMap 到 keyGroupedStateMaps 中,getStateMapSnapshotList 办法会调用所有 CopyOnWriteStateMap 的 stateSnapshot 办法。
CopyOnWriteStateMap 的 stateSnapshot 办法相干源码如下所示:
public CopyOnWriteStateMapSnapshot<K, N, S> stateSnapshot() {return new CopyOnWriteStateMapSnapshot<>(this);
}
CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {super(owningStateMap);
// 对 StateMap 的数据进行浅拷贝,生成 snapshotData
this.snapshotData = owningStateMap.snapshotMapArrays();
// 记录以后的 StateMap 版本到 snapshotVersion 中
this.snapshotVersion = owningStateMap.getStateMapVersion();
this.numberOfEntriesInSnapshotData = owningStateMap.size();}
CopyOnWriteStateMap 的 stateSnapshot 办法会创立 CopyOnWriteStateMapSnapshot,CopyOnWriteStateMapSnapshot 的结构器中会调用 StateMap 的 snapshotMapArrays 办法对 StateMap 的数据进行浅拷贝生成 snapshotData。且将以后的 StateMap 版本到 snapshotVersion 中。
StateMap 的 snapshotMapArrays 办法对浅拷贝原理进行了代码实现,代码如下所示:
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
// 以后 StateMap 的 version
private int stateMapVersion;
// 所有 正在进行中的 snapshot 的 version
private final TreeSet<Integer> snapshotVersions;
// 正在进行中的那些 snapshot 的最大版本号
private int highestRequiredSnapshotVersion;
StateMapEntry<K, N, S>[] snapshotMapArrays() {
// 1、stateMapVersion 版本 + 1,赋值给 highestRequiredSnapshotVersion,// 并退出 snapshotVersions
synchronized (snapshotVersions) {
++stateMapVersion;
highestRequiredSnapshotVersion = stateMapVersion;
snapshotVersions.add(highestRequiredSnapshotVersion);
}
// 2、将当初 primary 和 Increment 的元素浅拷贝一份到 copy 中
// copy 策略:copy 数组长度为 primary 中残余的桶数 + Increment 中有数据的桶数
// primary 中残余的数据放在 copy 数组的后面,Increment 中低位数据随后,// Increment 中高位数据放到 copy 数组的最初
StateMapEntry<K, N, S>[] table = primaryTable;
final int totalMapIndexSize = rehashIndex + table.length;
final int copiedArraySize = Math.max(totalMapIndexSize, size());
final StateMapEntry<K, N, S>[] copy = new StateMapEntry[copiedArraySize];
if (isRehashing()) {
final int localRehashIndex = rehashIndex;
final int localCopyLength = table.length - localRehashIndex;
// for the primary table, take every index >= rhIdx.
System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
table = incrementalRehashTable;
System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
System.arraycopy(table, table.length >>> 1, copy,
localCopyLength + localRehashIndex, localRehashIndex);
} else {System.arraycopy(table, 0, copy, 0, table.length);
}
return copy;
}
}
CopyOnWriteStateMap 中三个比拟重要的属性:
- stateMapVersion:示意以后 StateMap 的版本,每次 Snapshot 时版本号加一
- snapshotVersions:寄存所有正在进行中的 snapshot 的版本号(因为可能存在多个同时进行的 Snapshot)
- highestRequiredSnapshotVersion:示意正在进行中的那些 snapshot 的最大版本号,如果以后没有正在进行中的 Snapshot,那么赋值为 0
snapshotMapArrays 办法第一步依照上述规定更新这三个属性,第二步将当初 primaryTable 和 rehashTable 的元素浅拷贝一份到 copy 数组中。
注:copy 数组的长度与上述原理剖析不完全一致,原理剖析时应该是 copiedArraySize = totalMapIndexSize;实际上 copiedArraySize = Math.max(totalMapIndexSize, size())。
源码正文写到:实践上 totalMapIndexSize 就够了,这里思考 size 次要是为了兼容 StateMap 的 TransformedSnapshotIterator 性能。
5.CopyOnWrite 实现原理
上一部分得出结论,每次 Snapshot 时仅仅是浅拷贝一份,所以 Snapshot 和 StateMap 独特援用实在的数据。如果 Snapshot 还没将数据 flush 到磁盘,然而 StateMap 中对数据进行了批改,那么 Snapshot 最初 flush 的数据就是谬误的。Snapshot 的指标是:将 Snapshot 快照中原始的数据刷到磁盘,既然叫快照,所以不容许被批改。
5.1 CopyOnWrite 原理简述
那 StateMap 如何来保障批改数据的时候,不会批改 Snapshot 的数据呢?其实原理很简略:StateMap 和 Snapshot 共享了一大堆数据,既然 Snapshot 要求数据不能批改,那么 StateMap 在批改某条数据时能够将这条数据复制一份产生一个正本,所以 Snapshot 和 StateMap 就会各自领有本人的正本,所以 StateMap 对数据的批改就不会影响 Snapshot 的快照。
当然为了节俭内存和提高效率,StateMap 只会拷贝那些要扭转的数据,尽量多的实现共享,不能实现共享的数据只能 Copy 一份再批改了,这就是类名用 CopyOnWrite 润饰的起因。
5.2 CopyOnWrite 原理详解
上一部分 Snapshot 时,仅仅对 Table 做了一份浅拷贝,而且能够看到拷贝前后,桶内的数据不变,且桶跟桶之间是没有交加的,所以这里的原理详解次要就剖析一个桶中的链表如何实现 CopyOnWrite。
■ 5.2.1 批改链表头部节点的场景
如上图所示,primaryTable 和 snapshotTable 的 0 号桶都指向 Entry a,假如当初应用层要批改 Entry a 的数据,整体流程:
- 深拷贝一个 Entry a 对象为 Entry a copy
- 将 Entry a copy 放到 primaryTable 的链表中,且 next 指向 Entry b
- 应用层批改 Entry a copy 的 data,将 data1 批改为设定的 data2
这里 Entry b 和 c 没有批改,所以不必拷贝,属于 primaryTable 和 snapshotTable 共享的。
这里就引出了 CopyOnWriteStateMap 的设计指标(本人的了解,并不是官网观点):在保障 Snapshot 数据正确性的前提下,尽量的少拷贝数据进步性能。
■ 5.2.2 批改链表两头节点的场景
如上图所示,primaryTable 和 snapshotTable 的 0 号桶都指向 Entry a,假如当初应用层要批改 Entry b 的数据,整体流程:
- 深拷贝一个 Entry b 对象为 Entry b copy
- 将 Entry b copy 串在 primaryTable 的链表中,且 next 指向 Entry c
- 应用层批改 Entry b copy 的 data,将 data 批改为设定的 data2
然而上述流程成立吗?如上图所示 Entry a 和 c 是 primaryTable 和 snapshotTable 共享的。每个 Entry 只有一个 next 指针,所以 Entry a 能够同时指向 Entry b 和 b copy 吗?必定是不能够的,所以 Entry a 不能够共享。下图是正确流程。
如下图所示,在批改 Entry b 时,不仅仅要将 Entry b 拷贝一份,而且还要将链表中 Entry b 之前的 Entry 必须全副 copy 一份,这样能力保障在满足正确性的前提下批改 Entry b,毕竟正确性是第一位。
正确整体流程:
- 深拷贝 Entry a 和 b 对象为 Entry a copy 和 b copy
- 将 Entry a copy 和 b copy 串在 primaryTable 的链表中,且 Entry b 的 next 指向 Entry c
- 应用层批改 Entry b copy 的 data,将 data 批改为设定的 data2
总结: 假如要批改 Entry b,那么要将 Entry b 以及链表中 Entry b 之前的 Entry 必须全副 copy 一份,Entry b 之后的 Entry 能够共享。
■ 5.2.3 插入新数据的场景
如上图所示是插入新数据的场景,会应用头插法插入 Entry d,头插法不须要拷贝原始链表的任何数据,只须要插入最新的数据到链表头部即可。这样 primaryTable 能够拜访到插入的数据,且不影响 SnapshotData 拜访原始快照的数据。
注:这里必须是插入新数据的场景,对于 Map 类型,插入旧数据对应的可能是批改操作
■ 5.2.4 链表头部有新节点再批改链表两头节点的场景
如上图所示是链表头部有新节点 Entry d 再批改 Entry b 的场景,此时正确的流程是:
- 深拷贝 Entry a 和 b 对象为 Entry a copy 和 b copy
- 将 Entry a copy 和 b copy 串在 Entry d 的链表中,且 Entry b 的 next 指向 Entry c
- 应用层批改 Entry b copy 的 data,将 data 批改为设定的 data2
之前说过要批改 Entry b 须要将 Entry b 之前的 Entry 全副 copy 一份,然而此时并不需要对 Entry d 进行 copy。之前 copy 是因为 Entry b 之前的元素有被 snapshotData 援用,然而这里 Entry d 并不被 snapshotData 援用,只有 primaryTable 只有 Entry d,所以不须要 copy。
批改 Entry b 时,Entry b 之前的 Entry 哪些须要 copy,哪些不须要 copy,具体如何辨别会在后续的源码环节具体介绍。
■ 5.2.5 get 链表两头节点的场景
实践来讲,拜访两头节点的场景数据数据是十分平安的。
如下图所示 Flink 应用层通过 primaryTable 拜访 Entry b,实践来讲只是读取的场景就不须要 copy 正本了。因为之前 copy 正本都是因为应用层批改了数据,为了保障 Snapshot 数据的不可变个性,所以专门 copy 一个正本让 primaryTable 去批改。但神奇的是 CopyOnWriteStateMap 在 get 操作时,也须要将 Entry b 以及 Entry b 之前的所有 Entry 拷贝一个正本。
为什么呢?尽管是 get 拜访操作,然而应用层拿到了 Entry b 中的 data 对象,万一应用层批改了 data 对象里的属性怎么办呢?例如 Entry 中的 data 是 Person 对象,Person 对象可能有一些 setter 办法,能够批改其 name 和 age。如果应用层批改了 name 或 age,那么在 Snapshot 的过程中,还是呈现了数据批改的状况。
所以 CopyOnWriteStateMap 把 get 操作跟 put 操作同等对待,无论是 get 还是 put 都须要将 Entry 及其之前的 Entry copy 一份。
■ 5.2.6 remove 数据的场景
须要辨别两种 case:remove 的 Entry 是链表头节点;remove 的 Entry 不是链表头节点。
Case1:remove 的 Entry 是链表头节点的场景比较简单,将桶间接指向 Entry a 的 next Entry b 即可。
Case 2:remove 的 Entry 不是链表头节点,须要将 Entry b 之前的所有 Entry 拷贝一份(新插入的 Entry 不须要拷贝),且 Entry b 前一个节点的正本间接指向 Entry b 的下一个节点。具体为什么 Entry a 须要拷贝一份与 put 和 get 操作相似,因为 Entry a 的 next 指针没方法指向两个节点,所以 primaryTable 和 snapshotTable 要有各自的头结点。
■ 5.2.7 COW 原理小结
上述 case 根本笼罩到了各种场景,这里做一个总结:
- 插入新的 Entry 应用头插法插入到链表中
- 假如要批改 Entry b,那么要将 Entry b 以及链表中 Entry b 之前的 Entry 必须全副 copy 一份(新插入的数据不须要拷贝),Entry b 之后的 Entry 能够共享
- 拜访 Entry b 的场景与批改 Entry b 的场景相似
- 如果批改或拜访的数据是 copy 后的数据,那么实际上不须要再 copy 了,因为 copy 后的数据曾经保障是 primaryTable 独占的数据,不与 Snapshot 共享
-
remove 数据的场景,分为两种 case:
- 如果 remove 的 Entry 是链表头节点,将桶间接指向头结点的 next 节点即可。
- 如果 remove 的 Entry 不是链表头节点,须要将指标 Entry 之前的所有 Entry 拷贝一份,且指标 Entry 前一个节点的正本间接指向指标 Entry 的下一个节点。当然如果前继节点曾经是新版本了,则不须要拷贝,间接批改前继 Entry 的 next 指针即可。
5.3 CopyOnWriteStateMap 各种操作源码详解
■ 5.3.1 CopyOnWriteStateMap 介绍
CopyOnWriteStateMap 类用于存储数据,反对了 CopyOnWrite 的性能,先介绍 CopyOnWriteStateMap 中一些绝对重要的字段,相干源码如下所示(重点看一下每个字段的正文):
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
// 默认容量 128,即:hash 表中桶的个数默认 128
public static final int DEFAULT_CAPACITY = 128;
// hash 扩容迁徙数据时,每次起码要迁徙 4 条数据
private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
// State 的序列化器
protected final TypeSerializer<S> stateSerializer;
// 空表:提前创立好
private static final StateMapEntry<?, ?, ?>[] EMPTY_TABLE =
new StateMapEntry[MINIMUM_CAPACITY >>> 1];
// 以后 StateMap 的 version,每次创立一个 Snapshot 时,StateMap 的版本号加一
private int stateMapVersion;
// 所有 正在进行中的 snapshot 的 version
// 每次创立出一个 Snapshot 时,都须要将 Snapshot 的 version 保留到该 Set 中
private final TreeSet<Integer> snapshotVersions;
// 正在进行中的那些 snapshot 的最大版本号
// 这里保留的就是 TreeSet<Integer> snapshotVersions 中最大的版本号
private int highestRequiredSnapshotVersion;
// 主表:用于存储数据的 table
private StateMapEntry<K, N, S>[] primaryTable;
// 扩容时的新表,扩容期间数组长度为 primaryTable 的 2 倍。// 非扩容期间为 空表
private StateMapEntry<K, N, S>[] incrementalRehashTable;
// primaryTable 中元素个数
private int primaryTableSize;
// incrementalRehashTable 中元素个数
private int incrementalRehashTableSize;
// primary table 中增量 rehash 要迁徙的下一个 index
// 即:primaryTable 中 rehashIndex 之前的数据全副搬移实现
private int rehashIndex;
// 扩容阈值,与 HashMap 相似,当元素个数大于 threshold 时,就会开始扩容。// 默认 threshold 为 StateMap 容量 * 0.75
private int threshold;
// 用于记录元素批改的次数,遍历迭代过程中,发现 modCount 批改了,则抛异样
private int modCount;
}
其中 primaryTable 字段是真正存储数据的 hash 表,primaryTable 是 StateMapEntry 类型的数据,StateMapEntry 用于存储 StateMap 中的一条数据,上面介绍 StateMapEntry。
■ 5.3.2 StateMapEntry
StateMapEntry 是 CopyOnWriteStateMap 中真正存储数据的实体。在 Java 的 HashMap 中也是将数据封装在 Entry 中,HashMap 的 Entry 源码如下所示:
static class Node<K,V> implements Map.Entry<K,V> {
// 以后 key 对应的 hash 值
final int hash;
final K key;
V value;
// next 指向以后桶中下一个 Node
Node<K,V> next;
}
HashMap 中的动态外部类 Node 实现 Map.Entry,类中有四个字段:hash、key、value、next。key 和 value 不同解释,hash 示意以后 key 对应的 hash 值,next 指向以后桶中下一个 Node。
HashMap 在 get(key) 查找数据流程:
- 依据 key 计算 hash 值,定位到具体的桶
- 遍历以后桶的一个个 Entry,先比拟 hash 值是否雷同,在比拟 key 是否雷同 (应用 equals 判断 key 是否雷同)
- 如果 hash 值和 key 的 equals 办法都能匹配,示意找到了对应的 Entry,返回 Entry 中的 value 即可
StateMapEntry 源码如下所示:
protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
final K key;
final N namespace;
S state;
final int hash;
StateMapEntry<K, N, S> next;
// new entry 时的版本号
int entryVersion;
// state(数据)更新时的 版本号
int stateVersion;
}
StateMapEntry 与 HashMap 的 Entry 类似度较高,其余 key、hash、next 这三个属性完全相同,StateMapEntry 中的 state 示意 HashMap 中的 value,即:具体存储的数据。
StateMapEntry 相比 HashMap 的 Entry,多了三个字段:
- namespace:namespace 是 Flink 中的概念,用于辨别不同的 Window,在 StateMapEntry 中 key 和 namespace 组合起来作为独特的主键,state 作为 value
- entryVersion:示意创立 entry 时的版本号
- stateVersion:示意以后 StateMapEntry 中 state(数据)更新时的版本号
因为 key 和 namespace 独特作为主键,因而在 CopyOnWriteStateMap 的 get 或 put 操作中,判断是否找到了匹配的 Entry,不仅要判断 hash 值,还要通过 equals 办法对 key 和 namespace 进行判断。三个参数都校验通过能力示意找到了相应的 Entry。这一点是与 HashMap 区别较大的,要留神了解。
■ 5.3.3 插入新数据源码流程
CopyOnWriteStateMap 类的 put 办法如下所示:
public void put(K key, N namespace, S value) {
// putEntry 用于找到对应的 Entry,// 包含了批改数据或插入新数据的场景
final StateMapEntry<K, N, S> e = putEntry(key, namespace);
// 将 value set 到 Entry 中
e.state = value;
// state 更新了,所以要更新 stateVersion
e.stateVersion = stateMapVersion;
}
put 办法间接调用 putEntry 办法,putEntry 用于找到对应的 Entry,putEntry 包含了批改数据或插入新数据的场景。找到 Entry 后,将 value set 到 Entry 中。putEntry 办法源码如下所示:
private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
// 计算以后对应的 hash 值,抉择 primaryTable 或 incrementalRehashTable
final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
// 遍历以后桶中链表的一个个 Entry
for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
// 如果依据 key 和 namespace 找到了对应的 Entry,则认为是批改数据
// 一般的 HashMap 构造有一个 Key,而这里 key 和 namespace 的组合当做 key
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// 批改数据逻辑(临时疏忽)if (e.entryVersion < highestRequiredSnapshotVersion) {e = handleChainedEntryCopyOnWrite(tab, index, e);
}
// 批改数据,间接返回对应的 Entry
return e;
}
}
// 代码走到这里,阐明原始的链表中没找到对应 Entry,即:插入新数据的逻辑
++modCount;
if (size() > threshold) {doubleCapacity();
}
// 链中没有找到 key 和 namespace 的数据
return addNewStateMapEntry(tab, key, namespace, hash);
}
putEntry 办法首先会计算以后 key 和 namespace 对应的 hash 值,应用 selectActiveTable 抉择应用 primaryTable 或 incrementalRehashTable,而后计算以后元素对应桶的 index。
这里留神,一般的 HashMap 构造有一个 Key 一个 value。而这里 key 和 namespace 的组合当做 Map 的 key,value 依然是原来的 value。
遍历以后桶中链表的一个个 Entry,如果通过 hash 值、key 和 namespace 的 equals 办法进行匹配,如果匹配胜利,示意找到了对应的 Entry,则认为是批改数据。
如果遍历完以后桶中链表的所有元素还没找到匹配的 Entry,阐明是插入一条新数据,则执行 addNewStateMapEntry 办法往链表头部插入一个新的 Entry 返回(头插法)。
■ 5.3.4 批改数据源码流程
在 putEntry 中,批改数据场景的源码如下所示:
// 如果依据 key 和 namespace 找到了对应的 Entry,则认为是批改数据
// 一般的 HashMap 构造有一个 Key,而这里 key 和 namespace 的组合当做 key
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// entryVersion 示意 entry 创立时的版本号
// highestRequiredSnapshotVersion 示意 正在进行中的那些 snapshot 的最大版本号
// entryVersion 小于 highestRequiredSnapshotVersion,阐明 Entry 的版本小于以后某些 Snapshot 的版本号,// 即:以后 Entry 是旧版本的数据,以后 Entry 被其余 snapshot 持有。// 为了保障 Snapshot 的数据正确性,这里必须为 e 创立新的正本,且 e 之前的某些元素也须要 copy 正本
// handleChainedEntryCopyOnWrite 办法将会进行相应的 copy 操作,并返回 e 的新正本
// 而后将返回 handleChainedEntryCopyOnWrite 办法返回的 e 的正本返回给下层,进行数据的批改操作。if (e.entryVersion < highestRequiredSnapshotVersion) {e = handleChainedEntryCopyOnWrite(tab, index, e);
}
// 反之,entryVersion >= highestRequiredSnapshotVersion
// 阐明以后 Entry 创立时的版本比所有 Snapshot 的版本高
// 即:以后 Entry 是新版本的数据,不被任何 Snapshot 持有
// 注:Snapshot 不可能援用高版本的数据
// 此时,e 是新的 Entry,不存在共享问题,所以间接批改以后 Entry 即可,所以返回以后 e
return e;
}
这里是上一部分插入新数据的局部源码,当初重点讲述批改数据的过程。如果依据 key 和 namespace 找到了相应的 Entry,则认为是对老数据的批改,走相应的批改逻辑。而后判断以后 Entry 的 entryVersion 是否小于 highestRequiredSnapshotVersion。
entryVersion 示意 entry 创立时的版本号,highestRequiredSnapshotVersion 示意正在进行中的那些 snapshot 的最大版本号。
- entryVersion 小于 highestRequiredSnapshotVersion,阐明 Entry 创立时的版本小于以后某些 Snapshot 的版本号,即:以后 Entry 是旧版本的数据,以后 Entry 被其余 Snapshot 持有。为了保障 Snapshot 的数据正确性,这里必须为 e 创立新的正本,且 e 之前的某些元素也须要 copy 正本,handleChainedEntryCopyOnWrite 办法将会进行相应的 copy 操作,并返回 e 的新正本。最初将 e 的正本返回给下层,进行数据的批改操作。
- 反之,entryVersion >= highestRequiredSnapshotVersion,阐明以后 Entry 创立时的版本比所有 Snapshot 的版本高。Snapshot 不可能援用高版本的数据,所以以后 Entry 是新版本的数据不被任何 Snapshot 持有。此时 e 是新的 Entry,不存在共享问题,所以间接批改以后 Entry 即可,所以返回以后 e。
handleChainedEntryCopyOnWrite 办法的作用:为 Entry e 创立新的正本,且链表中 Entry e 之前某些元素也须要 copy 正本,最初返回 e 的正本。
那哪些元素应该拷贝,哪些元素不应该拷贝呢?Snapshot 之后新创建的 Entry 就不须要再拷贝了,Snapshot 之前创立的 Entry 会被 Snapshot 援用所以须要再拷贝。
handleChainedEntryCopyOnWrite 的源码如下所示:
private StateMapEntry<K, N, S> handleChainedEntryCopyOnWrite(StateMapEntry<K, N, S>[] tab,
int mapIdx,
StateMapEntry<K, N, S> untilEntry) {
// current 指向以后桶的头结点
StateMapEntry<K, N, S> current = tab[mapIdx];
StateMapEntry<K, N, S> copy;
// 判断头结点创立时的版本是否低于 highestRequiredSnapshotVersion
// 如果低于,则 current 节点被 Snapshot 援用,所以须要 new 一个新的 Entry
if (current.entryVersion < highestRequiredSnapshotVersion) {copy = new StateMapEntry<>(current, stateMapVersion);
tab[mapIdx] = copy;
} else {copy = current;}
// 顺次遍历以后桶的元素,直到遍历到 untilEntry 节点,也就是咱们要批改的 Entry 节点
while (current != untilEntry) {
current = current.next;
// current 版本小于 highestRequiredSnapshotVersion,则须要拷贝,// 否则不必拷贝
if (current.entryVersion < highestRequiredSnapshotVersion) {
// entryVersion 示意创立 Entry 时的 version,// 所以新创建的 Entry 对应的 entryVersion 要更新为以后 StateMap 的 version
copy.next = new StateMapEntry<>(current, stateMapVersion);
copy = copy.next;
} else {copy = current;}
}
return copy;
}
从源码能够看到,,从头结点到要批改的 Entry 节点顺次遍历桶中元素,都是应用 current.entryVersion < highestRequiredSnapshotVersion 来判断以后节点的创立创立时的版本是否低于 highestRequiredSnapshotVersion。
- 如果低于则 current 节点被 Snapshot 援用,所以须要 new 一个新的 Entry,也就是所谓的拷贝一个正本。
- 否则不必拷贝。
在新创建 Entry 时,新 Entry 的 entryVersion 要更新为以后 StateMap 的 version,示意这是一个新版本的 Entry,并没有被 Snapshot 援用。这样之后再要批改该 Entry 时间接批改该 Entry 即可,不须要再拷贝一份正本了。
■ 5.3.5 拜访数据源码流程
CopyOnWriteStateMap 类的 get 办法与 putEntry 相似,都是顺次遍历相应桶的元素,直到依据 key 和 namespace 找到了相应的 Entry,则返回相应的 Entry。如果遍历完相应桶的所有 Entry,都没有与 key 和 namespace 相匹配的 Entry,则示意 StateMap 中没有指定的元素则返回 null。
如果找到了相应 Entry,为了保障 Snapshot 援用的数据不被批改,所以也要进行拷贝操作。除了拷贝其余源码比较简单与 putEntry 实现相似,所以重点剖析找到 Entry 后的相干源码。相干源码如下所示:
if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
// 一旦 get 以后数据,为了避免应用层批改数据外部的属性值,// 所以必须保障这是一个最新的 Entry,并更新其 stateVersion
// 首先查看以后的 State,也就是 value 值是否是旧版本数据,// 如果 value 是旧版本,则必须深拷贝一个 value
// 否则 value 是新版本,间接返回给应用层
if (e.stateVersion < requiredVersion) {
// 此时还有两种状况,// 1、如果以后 Entry 是旧版本的,则 Entry 也须要拷贝一份,// 依照之前剖析过的 handleChainedEntryCopyOnWrite 策略拷贝即可
// 2、以后 Entry 是新版本数据,则不须要拷贝,间接批改其 State 即可
if (e.entryVersion < requiredVersion) {e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
}
// 更新其 stateVersion
e.stateVersion = stateMapVersion;
// 通过序列化器,深拷贝一个数据
e.state = getStateSerializer().copy(e.state);
}
return e.state;
}
一旦 get 以后数据,为了避免应用层批改数据外部的属性值,所以必须保障这是一个最新的 Entry,并更新其 stateVersion。首先查看以后的 State,也就是 value 值是否是旧版本数据:
- 如果 value 是旧版本,则必须深拷贝一个 value
- 否则 value 是新版本,间接返回给应用层
如果 value 值是还辨别两种状况:
- 1、如果以后 Entry 是旧版本的,则 Entry 也须要拷贝一份,依照之前剖析过的 handleChainedEntryCopyOnWrite 策略拷贝即可
- 2、以后 Entry 是新版本数据,则不须要拷贝,间接批改其 State 即可
case 1 容易了解,如下图所示拜访 Entry b 就是 case 1 的场景,须要应用 handleChainedEntryCopyOnWrite 办法对 Entry b 和 a 进行拷贝操作,而后再对 Entry b 的 value 对象进行一次深拷贝,所以 Entry b 和 b copy 不会共享 data 对象。
尽管 Entry a 也拷贝了一份生成 Entry a copy,然而 Entry a 中的 value 对象并没有深拷贝一份,而是共享 data1 对象。get Entry b 后 Entry a 和 a copy 援用 data 1 的图示用下图会更形象一些,即:Entry a 和 a copy 的 state 会独特援用 data1 对象。对于批改 Entry a 如果下次再有 get 操作,就会对应上述的 case 2 场景:stateVersion 是老版本,然而 Entry a copy 属于新版本。此时不须要再对 Entry 进行复制操作,只须要对 State 进行一次深拷贝,保障不会将 Entry a 的 State 返回给应用层。
■ 5.3.6 remove 数据源码流程
removeEntry 源码如下所示:
private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
for (StateMapEntry<K, N, S> e = tab[index], prev = null;
e != null; prev = e, e = e.next) {if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// 如果要删除的 Entry 不存在前继节点,阐明要删除的 Entry 是头结点,// 间接将桶间接指向头结点的 next 节点即可。if (prev == null) {tab[index] = e.next;
} else {
// 如果 remove 的 Entry 不是链表头节点,须要将指标 Entry 之前的所有 Entry 拷贝一份,// 且指标 Entry 前一个节点的正本间接指向指标 Entry 的下一个节点。// 当然如果前继节点曾经是新版本了,则不须要拷贝,间接批改前继 Entry 的 next 指针即可。// copy-on-write check for entry
if (prev.entryVersion < highestRequiredSnapshotVersion) {prev = handleChainedEntryCopyOnWrite(tab, index, prev);
}
prev.next = e.next;
}
// 批改一些计数器
++modCount;
if (tab == primaryTable) {--primaryTableSize;} else {--incrementalRehashTableSize;}
return e;
}
}
return null;
}
remove 数据的场景,分为两种 case:
- 如果 remove 的 Entry 是链表头节点,将桶间接指向头结点的 next 节点即可。
- 如果 remove 的 Entry 不是链表头节点,须要将指标 Entry 之前的所有 Entry 拷贝一份,且指标 Entry 前一个节点的正本间接指向指标 Entry 的下一个节点。当然如果前继节点曾经是新版本了,则不须要拷贝,间接批改前继 Entry 的 next 指针即可。
源码比拟清晰加上曾经详细分析了 put 和 get 源码,所以 remove 源码间接联合原理看正文即可。
6.Snapshot 流程及实现后的 release 操作
后面曾经剖析了 CopyOnWriteStateMap 的扩容 rehash 原理和源码、Snapshot 时浅拷贝原理和源码以及 CopyOnWrite 实现的原理和源码。
CopyOnWrite 的实现次要为了缩小 Checkpoint 同步阶段的进展工夫,将数据的快照过程尽量放到异步流程。上面剖析 Snapshot 异步快照流程及 Snapshot 实现后 release 相干操作。
HeapSnapshotStrategy 类的 AsyncSnapshotCallable 匿名外部类的 callInternal 办法中会调用 AbstractStateTableSnapshot 的 writeStateInKeyGroup 办法,并顺次将每个 KeyGroupId 当做参数传入。
writeStateInKeyGroup 办法源码如下所示:
public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) {
// 获取 KeyGroupId 对应的 CopyOnWriteStateMapSnapshot
StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> stateMapSnapshot =
getStateMapSnapshotForKeyGroup(keyGroupId);
// 将 stateMapSnapshot 中的 State 数据进行序列化输入
stateMapSnapshot.writeState(localKeySerializer, localNamespaceSerializer,
localStateSerializer, dov, stateSnapshotTransformer);
// stateMapSnapshot 对应的数据曾经遍历完了,所以能够开释该快照
stateMapSnapshot.release();}
writeStateInKeyGroup 办法拿到 KeyGroupId 对应的 CopyOnWriteStateMapSnapshot,而后将 stateMapSnapshot 中的 State 数据进行序列化输入,这一步就会顺次遍历 stateMapSnapshot 所有援用的数据序列化输入到内部存储中。序列化实现就能够开释该快照了。
release 最初会调用 CopyOnWriteStateMap 的 releaseSnapshot 办法,releaseSnapshot 办法源码如下所示:
void releaseSnapshot(int snapshotVersion) {synchronized (snapshotVersions) {
// 将 相应的 snapshotVersion 从 snapshotVersions 中 remove
snapshotVersions.remove(snapshotVersion);
// 将 snapshotVersions 的最大值更新到 highestRequiredSnapshotVersion,// 如果 snapshotVersions 为空,则 highestRequiredSnapshotVersion 更新为 0
highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ?
0 : snapshotVersions.last();}
}
releaseSnapshot 办法将相应的 snapshotVersion 从 snapshotVersions 中 remove,并将 snapshotVersions 的最大值更新到 highestRequiredSnapshotVersion,如果 snapshotVersions 为空,则 highestRequiredSnapshotVersion 更新为 0。
有个小疑难:依据之前的流程剖析,Snapshot 过程中如果 Flink 应用层产生了大量 get 和 put 操作,那么很多 Entry 和 State 都会呈现多个正本。Snapshot 完结后,就应该把那些旧版本的数据清理掉。可是没有看到对旧版本数据进行清理操作呢?
如上图所示,Entry b 和 a 都存在正本,当 Snapshot 完结后,因为新数据在 Entry a copy 和 b copy 中,所以 Entry a 和 b 都应该被清理掉,留着 Entry a copy 和 b copy 即可。然而代码中没有看到去清理 Entry a 和 b。那么会不会呈现内存透露的问题呢?
其实并不会,Snapshot 完结后 snapshotData 对应的 hash 表不会再被异步快照的线程援用,所以 Entry a 和 b 就会变成不可达对象,会被 JVM 的 GC 回收掉。
7. 总结
本文具体介绍了 CopyOnWriteStateTable 的设计原理及相干源码,次要从 rehash 和 CopyOnWrite 两个点进行深刻分析,心愿对大家能有所帮忙。
本文波及的 github 仓库,都在 feature/source-code-read-1-9-0 分支,之后也会继续更新:
https://github.com/1996fanrui…