序
本文主要研究一下 storm 的 JoinBolt
实例
@Test
public void testJoinBolt() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“uuid-spout”, new RandomWordSpout(new String[]{“uuid”, “timestamp”}), 1);
builder.setSpout(“word-spout”, new RandomWordSpout(new String[]{“word”, “timestamp”}), 1);
JoinBolt joinBolt = new JoinBolt(“uuid-spout”, “timestamp”)
//from priorStream inner join newStream on newStream.field = priorStream.field1
.join(“word-spout”, “timestamp”, “uuid-spout”)
.select(“uuid,word,timestamp”)
.withTumblingWindow(BaseWindowedBolt.Count.of(10));
builder.setBolt(“join”, joinBolt,1)
.fieldsGrouping(“uuid-spout”,new Fields(“timestamp”))
.fieldsGrouping(“word-spout”,new Fields(“timestamp”));
builder.setBolt(“fileWriter”,new FilePrinterBolt(),1).globalGrouping(“join”);
SubmitHelper.submitRemote(“windowTopology”,builder.createTopology());
}
JoinBolt
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
public class JoinBolt extends BaseWindowedBolt {
protected final Selector selectorType;
// Map[StreamName -> JoinInfo]
protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
protected FieldSelector[] outputFields; // specified via bolt.select() … used in declaring Output fields
// protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields
protected String outputStreamName;
// Map[StreamName -> Map[Key -> List<Tuple>] ]
HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs = new HashMap<>(); // holds remaining streams
private OutputCollector collector;
/**
* Calls JoinBolt(Selector.SOURCE, sourceId, fieldName)
*
* @param sourceId Id of source component (spout/bolt) from which this bolt is receiving data
* @param fieldName the field to use for joining the stream (x.y.z format)
*/
public JoinBolt(String sourceId, String fieldName) {
this(Selector.SOURCE, sourceId, fieldName);
}
/**
* Introduces the first stream to start the join with. Equivalent SQL … select …. from srcOrStreamId …
*
* @param type Specifies whether ‘srcOrStreamId’ refers to stream name/source component
* @param srcOrStreamId name of stream OR source component
* @param fieldName the field to use for joining the stream (x.y.z format)
*/
public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
selectorType = type;
joinCriteria.put(srcOrStreamId, new JoinInfo(new FieldSelector(srcOrStreamId, fieldName)));
}
/**
* Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on ‘default’ stream.
*/
public JoinBolt withOutputStream(String streamName) {
this.outputStreamName = streamName;
return this;
}
/**
* Performs inner Join with the newStream. SQL : from priorStream inner join newStream on newStream.field = priorStream.field1 same
* as: new WindowedQueryBolt(priorStream,field1). join(newStream, field, priorStream);
*
* Note: priorStream must be previously joined. Valid ex: new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2); Invalid ex:
* new WindowedQueryBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
*
* @param newStream Either stream name or name of upstream component
* @param field the field on which to perform the join
*/
public JoinBolt join(String newStream, String field, String priorStream) {
return joinCommon(newStream, field, priorStream, JoinType.INNER);
}
/**
* Performs left Join with the newStream. SQL : from stream1 left join stream2 on stream2.field = stream1.field1 same as: new
* WindowedQueryBolt(stream1, field1). leftJoin(stream2, field, stream1);
*
* Note: priorStream must be previously joined Valid ex: new WindowedQueryBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
* Invalid ex: new WindowedQueryBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
*
* @param newStream Either a name of a stream or an upstream component
* @param field the field on which to perform the join
*/
public JoinBolt leftJoin(String newStream, String field, String priorStream) {
return joinCommon(newStream, field, priorStream, JoinType.LEFT);
}
private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
if (hashedInputs.containsKey(newStream)) {
throw new IllegalArgumentException(“‘” + newStream + “‘ is already part of join. Cannot join with it more than once.”);
}
hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
JoinInfo joinInfo = joinCriteria.get(priorStream);
if (joinInfo == null) {
throw new IllegalArgumentException(“Stream ‘” + priorStream + “‘ was not previously declared”);
}
FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType));
return this;
}
/**
* Specify projection fields. i.e. Specifies the fields to include in the output. e.g: .select(“field1, stream2:field2, field3”) Nested
* Key names are supported for nested types: e.g: .select(“outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3)” Inner
* types (non leaf) must be Map<> in order to support nested lookup using this dot notation This selected fields implicitly declare the
* output fieldNames for the bolt based.
*
* @param commaSeparatedKeys
* @return
*/
public JoinBolt select(String commaSeparatedKeys) {
String[] fieldNames = commaSeparatedKeys.split(“,”);
outputFields = new FieldSelector[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
outputFields[i] = new FieldSelector(fieldNames[i]);
}
return this;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
String[] outputFieldNames = new String[outputFields.length];
for (int i = 0; i < outputFields.length; ++i) {
outputFieldNames[i] = outputFields[i].getOutputName();
}
if (outputStreamName != null) {
declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
} else {
declarer.declare(new Fields(outputFieldNames));
}
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
// initialize the hashedInputs data structure
int i = 0;
for (String stream : joinCriteria.keySet()) {
if (i > 0) {
hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
}
++i;
}
if (outputFields == null) {
throw new IllegalArgumentException(“Must specify output fields via .select() method.”);
}
}
@Override
public void execute(TupleWindow inputWindow) {
// 1) Perform Join
List<Tuple> currentWindow = inputWindow.get();
JoinAccumulator joinResult = hashJoin(currentWindow);
// 2) Emit results
for (ResultRecord resultRecord : joinResult.getRecords()) {
ArrayList<Object> outputTuple = resultRecord.getOutputFields();
if (outputStreamName == null) {
// explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor them to all
// tuples in window
collector.emit(resultRecord.tupleList, outputTuple);
} else {
// explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples
// in window
collector.emit(outputStreamName, resultRecord.tupleList, outputTuple);
}
}
}
//……
}
JoinBolt 继承了 BaseWindowedBolt,定义了 Selector selectorType、LinkedHashMap<String, JoinInfo> joinCriteria、FieldSelector[] outputFields 等属性,用于记录关联类型及关联关系
join、leftJoin 方法用于设置 join 关联关系,最后都是调用 joinCommon 方法,关联关系使用 JoinInfo 对象,存储在 joinCriteria 中
select 方法用于选择结果集的列,最后设置到 outputFields,用于 declareOutputFields
execute 就是 join 的核心逻辑了,这里调用了 hashJoin
JoinBolt.hashJoin
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
protected JoinAccumulator hashJoin(List<Tuple> tuples) {
clearHashedInputs();
JoinAccumulator probe = new JoinAccumulator();
// 1) Build phase – Segregate tuples in the Window into streams.
// First stream’s tuples go into probe, rest into HashMaps in hashedInputs
String firstStream = joinCriteria.keySet().iterator().next();
for (Tuple tuple : tuples) {
String streamId = getStreamSelector(tuple);
if (!streamId.equals(firstStream)) {
Object field = getJoinField(streamId, tuple);
ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field);
if (recs == null) {
recs = new ArrayList<Tuple>();
hashedInputs.get(streamId).put(field, recs);
}
recs.add(tuple);
} else {
ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1);
probe.insert(probeRecord); // first stream’s data goes into the probe
}
}
// 2) Join the streams in order of streamJoinOrder
int i = 0;
for (String streamName : joinCriteria.keySet()) {
boolean finalJoin = (i == joinCriteria.size() – 1);
if (i > 0) {
probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin);
}
++i;
}
return probe;
}
hashJoin 方法先遍历一下 tuples,把 tuples 分为两类,firstStream 的数据存到 JoinAccumulator probe 中,其余的存到 HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs
之后对剩余的 streamId,挨个遍历调用 doJoin,把结果整合到 JoinAccumulator probe
JoinAccumulator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
protected class JoinAccumulator {
ArrayList<ResultRecord> records = new ArrayList<>();
public void insert(ResultRecord tuple) {
records.add(tuple);
}
public Collection<ResultRecord> getRecords() {
return records;
}
}
JoinAccumulator 就是一个 ArrayList<ResultRecord>
ResultRecord
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// Join helper to concat fields to the record
protected class ResultRecord {
ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
ArrayList<Object> outFields = null; // refs to fields that will be part of output fields
// ‘generateOutputFields’ enables us to avoid projection unless it is the final stream being joined
public ResultRecord(Tuple tuple, boolean generateOutputFields) {
tupleList.add(tuple);
if (generateOutputFields) {
outFields = doProjection(tupleList, outputFields);
}
}
public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
if (lhs != null) {
tupleList.addAll(lhs.tupleList);
}
if (rhs != null) {
tupleList.add(rhs);
}
if (generateOutputFields) {
outFields = doProjection(tupleList, outputFields);
}
}
public ArrayList<Object> getOutputFields() {
return outFields;
}
// ‘stream’ cannot be null,
public Object getField(FieldSelector fieldSelector) {
for (Tuple tuple : tupleList) {
Object result = lookupField(fieldSelector, tuple);
if (result != null) {
return result;
}
}
return null;
}
}
// Performs projection on the tuples based on ‘projectionFields’
protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
ArrayList<Object> result = new ArrayList<>(projectionFields.length);
// Todo: optimize this computation… perhaps inner loop can be outside to avoid rescanning tuples
for (int i = 0; i < projectionFields.length; i++) {
boolean missingField = true;
for (Tuple tuple : tuples) {
Object field = lookupField(projectionFields[i], tuple);
if (field != null) {
result.add(field);
missingField = false;
break;
}
}
if (missingField) {// add a null for missing fields (usually in case of outer joins)
result.add(null);
}
}
return result;
}
// Extract the field from tuple. Field may be nested field (x.y.z)
protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
// very stream name matches, it stream name was specified
if (fieldSelector.streamName != null &&
!fieldSelector.streamName.equalsIgnoreCase(getStreamSelector(tuple))) {
return null;
}
Object curr = null;
for (int i = 0; i < fieldSelector.field.length; i++) {
if (i == 0) {
if (tuple.contains(fieldSelector.field[i])) {
curr = tuple.getValueByField(fieldSelector.field[i]);
} else {
return null;
}
} else {
curr = ((Map) curr).get(fieldSelector.field[i]);
if (curr == null) {
return null;
}
}
}
return curr;
}
ResultRecord 用于存储 joined 之后的数据
当 joinCriteria.size() == 1 或者 finalJoin 为 true 的时候,ResultRecord 的 generateOutputFields 为 true,会调用 doProjection 对结果集进行 projection 操作
当遍历 joinCriteria 调用 doJoin 的时候,遍历到最后一条记录时为 true
JoinBolt.doJoin
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType
protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
boolean finalJoin) {
final JoinType joinType = joinInfo.getJoinType();
switch (joinType) {
case INNER:
return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
case LEFT:
return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
case RIGHT:
case OUTER:
default:
throw new RuntimeException(“Unsupported join type : ” + joinType.name());
}
}
doJoin 封装了各种 join 类型的方法,目前仅仅实现了 INNER 以及 LEFT,分别调用 doInnerJoin、doLeftJoin 方法
doInnerJoin
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// inner join – core implementation
protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
boolean finalJoin) {
String[] probeKeyName = joinInfo.getOtherField();
JoinAccumulator result = new JoinAccumulator();
FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
for (ResultRecord rec : probe.getRecords()) {
Object probeKey = rec.getField(fieldSelector);
if (probeKey != null) {
ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
if (matchingBuildRecs != null) {
for (Tuple matchingRec : matchingBuildRecs) {
ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
result.insert(mergedRecord);
}
}
}
}
return result;
}
这里挨个对 JoinAccumulator probe 的 records 遍历,然后通过 probeKey 从 buildInput 寻找对应的 records,如果有找到则进行合并
doLeftJoin
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// left join – core implementation
protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
boolean finalJoin) {
String[] probeKeyName = joinInfo.getOtherField();
JoinAccumulator result = new JoinAccumulator();
FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
for (ResultRecord rec : probe.getRecords()) {
Object probeKey = rec.getField(fieldSelector);
if (probeKey != null) {
ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if its return null
if (matchingBuildRecs != null && !matchingBuildRecs.isEmpty()) {
for (Tuple matchingRec : matchingBuildRecs) {
ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
result.insert(mergedRecord);
}
} else {
ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
result.insert(mergedRecord);
}
}
}
return result;
}
left join 与 inner join 的区别就在于没有找到匹配记录的话,仍旧保留左边的记录
小结
JoinBolt 继承了 BaseWindowedBolt,目前仅仅支持 inner join 及 left join,而且要求 join 的字段与 fieldsGrouping 的字段相同
JoinBolt 对于多个 stream 数据的合并,使用分治的方式实现,采用 JoinAccumulator 不断累加结果集,循环遍历调用 doJoin 来完成
由于 JoinBolt 是在内存进行操作,又需要匹配数据,需要消耗 CPU 及内存,有几个点需要注意一下:
window 的时间窗口不宜过大,否则内存堆积的数据过多,容易 OOM,可根据情况调整时间窗口或者通过 Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB 设置 woker 的内存大小
采取 slding window 会造成数据重复 join,因而需要使用 withTumblingWindow
如果开启 tuple 处理超时,则要求 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 大于 windowLength + slidingInterval + 处理时间,避免还没有处理完就误判为超时重新 replayed
由于 windowedBolt 会自动对 tupleWindow 的数据进行 anchor,数据量过多 anchor 操作会给整个 topology 造成压力,如无必要可以关闭 ack(设置 Config.TOPOLOGY_ACKER_EXECUTORS 为 0)
Config.TOPOLOGY_MAX_SPOUT_PENDING 要设置的大一点,给 window 的 join 操作及后续操作足够的时间,在一定程度上避免 spout 发送 tuple 速度过快,下游 bolt 消费不过来
生产上 Config.TOPOLOGY_DEBUG 设置为 false 关闭 debug 日志,Config.TOPOLOGY_EVENTLOGGER_EXECUTORS 设置为 0 关闭 event logger
doc
Windowing Support in Core Storm
Joining Streams in Storm Core