
A Look at Flink's DualKeyMap


This article takes a look at Flink's DualKeyMap.
Example
@Test
public void testKeySets() {
    final Random random = new Random();
    final int capacity = 10;
    final Set<Tuple2<Integer, Integer>> keys = new HashSet<>(capacity);

    for (int i = 0; i < capacity; i++) {
        int keyA = random.nextInt();
        int keyB = random.nextInt();
        keys.add(Tuple2.of(keyA, keyB));
    }

    final DualKeyMap<Integer, Integer, String> dualKeyMap = new DualKeyMap<>(capacity);

    for (Tuple2<Integer, Integer> key : keys) {
        dualKeyMap.put(key.f0, key.f1, "foobar");
    }

    assertThat(dualKeyMap.keySetA(), Matchers.equalTo(keys.stream().map(t -> t.f0).collect(Collectors.toSet())));
    assertThat(dualKeyMap.keySetB(), Matchers.equalTo(keys.stream().map(t -> t.f1).collect(Collectors.toSet())));
}
DualKeyMap addresses each value by two keys: every put must supply both keyA and keyB.
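To make the two-key idea concrete without needing Flink on the classpath, here is a minimal standalone sketch. The class name TwoKeyMapSketch is hypothetical, and it uses a java.util entry where Flink uses Tuple2<B, V>; it is an illustration under those assumptions, not Flink's implementation.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch, NOT Flink's DualKeyMap: it only imitates
// the idea of storing one value under two keys.
final class TwoKeyMapSketch<A, B, V> {

    // keyA -> (keyB, value); the entry stands in for Flink's Tuple2<B, V>
    private final Map<A, Map.Entry<B, V>> aMap = new HashMap<>();

    // keyB -> keyA, so a lookup by keyB is resolved through aMap
    private final Map<B, A> bMap = new HashMap<>();

    // Stores the value under both keys and returns the value previously
    // stored under aKey, or null if there was none.
    V put(A aKey, B bKey, V value) {
        Map.Entry<B, V> previous = aMap.put(aKey, new SimpleImmutableEntry<>(bKey, value));
        bMap.put(bKey, aKey);
        return previous != null ? previous.getValue() : null;
    }

    V getKeyA(A aKey) {
        Map.Entry<B, V> entry = aMap.get(aKey);
        return entry != null ? entry.getValue() : null;
    }

    V getKeyB(B bKey) {
        A aKey = bMap.get(bKey);
        return aKey != null ? getKeyA(aKey) : null;
    }

    public static void main(String[] args) {
        TwoKeyMapSketch<String, Integer, String> map = new TwoKeyMapSketch<>();
        map.put("a1", 1, "foobar");
        System.out.println(map.getKeyA("a1")); // prints "foobar"
        System.out.println(map.getKeyB(1));    // prints "foobar"
    }
}
```

Either key reaches the same value, which is the property the test above relies on when it checks keySetA and keySetB separately.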
DualKeyMap
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
public class DualKeyMap<A, B, V> {

    private final HashMap<A, Tuple2<B, V>> aMap;

    private final HashMap<B, A> bMap;

    private transient Collection<V> values;

    public DualKeyMap(int initialCapacity) {
        this.aMap = new HashMap<>(initialCapacity);
        this.bMap = new HashMap<>(initialCapacity);
    }

    public int size() {
        return aMap.size();
    }

    public V getKeyA(A aKey) {
        final Tuple2<B, V> value = aMap.get(aKey);

        if (value != null) {
            return value.f1;
        } else {
            return null;
        }
    }

    public V getKeyB(B bKey) {
        final A aKey = bMap.get(bKey);

        if (aKey != null) {
            return aMap.get(aKey).f1;
        } else {
            return null;
        }
    }

    public V put(A aKey, B bKey, V value) {
        Tuple2<B, V> aValue = aMap.put(aKey, Tuple2.of(bKey, value));
        bMap.put(bKey, aKey);

        if (aValue != null) {
            return aValue.f1;
        } else {
            return null;
        }
    }

    public boolean containsKeyA(A aKey) {
        return aMap.containsKey(aKey);
    }

    public boolean containsKeyB(B bKey) {
        return bMap.containsKey(bKey);
    }

    public V removeKeyA(A aKey) {
        Tuple2<B, V> aValue = aMap.remove(aKey);

        if (aValue != null) {
            bMap.remove(aValue.f0);
            return aValue.f1;
        } else {
            return null;
        }
    }

    public V removeKeyB(B bKey) {
        A aKey = bMap.remove(bKey);

        if (aKey != null) {
            Tuple2<B, V> aValue = aMap.remove(aKey);
            if (aValue != null) {
                return aValue.f1;
            } else {
                return null;
            }
        } else {
            return null;
        }
    }

    public Collection<V> values() {
        Collection<V> vs = values;

        if (vs == null) {
            vs = new Values();
            values = vs;
        }

        return vs;
    }

    public Set<A> keySetA() {
        return aMap.keySet();
    }

    public Set<B> keySetB() {
        return bMap.keySet();
    }

    public void clear() {
        aMap.clear();
        bMap.clear();
    }

    // -----------------------------------------------------------------------
    // Inner classes
    // -----------------------------------------------------------------------

    /**
     * Collection which contains the values of the dual key map.
     */
    private final class Values extends AbstractCollection<V> {

        @Override
        public Iterator<V> iterator() {
            return new ValueIterator();
        }

        @Override
        public int size() {
            return aMap.size();
        }
    }

    /**
     * Iterator which iterates over the values of the dual key map.
     */
    private final class ValueIterator implements Iterator<V> {

        private final Iterator<Tuple2<B, V>> iterator = aMap.values().iterator();

        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public V next() {
            Tuple2<B, V> value = iterator.next();

            return value.f1;
        }
    }
}

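DualKeyMap declares three type parameters, A, B, and V, for keyA, keyB, and the value. It maintains two HashMaps: aMap maps keyA to a Tuple2<B, V>, and bMap maps keyB to keyA, so a lookup by keyB first resolves keyA and then reads aMap.
DualKeyMap provides the methods getKeyA, getKeyB, containsKeyA, containsKeyB, removeKeyA, removeKeyB, keySetA, keySetB, size, put, values, and clear.
The values method returns a Values instance, created lazily and cached in the values field. Values extends AbstractCollection, and its iterator method returns a ValueIterator; ValueIterator implements the Iterator interface and delegates to aMap.values().iterator(), returning the f1 field of each tuple.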
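The values view described above can be sketched as follows. This is a hypothetical standalone class (ValuesViewSketch is not a Flink name) that swaps Tuple2 for a java.util entry and, as a simplification, builds a new view on each call instead of caching it. It is meant only to show that the collection is a live view over aMap rather than a copy.

```java
import java.util.AbstractCollection;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical standalone sketch, NOT Flink's class: a values() view backed
// by the keyA map, in the style of the Values/ValueIterator inner classes.
final class ValuesViewSketch<A, B, V> {

    private final Map<A, Map.Entry<B, V>> aMap = new HashMap<>();
    private final Map<B, A> bMap = new HashMap<>();

    void put(A aKey, B bKey, V value) {
        aMap.put(aKey, new SimpleImmutableEntry<>(bKey, value));
        bMap.put(bKey, aKey);
    }

    // Removes by keyB: resolve keyA through bMap, then drop the aMap entry.
    V removeKeyB(B bKey) {
        A aKey = bMap.remove(bKey);
        if (aKey == null) {
            return null;
        }
        Map.Entry<B, V> removed = aMap.remove(aKey);
        return removed != null ? removed.getValue() : null;
    }

    // No copy is made: the view reads aMap every time it is iterated.
    Collection<V> values() {
        return new AbstractCollection<V>() {
            @Override
            public Iterator<V> iterator() {
                final Iterator<Map.Entry<B, V>> it = aMap.values().iterator();
                return new Iterator<V>() {
                    @Override
                    public boolean hasNext() {
                        return it.hasNext();
                    }

                    @Override
                    public V next() {
                        // project the (keyB, value) pair down to the value
                        return it.next().getValue();
                    }
                };
            }

            @Override
            public int size() {
                return aMap.size();
            }
        };
    }

    public static void main(String[] args) {
        ValuesViewSketch<String, Integer, String> map = new ValuesViewSketch<>();
        Collection<String> view = map.values(); // obtained before any put
        map.put("a1", 1, "first");
        map.put("a2", 2, "second");
        System.out.println(view.size());            // prints 2
        map.removeKeyB(1);
        System.out.println(view.contains("first")); // prints false
    }
}
```

Because the iterator does not override remove(), removing an element through the view's iterator throws UnsupportedOperationException (the Iterator default); entries are meant to be removed through the map's own remove methods.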

Summary

DualKeyMap declares three type parameters, A, B, and V, for keyA, keyB, and the value. It maintains two HashMaps: aMap maps keyA to a Tuple2<B, V>, and bMap maps keyB to keyA.
DualKeyMap provides the methods getKeyA, getKeyB, containsKeyA, containsKeyB, removeKeyA, removeKeyB, keySetA, keySetB, size, put, values, and clear; a put must supply both keyA and keyB.
The values method returns a Values instance, which extends AbstractCollection and whose iterator method returns a ValueIterator; ValueIterator implements the Iterator interface and delegates to aMap.values().iterator().

doc
DualKeyMap
