A Look at Flink's DualKeyMap


This post takes a look at Flink's DualKeyMap.
Example
@Test
public void testKeySets() {
    final Random random = new Random();
    final int capacity = 10;
    final Set<Tuple2<Integer, Integer>> keys = new HashSet<>(capacity);

    for (int i = 0; i < capacity; i++) {
        int keyA = random.nextInt();
        int keyB = random.nextInt();
        keys.add(Tuple2.of(keyA, keyB));
    }

    final DualKeyMap<Integer, Integer, String> dualKeyMap = new DualKeyMap<>(capacity);

    for (Tuple2<Integer, Integer> key : keys) {
        dualKeyMap.put(key.f0, key.f1, "foobar");
    }

    assertThat(dualKeyMap.keySetA(), Matchers.equalTo(keys.stream().map(t -> t.f0).collect(Collectors.toSet())));
    assertThat(dualKeyMap.keySetB(), Matchers.equalTo(keys.stream().map(t -> t.f1).collect(Collectors.toSet())));
}
A DualKeyMap has two keys: every put must supply both keyA and keyB.
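To make the two-key idea concrete, here is a minimal sketch that uses only JDK collections. The class name `TwoKeyMapSketch` and its method names are hypothetical and chosen for illustration; they are not Flink's API, and `Map.Entry` stands in for Flink's `Tuple2`. It only shows that a value stored under a pair of keys can be reached through either key.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a two-key map; not Flink's class. */
class TwoKeyMapSketch<A, B, V> {

    // Primary map: keyA -> (keyB, value).
    private final Map<A, Map.Entry<B, V>> aMap = new HashMap<>();
    // Secondary map: keyB -> keyA, so a lookup by keyB takes two hops.
    private final Map<B, A> bMap = new HashMap<>();

    /** Stores the value under both keys and returns the value previously stored under aKey, if any. */
    V put(A aKey, B bKey, V value) {
        Map.Entry<B, V> old = aMap.put(aKey, new SimpleImmutableEntry<>(bKey, value));
        bMap.put(bKey, aKey);
        return old == null ? null : old.getValue();
    }

    V getByKeyA(A aKey) {
        Map.Entry<B, V> entry = aMap.get(aKey);
        return entry == null ? null : entry.getValue();
    }

    V getByKeyB(B bKey) {
        // Resolve keyB to keyA first, then read the value from the primary map.
        A aKey = bMap.get(bKey);
        return aKey == null ? null : getByKeyA(aKey);
    }

    V removeByKeyB(B bKey) {
        A aKey = bMap.remove(bKey);
        if (aKey == null) {
            return null;
        }
        Map.Entry<B, V> removed = aMap.remove(aKey);
        return removed == null ? null : removed.getValue();
    }

    int size() {
        return aMap.size();
    }

    public static void main(String[] args) {
        TwoKeyMapSketch<String, Integer, String> map = new TwoKeyMapSketch<>();
        map.put("slot-a", 1, "foobar");
        System.out.println(map.getByKeyA("slot-a")); // prints foobar
        System.out.println(map.getByKeyB(1));        // prints foobar
        map.removeByKeyB(1);
        System.out.println(map.size());              // prints 0
    }
}
```

Only the primary map holds values; the secondary map is a pure index from keyB back to keyA, which is why removing by keyB has to touch both maps.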
DualKeyMap
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
public class DualKeyMap<A, B, V> {

    private final HashMap<A, Tuple2<B, V>> aMap;

    private final HashMap<B, A> bMap;

    private transient Collection<V> values;

    public DualKeyMap(int initialCapacity) {
        this.aMap = new HashMap<>(initialCapacity);
        this.bMap = new HashMap<>(initialCapacity);
    }

    public int size() {
        return aMap.size();
    }

    public V getKeyA(A aKey) {
        final Tuple2<B, V> value = aMap.get(aKey);

        if (value != null) {
            return value.f1;
        } else {
            return null;
        }
    }

    public V getKeyB(B bKey) {
        final A aKey = bMap.get(bKey);

        if (aKey != null) {
            return aMap.get(aKey).f1;
        } else {
            return null;
        }
    }

    public V put(A aKey, B bKey, V value) {
        Tuple2<B, V> aValue = aMap.put(aKey, Tuple2.of(bKey, value));
        bMap.put(bKey, aKey);

        if (aValue != null) {
            return aValue.f1;
        } else {
            return null;
        }
    }

    public boolean containsKeyA(A aKey) {
        return aMap.containsKey(aKey);
    }

    public boolean containsKeyB(B bKey) {
        return bMap.containsKey(bKey);
    }

    public V removeKeyA(A aKey) {
        Tuple2<B, V> aValue = aMap.remove(aKey);

        if (aValue != null) {
            bMap.remove(aValue.f0);
            return aValue.f1;
        } else {
            return null;
        }
    }

    public V removeKeyB(B bKey) {
        A aKey = bMap.remove(bKey);

        if (aKey != null) {
            Tuple2<B, V> aValue = aMap.remove(aKey);
            if (aValue != null) {
                return aValue.f1;
            } else {
                return null;
            }
        } else {
            return null;
        }
    }

    public Collection<V> values() {
        Collection<V> vs = values;

        if (vs == null) {
            vs = new Values();
            values = vs;
        }

        return vs;
    }

    public Set<A> keySetA() {
        return aMap.keySet();
    }

    public Set<B> keySetB() {
        return bMap.keySet();
    }

    public void clear() {
        aMap.clear();
        bMap.clear();
    }

    // -----------------------------------------------------------------------
    // Inner classes
    // -----------------------------------------------------------------------

    /**
     * Collection which contains the values of the dual key map.
     */
    private final class Values extends AbstractCollection<V> {

        @Override
        public Iterator<V> iterator() {
            return new ValueIterator();
        }

        @Override
        public int size() {
            return aMap.size();
        }
    }

    /**
     * Iterator which iterates over the values of the dual key map.
     */
    private final class ValueIterator implements Iterator<V> {

        private final Iterator<Tuple2<B, V>> iterator = aMap.values().iterator();

        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public V next() {
            Tuple2<B, V> value = iterator.next();

            return value.f1;
        }
    }
}

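DualKeyMap declares three type parameters, A, B and V, for keyA, keyB and the value. It maintains two HashMaps: aMap maps keyA to a Tuple2<B, V>, and bMap maps keyB to keyA.
DualKeyMap provides the methods getKeyA, getKeyB, containsKeyA, containsKeyB, removeKeyA, removeKeyB, keySetA, keySetB, size, put, values and clear.
The values method returns a Values instance (created on first use and cached in the values field). Values extends AbstractCollection, and its iterator method returns a ValueIterator; ValueIterator implements the Iterator interface and delegates to aMap.values().iterator().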
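The values view described above can be sketched with JDK classes alone. This is an illustrative approximation, not Flink's code: `ValuesViewSketch` and `valuesView` are hypothetical names, and `Map.Entry` again stands in for `Tuple2`. The point is that the collection copies nothing; it reads through to the primary map each time it is iterated.

```java
import java.util.AbstractCollection;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch of a values view over a map of (keyB, value) pairs. */
class ValuesViewSketch {

    /** Returns a view exposing only the value half of each stored pair. */
    static <A, B, V> Collection<V> valuesView(Map<A, Map.Entry<B, V>> aMap) {
        return new AbstractCollection<V>() {
            @Override
            public Iterator<V> iterator() {
                // A fresh iterator over the backing map on every call.
                Iterator<Map.Entry<B, V>> pairs = aMap.values().iterator();
                return new Iterator<V>() {
                    @Override
                    public boolean hasNext() {
                        return pairs.hasNext();
                    }

                    @Override
                    public V next() {
                        // Unwrap the pair and hand out only the value.
                        return pairs.next().getValue();
                    }
                };
            }

            @Override
            public int size() {
                return aMap.size();
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Map.Entry<Integer, String>> aMap = new HashMap<>();
        Collection<String> values = valuesView(aMap);

        // The view was created while the map was empty, yet it sees later puts.
        aMap.put("a", new SimpleImmutableEntry<>(1, "foo"));
        System.out.println(values.size());          // prints 1
        System.out.println(values.contains("foo")); // prints true
    }
}
```

Because only iterator and size are overridden, the inherited AbstractCollection.add throws UnsupportedOperationException, and so does the default Iterator.remove, so a view built this way is effectively read-only.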

Summary

DualKeyMap declares three type parameters, A, B and V, for keyA, keyB and the value. It maintains two HashMaps: aMap maps keyA to a Tuple2<B, V>, and bMap maps keyB to keyA.
DualKeyMap provides the methods getKeyA, getKeyB, containsKeyA, containsKeyB, removeKeyA, removeKeyB, keySetA, keySetB, size, put, values and clear; every put must supply both keyA and keyB.
The values method returns a Values instance, which extends AbstractCollection and whose iterator method returns a ValueIterator; ValueIterator implements the Iterator interface and delegates to aMap.values().iterator().

doc
DualKeyMap
