聊聊storm的CustomStreamGrouping

42次阅读

共计 7084 个字符,预计需要花费 18 分钟才能阅读完成。


本文主要研究一下 storm 的 CustomStreamGrouping
CustomStreamGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java
public interface CustomStreamGrouping extends Serializable {

/**
* Tells the stream grouping at runtime the tasks in the target bolt. This information should be used in chooseTasks to determine the
* target tasks.
*
* It also tells the grouping the metadata on the stream this grouping will be used on.
*/
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

/**
* This function implements a custom stream grouping. It takes in as input the number of tasks in the target bolt in prepare and returns
* the tasks to send the tuples to.
*
* @param values the values to group on
*/
List<Integer> chooseTasks(int taskId, List<Object> values);
}

这里定义了 prepare 以及 chooseTasks 方法
GrouperFactory 里头定义了 FieldsGrouper、GlobalGrouper、NoneGrouper、AllGrouper、BasicLoadAwareCustomStreamGrouping
另外 org.apache.storm.grouping 包里头也定义了 ShuffleGrouping、PartialKeyGrouping、LoadAwareShuffleGrouping

FieldsGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class FieldsGrouper implements CustomStreamGrouping {

private Fields outFields;
private List<List<Integer>> targetTasks;
private Fields groupFields;
private int numTasks;

public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
this.outFields = outFields;
this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));

}

@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = new ArrayList<List<Integer>>();
for (Integer targetTask : targetTasks) {
this.targetTasks.add(Collections.singletonList(targetTask));
}
this.numTasks = targetTasks.size();
}

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
return targetTasks.get(targetTaskIndex);
}

}
对选中 fields 的 values 通过 TupleUtils.chooseTaskIndex 选择 task 下标;chooseTaskIndex 主要是采用 Arrays.deepHashCode 取哈希值然后对 numTask 向下取模
GlobalGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class GlobalGrouper implements CustomStreamGrouping {

private List<Integer> targetTasks;

public GlobalGrouper() {
}

@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
}

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
if (targetTasks.isEmpty()) {
return null;
}
// It’s possible for target to have multiple tasks if it reads multiple sources
return Collections.singletonList(targetTasks.get(0));
}
}
这里固定取第一个 task,即 targetTasks.get(0)
NoneGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class NoneGrouper implements CustomStreamGrouping {

private final Random random;
private List<Integer> targetTasks;
private int numTasks;

public NoneGrouper() {
random = new Random();
}

@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
this.numTasks = targetTasks.size();
}

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
int index = random.nextInt(numTasks);
return Collections.singletonList(targetTasks.get(index));
}
}
这里通过 random.nextInt(numTasks) 随机取 task
AllGrouper
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
public static class AllGrouper implements CustomStreamGrouping {

private List<Integer> targetTasks;

@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
}

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
return targetTasks;
}
}
这里返回所有的 targetTasks
ShuffleGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
private ArrayList<List<Integer>> choices;
private AtomicInteger current;

@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i : targetTasks) {
choices.add(Arrays.asList(i));
}
current = new AtomicInteger(0);
Collections.shuffle(choices, new Random());
}

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
int rightNow;
int size = choices.size();
while (true) {
rightNow = current.incrementAndGet();
if (rightNow < size) {
return choices.get(rightNow);
} else if (rightNow == size) {
current.set(0);
return choices.get(0);
}
} // race condition with another thread, and we lost. try again
}
}

这里在 prepare 的时候对 ArrayList<List<Integer>> choices 进行随机化
采用 current.incrementAndGet() 实现 round robbin 的效果,超过 size 的时候重置返回第一个,没有超过则返回 incr 后的 index 的值

PartialKeyGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
private static final long serialVersionUID = -1672360572274911808L;
private List<Integer> targetTasks;
private Fields fields = null;
private Fields outFields = null;

private AssignmentCreator assignmentCreator;
private TargetSelector targetSelector;

public PartialKeyGrouping() {
this(null);
}

public PartialKeyGrouping(Fields fields) {
this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
}

public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
this(fields, assignmentCreator, new BalancedTargetSelector());
}

public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
this.fields = fields;
this.assignmentCreator = assignmentCreator;
this.targetSelector = targetSelector;
}

@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
if (this.fields != null) {
this.outFields = context.getComponentOutputFields(stream);
}
}

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
List<Integer> boltIds = new ArrayList<>(1);
if (values.size() > 0) {
final byte[] rawKeyBytes = getKeyBytes(values);

final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

boltIds.add(selectedTask);
}
return boltIds;
}
//……
}
这里通过 RandomTwoTaskAssignmentCreator 来选中两个 taskId,然后选择使用次数小的那个
LoadAwareCustomStreamGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
void refreshLoad(LoadMapping loadMapping);
}

继承了 CustomStreamGrouping 接口,然后新定义了 refreshLoad 方法用于刷新负载,这里的负载主要是 executor 的 receiveQueue 的负载 (qMetrics.population() / qMetrics.capacity())
LoadAwareCustomStreamGrouping 有几个实现类,有 BasicLoadAwareCustomStreamGrouping 以及 LoadAwareShuffleGrouping

小结

storm 的 CustomStreamGrouping 接口定义了 chooseTasks 方法,用于选择 tasks 来处理 tuples
ShuffleGrouping 类似 round robbin,FieldsGrouper 则根据所选字段值采用 Arrays.deepHashCode 取哈希值然后对 numTask 向下取模,GlobalGrouper 返回 index 为 0 的 taskId,NoneGrouper 则随机返回,AllGrouper 不做过滤返回所有 taskId,PartialKeyGrouping 则使用 key 的哈希值作为 seed,采用 Random 函数来计算两个 taskId 的下标,然后选择使用次数少的那个 task。
LoadAware 的 grouping 有 BasicLoadAwareCustomStreamGrouping 以及 LoadAwareShuffleGrouping,他们都实现了 LoadAwareCustomStreamGrouping 接口,该接口定义了 refreshLoad 方法,用于动态刷新负载,这里的负载主要是 executor 的 receiveQueue 的负载 (qMetrics.population() / qMetrics.capacity())

doc
Stream groupings

正文完
 0