A Look at Storm's PartialKeyGrouping


This article takes a look at Storm's PartialKeyGrouping.
Example
@Test
public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    String spoutId = "wordGenerator";
    String counterId = "counter";
    String aggId = "aggregator";
    String intermediateRankerId = "intermediateRanker";
    String totalRankerId = "finalRanker";
    int TOP_N = 5;

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(spoutId, new TestWordSpout(), 5);
    // NOTE: partialKeyGrouping replaces fieldsGrouping here to spread the load more evenly across the count bolt tasks
    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
    submitRemote(builder);
}
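Note that because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task, so a RollingCountAggBolt subscribed via fieldsGrouping is still needed to merge the partial counts.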
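To make the need for the extra aggregation stage concrete, here is a minimal sketch in plain Java, without Storm. The class and method names (`PartialCountMergeSketch`, `mergePartialCounts`) are hypothetical, and the two maps merely stand in for the local state of two counter tasks.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Storm code: each map is the local state of one counter task.
class PartialCountMergeSketch {

    // Sum the partial counts of every task per word, as an aggregating bolt fed by fieldsGrouping would.
    static Map<String, Long> mergePartialCounts(List<Map<String, Long>> perTaskCounts) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> taskCounts : perTaskCounts) {
            taskCounts.forEach((word, count) -> merged.merge(word, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        // With partialKeyGrouping the word "storm" may be counted on two different tasks.
        Map<String, Long> task0 = Map.of("storm", 3L, "bolt", 1L);
        Map<String, Long> task1 = Map.of("storm", 2L);
        System.out.println(mergePartialCounts(List.of(task0, task1)));
    }
}
```

Neither task alone knows the total for "storm"; only the merged view does, which is the role RollingCountAggBolt plays in the topology.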
PartialKeyGrouping (version 1.2.2)
storm-core-1.2.2-sources.jar!/org/apache/storm/grouping/PartialKeyGrouping.java
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -447379837314000353L;
    private List<Integer> targetTasks;
    private long[] targetTaskStats;
    private HashFunction h1 = Hashing.murmur3_128(13);
    private HashFunction h2 = Hashing.murmur3_128(17);
    private Fields fields = null;
    private Fields outFields = null;

    public PartialKeyGrouping() {
        //Empty
    }

    public PartialKeyGrouping(Fields fields) {
        this.fields = fields;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        targetTaskStats = new long[this.targetTasks.size()];
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            byte[] raw;
            if (fields != null) {
                List<Object> selectedFields = outFields.select(fields, values);
                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                for (Object o: selectedFields) {
                    if (o instanceof List) {
                        out.putInt(Arrays.deepHashCode(((List)o).toArray()));
                    } else if (o instanceof Object[]) {
                        out.putInt(Arrays.deepHashCode((Object[])o));
                    } else if (o instanceof byte[]) {
                        out.putInt(Arrays.hashCode((byte[]) o));
                    } else if (o instanceof short[]) {
                        out.putInt(Arrays.hashCode((short[]) o));
                    } else if (o instanceof int[]) {
                        out.putInt(Arrays.hashCode((int[]) o));
                    } else if (o instanceof long[]) {
                        out.putInt(Arrays.hashCode((long[]) o));
                    } else if (o instanceof char[]) {
                        out.putInt(Arrays.hashCode((char[]) o));
                    } else if (o instanceof float[]) {
                        out.putInt(Arrays.hashCode((float[]) o));
                    } else if (o instanceof double[]) {
                        out.putInt(Arrays.hashCode((double[]) o));
                    } else if (o instanceof boolean[]) {
                        out.putInt(Arrays.hashCode((boolean[]) o));
                    } else if (o != null) {
                        out.putInt(o.hashCode());
                    } else {
                        out.putInt(0);
                    }
                }
                raw = out.array();
            } else {
                raw = values.get(0).toString().getBytes(); // assume key is the first field
            }
            int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
            int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
            int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
            boltIds.add(targetTasks.get(selected));
            targetTaskStats[selected]++;
        }
        return boltIds;
    }
}

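As the code shows, PartialKeyGrouping is a CustomStreamGrouping. In prepare it initializes long[] targetTaskStats, which counts how many tuples this grouping instance has sent to each target task.
If no fields are specified, partialKeyGrouping uses the first value of the tuple as the key by default.
Two HashFunction instances are built with Hashing.murmur3_128 from the Guava library, using seeds 13 and 17. The absolute value of each hash modulo targetTasks.size() gives two candidate indexes into targetTasks.
The candidate with the smaller count in targetTaskStats is then selected, and its count is incremented.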
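The selection step can be sketched without Storm or Guava as follows. This is an illustration under stated assumptions: `TwoChoicesSketch`, `mix` and `choose` are hypothetical names, `mix` is a simple stand-in for Guava's murmur3_128 (not the same hash), and the index is derived with Math.floorMod rather than the Math.abs(...) % size expression used in 1.2.2.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of "two choices" routing; mix() is a stand-in for Guava's murmur3_128, not the real hash.
class TwoChoicesSketch {

    // Simple seeded 64-bit hash (FNV-style accumulation plus a final bit mix). Assumption for illustration only.
    static long mix(byte[] data, long seed) {
        long h = seed ^ 0xcbf29ce484222325L;
        for (byte b : data) {
            h = (h ^ (b & 0xff)) * 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        return h;
    }

    // Pick the less loaded of the two candidate indexes and record the choice.
    static int choose(String key, long[] load) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        int first = (int) Math.floorMod(mix(raw, 13), (long) load.length);
        int second = (int) Math.floorMod(mix(raw, 17), (long) load.length);
        int selected = load[first] > load[second] ? second : first;
        load[selected]++;
        return selected;
    }

    public static void main(String[] args) {
        long[] load = new long[4];
        for (int i = 0; i < 1000; i++) {
            choose("hot-key", load); // a single hot key, sent 1000 times
        }
        System.out.println(Arrays.toString(load));
    }
}
```

A single key never reaches more than two tasks. When its two candidate indexes differ, its tuples alternate between them; when both hashes land on the same index, the key behaves as it would under fieldsGrouping.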

PartialKeyGrouping (version 2.0.0)
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
/**
 * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
 * Tuples from a given partition to multiple downstream tasks.
 *
 * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
 * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
 *
 * Notes: - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible - the default
 * AssignmentCreator hashes the key and produces an assignment of two tasks
 */
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;

    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);

            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    /**
     * Extract the key from the input Tuple.
     */
    private byte[] getKeyBytes(List<Object> values) {
        byte[] raw;
        if (fields != null) {
            List<Object> selectedFields = outFields.select(fields, values);
            ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
            for (Object o : selectedFields) {
                if (o instanceof List) {
                    out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                } else if (o instanceof Object[]) {
                    out.putInt(Arrays.deepHashCode((Object[]) o));
                } else if (o instanceof byte[]) {
                    out.putInt(Arrays.hashCode((byte[]) o));
                } else if (o instanceof short[]) {
                    out.putInt(Arrays.hashCode((short[]) o));
                } else if (o instanceof int[]) {
                    out.putInt(Arrays.hashCode((int[]) o));
                } else if (o instanceof long[]) {
                    out.putInt(Arrays.hashCode((long[]) o));
                } else if (o instanceof char[]) {
                    out.putInt(Arrays.hashCode((char[]) o));
                } else if (o instanceof float[]) {
                    out.putInt(Arrays.hashCode((float[]) o));
                } else if (o instanceof double[]) {
                    out.putInt(Arrays.hashCode((double[]) o));
                } else if (o instanceof boolean[]) {
                    out.putInt(Arrays.hashCode((boolean[]) o));
                } else if (o != null) {
                    out.putInt(o.hashCode());
                } else {
                    out.putInt(0);
                }
            }
            raw = out.array();
        } else {
            raw = values.get(0).toString().getBytes(); // assume key is the first field
        }
        return raw;
    }

    //……
}
Version 2.0.0 moves this logic into RandomTwoTaskAssignmentCreator and BalancedTargetSelector.
RandomTwoTaskAssignmentCreator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
/**
 * This interface is responsible for choosing a subset of the target tasks to use for a given key.
 *
 * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
 * each of them needs to come up with the same assignment for a given key.
 */
public interface AssignmentCreator extends Serializable {
    int[] createAssignment(List<Integer> targetTasks, byte[] key);
}

/*========== Implementations ==========*/

/**
 * This implementation of AssignmentCreator chooses two arbitrary tasks.
 */
public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
    /**
     * Creates a two task assignment by selecting random tasks.
     */
    public int[] createAssignment(List<Integer> tasks, byte[] key) {
        // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
        final long seedForRandom = Arrays.hashCode(key);
        final Random random = new Random(seedForRandom);
        final int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // ensure that choice1 and choice2 are not the same task
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        return new int[]{ tasks.get(choice1), tasks.get(choice2) };
    }
}
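Version 2.0.0 no longer uses Guava's Hashing.murmur3_128. Instead it seeds a java.util.Random with the hash code of the key bytes and draws two indexes from it, moving the second one to the next index if it collides with the first. The two resulting task ids are handed to the TargetSelector, which picks one of them for load balancing.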
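The determinism that the AssignmentCreator Javadoc asks for follows from seeding Random with a value derived from the key. The stand-alone sketch below re-implements the same idea outside Storm; the class name `SeededAssignmentSketch` and the task ids are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Stand-alone re-implementation of the idea for illustration; not the Storm class itself.
class SeededAssignmentSketch {

    static int[] createAssignment(List<Integer> tasks, byte[] key) {
        Random random = new Random(Arrays.hashCode(key)); // same key, same seed, same sequence
        int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        if (choice1 == choice2) {
            choice2 = (choice2 + 1) % tasks.size(); // move to the neighbouring index on collision
        }
        return new int[] { tasks.get(choice1), tasks.get(choice2) };
    }

    public static void main(String[] args) {
        List<Integer> tasks = List.of(11, 12, 13, 14); // hypothetical task ids
        byte[] key = "storm".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(createAssignment(tasks, key)));
        System.out.println(Arrays.toString(createAssignment(tasks, key))); // prints the same pair again
    }
}
```

With at least two target tasks the pair always contains two different tasks. With a single target task both entries are necessarily the same.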
BalancedTargetSelector
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
/**
 * This interface chooses one element from a task assignment to send a specific Tuple to.
 */
public interface TargetSelector extends Serializable {
    Integer chooseTask(int[] assignedTasks);
}

/**
 * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
 * overall from this instance of the grouping.
 */
public static class BalancedTargetSelector implements TargetSelector {
    private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

    /**
     * Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
     */
    public Integer chooseTask(int[] assignedTasks) {
        Integer taskIdWithMinLoad = null;
        Long minTaskLoad = Long.MAX_VALUE;

        for (Integer currentTaskId : assignedTasks) {
            final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
            if (currentTaskLoad < minTaskLoad) {
                minTaskLoad = currentTaskLoad;
                taskIdWithMinLoad = currentTaskId;
            }
        }

        targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
        return taskIdWithMinLoad;
    }
}
BalancedTargetSelector takes the candidate task ids, looks up their counts in targetTaskStats, returns the one with the lowest count (taskIdWithMinLoad) and increments that count.
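The effect of this bookkeeping is easiest to see when the same pair of candidates is offered repeatedly. The following is a simplified sketch of the same strategy using a plain HashMap; `BalancedSelectorSketch` is a hypothetical name and the task ids 3 and 7 are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of least-loaded selection; counts are kept per selector instance, across all keys.
class BalancedSelectorSketch {
    private final Map<Integer, Long> sent = new HashMap<>();

    int chooseTask(int[] assignedTasks) {
        int best = assignedTasks[0];
        long bestLoad = Long.MAX_VALUE;
        for (int task : assignedTasks) {
            long load = sent.getOrDefault(task, 0L);
            if (load < bestLoad) { // strict comparison: on a tie the earlier candidate wins
                bestLoad = load;
                best = task;
            }
        }
        sent.merge(best, 1L, Long::sum); // record the tuple sent to the chosen task
        return best;
    }

    public static void main(String[] args) {
        BalancedSelectorSketch selector = new BalancedSelectorSketch();
        int[] candidates = { 3, 7 }; // hypothetical task ids for one key
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.chooseTask(candidates)); // prints 3, 7, 3, 7
        }
    }
}
```

Because the counts are shared by every key routed through the same selector instance, a task that is busy with one key is less likely to be picked for another key that also lists it as a candidate.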
FieldsGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class FieldsGrouper implements CustomStreamGrouping {

    private Fields outFields;
    private List<List<Integer>> targetTasks;
    private Fields groupFields;
    private int numTasks;

    public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
        this.outFields = outFields;
        this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));

    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<List<Integer>>();
        for (Integer targetTask : targetTasks) {
            this.targetTasks.add(Collections.singletonList(targetTask));
        }
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
        return targetTasks.get(targetTaskIndex);
    }

}
Here FieldsGrouper's chooseTasks method uses TupleUtils.chooseTaskIndex to choose the index of the target task.
TupleUtils.chooseTaskIndex
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java
public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
    return Math.floorMod(listHashCode(keys), numTasks);
}

private static <T> int listHashCode(List<T> alist) {
    if (alist == null) {
        return 1;
    } else {
        return Arrays.deepHashCode(alist.toArray());
    }
}

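chooseTaskIndex first computes listHashCode over keys and then applies Math.floorMod with numTasks, that is, a floor modulus whose result is never negative for a positive numTasks.
listHashCode calls Arrays.deepHashCode(alist.toArray()) to compute the hash code, and returns 1 for a null list.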
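The reason for floorMod rather than the % operator is that Java hash codes can be negative, and % keeps the sign of the dividend. A small sketch of the difference, with a copy of the index computation for experimentation (`FloorModSketch` is a hypothetical name):

```java
import java.util.Arrays;
import java.util.List;

// Illustrates why a floor modulus is used to turn a hash code into a list index.
class FloorModSketch {

    // Same computation as described above, reproduced here so it can be run without Storm.
    static int chooseTaskIndex(List<?> keys, int numTasks) {
        int hash = keys == null ? 1 : Arrays.deepHashCode(keys.toArray());
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        int negativeHash = -7;
        System.out.println(negativeHash % 4);               // -3, not usable as an index
        System.out.println(Math.floorMod(negativeHash, 4)); // 1, always within [0, 4)
        System.out.println(chooseTaskIndex(List.of("storm"), 4));
    }
}
```

Taking Math.abs before % is not a complete substitute: Math.abs(Integer.MIN_VALUE) and Math.abs(Long.MIN_VALUE) are still negative, so that one input can still yield a negative remainder.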

Summary

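Storm's PartialKeyGrouping addresses the skewed load that fieldsGrouping can cause on bolt tasks.
fieldsGrouping hashes the selected fields and takes the floor modulus with the number of target tasks to choose the index of the target task.
In version 1.2.2, PartialKeyGrouping computes two hashes with Guava's Hashing.murmur3_128 and takes the absolute value of each modulo the number of target tasks to obtain two candidate indexes. In version 2.0.0 it seeds a Random with the hash code of the key and draws the two indexes from it. Unlike fieldsGrouping, the result is two candidates rather than one. PartialKeyGrouping also keeps a per-task count of tuples sent, always picks the candidate with the lower count, and updates the count on every choice.
Note that because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task, so a RollingCountAggBolt subscribed via fieldsGrouping is still needed to merge the partial counts.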
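The difference in load can be illustrated with a toy simulation. Everything here is an assumption made for the sketch: the class `GroupingSkewSketch`, the synthetic stream in which every second tuple carries the key "hot", the use of String.hashCode as the key hash, and four target tasks. It mirrors the ideas above rather than calling Storm's classes.

```java
import java.util.Arrays;
import java.util.Random;

// Toy simulation under stated assumptions; it mirrors the ideas above, not Storm's actual classes.
class GroupingSkewSketch {

    // fieldsGrouping-style routing: one fixed task per key.
    static long[] simulateFields(String[] keys, int numTasks) {
        long[] load = new long[numTasks];
        for (String key : keys) {
            load[Math.floorMod(key.hashCode(), numTasks)]++;
        }
        return load;
    }

    // partialKeyGrouping-style routing: two key-derived candidates, send to the less loaded one.
    static long[] simulatePartialKey(String[] keys, int numTasks) {
        long[] load = new long[numTasks];
        for (String key : keys) {
            Random random = new Random(key.hashCode());
            int first = random.nextInt(numTasks);
            int second = random.nextInt(numTasks);
            if (first == second) {
                second = (second + 1) % numTasks;
            }
            load[load[first] > load[second] ? second : first]++;
        }
        return load;
    }

    static long max(long[] values) {
        long max = Long.MIN_VALUE;
        for (long v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    // Hypothetical skewed stream: every second tuple carries the same hot key.
    static String[] skewedKeys(int n) {
        String[] keys = new String[n];
        for (int i = 0; i < n; i++) {
            keys[i] = (i % 2 == 0) ? "hot" : "key-" + (i % 100);
        }
        return keys;
    }

    public static void main(String[] args) {
        String[] keys = skewedKeys(10_000);
        System.out.println("fields:      " + Arrays.toString(simulateFields(keys, 4)));
        System.out.println("partial key: " + Arrays.toString(simulatePartialKey(keys, 4)));
    }
}
```

In this setup the single-hash routing puts at least half of all tuples on the task that owns "hot", while the two-choice routing cannot push any task above half of the stream, because a task only receives a tuple when its other candidate is at least as loaded.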

doc

Common Topology Patterns
The Power of Both Choices: Practical Load Balancing for Distributed Stream Processing Engines
Storm source code analysis: Streaming Grouping (backtype.storm.daemon.executor)
