About Storm: Storm cluster deployment and project deployment

Storm cluster deployment and project deployment

Storm cluster deployment

    wget http://mirror.bit.edu.cn/apache/storm/apache-storm-1.2.2/apache-storm-1.2.2.tar.gz
    tar -zxvf apache-storm-1.2.2.tar.gz
    cd apache-storm-1.2.2
    vim conf/storm.yaml

Configuration (conf/storm.yaml):

    storm.local.dir: "/data/liang/datas/storm"
    storm.zookeeper.servers:
        - "10.2.45.3"
        - "10.2.45.4"
        - "10.2.45.5"
    storm.zookeeper.port: 2181
    nimbus.seeds: ["10.2.45.5"]
    ui.port: 8081
    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

Storm project deployment

Use Maven to copy the dependency jars into the target/dependency directory:

    mvn clean dependency:copy-dependencies

Then remove storm-core-1.2.1.jar from that directory and copy the remaining jars to the apache-storm-1.2.2/extlib directory.

    mkdir app

Build the application jar sjz-rta-app-1.0-SNAPSHOT.jar with:

    mvn clean install

Put the rta.yaml configuration file and sjz-rta-app-1.0-SNAPSHOT.jar in the app directory, then run from there:

    ../bin/storm jar sjz-rta-app-1.0-SNAPSHOT.jar com.liang.ecpm.rta.AppRTA

Using the maven-assembly-plugin

See gj-bus-server or xinyun's real-time billing project for reference.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.1</version>
        <!-- provided scope is needed when packaging; it is not needed for local runs -->
        <scope>provided</scope>
    </dependency>

With this setup, mvn clean install produces sjz-rta-app-1.0-SNAPSHOT-jar-with-dependencies.jar, which bundles all dependency jars and can be run directly:

    ../bin/storm jar sjz-rta-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.liang.ecpm.rta.AppRTA

Other commands used to kill and resubmit topologies:

    ../bin/storm kill rta-sjz-v1-daili-fei -w 3
    ../bin/storm kill saveDbStormStart -w 3
    ../bin/storm kill RealPassengerServer -w 3
    ../bin/storm kill ScheduleRealServer -w 3
    ../bin/storm jar sjz-rta-app-1.0-SNAPSHOT.jar com.liang.ecpm.rta.AppRTA -nimbus carlan152 -port 6627
    ../bin/storm jar sjz-rta-app.jar com.liang.ecpm.rta.AppRTA -nimbus carlan152 -port 6627
    ../bin/storm jar sjz-rta-app-passenger-schedule.jar com.liang.ecpm.rta.AppRTA cluster

December 29, 2022 · 1 min · jiezi

[case43] A look at Storm's LinearDRPCTopologyBuilder

Preface

This article takes a look at Storm's LinearDRPCTopologyBuilder.

Examples

Manual DRPC

    @Test
    public void testManualDRPC() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        DRPCSpout spout = new DRPCSpout("exclamation"); // emits Fields("args", "return-info")
        // the spout is a DRPCSpout with component id "drpc"
        builder.setSpout("drpc", spout);
        builder.setBolt("exclaim", new ManualExclaimBolt(), 3).shuffleGrouping("drpc");
        // ReturnResults expects Fields("result", "return-info")
        builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
        SubmitHelper.submitRemote("manualDrpc", builder.createTopology());
    }

- This is the most basic way to build a DRPC topology: it starts with a DRPCSpout and ends with ReturnResults.
- The output fields of DRPCSpout are Fields("args", "return-info"); ReturnResults receives Fields("result", "return-info").
- The custom ManualExclaimBolt must therefore declare Fields("result", "return-info") as its output fields. The return-info value can be taken from the input tuple, and result is the outcome of the processing.

Using LinearDRPCTopologyBuilder

    @Test
    public void testBasicDRPCTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);
        SubmitHelper.submitRemote("basicDrpc", builder.createRemoteTopology());
    }

- LinearDRPCTopologyBuilder builds DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults for you, so it is very concise to use.
- Because the generated upstream and downstream components differ from the manual case, the user-defined bolt must take Fields("request", "args") as input and declare new Fields("id", "result") as output. The request field of the former is the requestId and is the same value as id in the latter, of type long; args is the input argument and result is the output.

LinearDRPCTopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java

    public class LinearDRPCTopologyBuilder {
        String function;
        List<Component> components = new ArrayList<>();

        public LinearDRPCTopologyBuilder(String function) {
            this.function = function;
        }

        private static String boltId(int index) {
            return "bolt" + index;
        }

        public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
            return addBolt(new BatchBoltExecutor(bolt), parallelism);
        }

        public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
            return addBolt(bolt, 1);
        }

        @Deprecated
        public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
            if (parallelism == null) {
                parallelism = 1;
            }
            Component component = new Component(bolt, parallelism.intValue());
            components.add(component);
            return new InputDeclarerImpl(component);
        }

        @Deprecated
        public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
            return addBolt(bolt, null);
        }

        public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
            return addBolt(new BasicBoltExecutor(bolt), parallelism);
        }

        public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
            return addBolt(bolt, null);
        }

        public StormTopology createLocalTopology(ILocalDRPC drpc) {
            return createTopology(new DRPCSpout(function, drpc));
        }

        public StormTopology createRemoteTopology() {
            return createTopology(new DRPCSpout(function));
        }

        private StormTopology createTopology(DRPCSpout spout) {
            final String SPOUT_ID = "spout";
            final String PREPARE_ID = "prepare-request";

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(SPOUT_ID, spout);
            builder.setBolt(PREPARE_ID, new PrepareRequest())
                   .noneGrouping(SPOUT_ID);
            int i = 0;
            for (; i < components.size(); i++) {
                Component component = components.get(i);

                Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
                if (i == 1) {
                    source.put(boltId(i - 1), SourceArgs.single());
                } else if (i >= 2) {
                    source.put(boltId(i - 1), SourceArgs.all());
                }
                IdStreamSpec idSpec = null;
                if (i == components.size() - 1 && component.bolt instanceof FinishedCallback) {
                    idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
                }
                BoltDeclarer declarer = builder.setBolt(
                    boltId(i),
                    new CoordinatedBolt(component.bolt, source, idSpec),
                    component.parallelism);

                for (SharedMemory request : component.sharedMemory) {
                    declarer.addSharedMemory(request);
                }

                if (!component.componentConf.isEmpty()) {
                    declarer.addConfigurations(component.componentConf);
                }

                if (idSpec != null) {
                    declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
                }
                if (i == 0 && component.declarations.isEmpty()) {
                    declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
                } else {
                    String prevId;
                    if (i == 0) {
                        prevId = PREPARE_ID;
                    } else {
                        prevId = boltId(i - 1);
                    }
                    for (InputDeclaration declaration : component.declarations) {
                        declaration.declare(prevId, declarer);
                    }
                }
                if (i > 0) {
                    declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
                }
            }

            IRichBolt lastBolt = components.get(components.size() - 1).bolt;
            OutputFieldsGetter getter = new OutputFieldsGetter();
            lastBolt.declareOutputFields(getter);
            Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
            if (streams.size() != 1) {
                throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
            }
            String outputStream = streams.keySet().iterator().next();
            List<String> fields = streams.get(outputStream).get_output_fields();
            if (fields.size() != 2) {
                throw new RuntimeException(
                    "Output stream of last component in LinearDRPCTopology must contain exactly two fields. "
                    + "The first should be the request id, and the second should be the result.");
            }

            builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
                   .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
                   .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
            i++;
            builder.setBolt(boltId(i), new ReturnResults())
                   .noneGrouping(boltId(i - 1));
            return builder.createTopology();
        }

        //……
    }

- createTopology shows that the spout is a DRPCSpout (component id "spout"), followed by PrepareRequest (component id "prepare-request").
- Each user-supplied bolt is then wrapped in a CoordinatedBolt. When there is more than one bolt, the second and later bolts additionally get directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID), over which the upstream bolt uses emitDirect to send Fields("id", "count").
- After the user bolts comes JoinResult, and only then ReturnResults.

DRPCSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java

    public class DRPCSpout extends BaseRichSpout {
        public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
        //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
        static final long serialVersionUID = 2387848310969237877L;
        final String _function;
        final String _local_drpc_id;
        SpoutOutputCollector _collector;
        List<DRPCInvocationsClient> _clients = new ArrayList<>();
        transient LinkedList<Future<Void>> _futures = null;
        transient ExecutorService _backround = null;

        public DRPCSpout(String function) {
            _function = function;
            if (DRPCClient.isLocalOverride()) {
                _local_drpc_id = DRPCClient.getOverrideServiceId();
            } else {
                _local_drpc_id = null;
            }
        }

        //……

        @Override
        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            if (_local_drpc_id == null) {
                _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                            60L, TimeUnit.SECONDS,
                                                            new SynchronousQueue<Runnable>());
                _futures = new LinkedList<>();

                int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
                int index = context.getThisTaskIndex();

                int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
                List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
                if (servers == null || servers.isEmpty()) {
                    throw new RuntimeException("No DRPC servers configured for topology");
                }

                if (numTasks < servers.size()) {
                    for (String s : servers) {
                        _futures.add(_backround.submit(new Adder(s, port, conf)));
                    }
                } else {
                    int i = index % servers.size();
                    _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
                }
            }
        }

        @Override
        public void close() {
            for (DRPCInvocationsClient client : _clients) {
                client.close();
            }
        }

        @Override
        public void nextTuple() {
            if (_local_drpc_id == null) {
                int size = 0;
                synchronized (_clients) {
                    size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
                }
                for (int i = 0; i < size; i++) {
                    DRPCInvocationsClient client;
                    synchronized (_clients) {
                        client = _clients.get(i);
                    }
                    if (!client.isConnected()) {
                        LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
                        reconnectAsync(client);
                        continue;
                    }
                    try {
                        DRPCRequest req = client.fetchRequest(_function);
                        if (req.get_request_id().length() > 0) {
                            Map<String, Object> returnInfo = new HashMap<>();
                            returnInfo.put("id", req.get_request_id());
                            returnInfo.put("host", client.getHost());
                            returnInfo.put("port", client.getPort());
                            _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)),
                                            new DRPCMessageId(req.get_request_id(), i));
                            break;
                        }
                    } catch (AuthorizationException aze) {
                        reconnectAsync(client);
                        LOG.error("Not authorized to fetch DRPC request from DRPC server", aze);
                    } catch (TException e) {
                        reconnectAsync(client);
                        LOG.error("Failed to fetch DRPC request from DRPC server", e);
                    } catch (Exception e) {
                        LOG.error("Failed to fetch DRPC request from DRPC server", e);
                    }
                }
                checkFutures();
            } else {
                //……
            }
        }

        @Override
        public void ack(Object msgId) {
        }

        @Override
        public void fail(Object msgId) {
            DRPCMessageId did = (DRPCMessageId) msgId;
            DistributedRPCInvocations.Iface client;

            if (_local_drpc_id == null) {
                client = _clients.get(did.index);
            } else {
                client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
            }

            int retryCnt = 0;
            int maxRetries = 3;

            while (retryCnt < maxRetries) {
                retryCnt++;
                try {
                    client.failRequest(did.id);
                    break;
                } catch (AuthorizationException aze) {
                    LOG.error("Not authorized to failRequest from DRPC server", aze);
                    throw new RuntimeException(aze);
                } catch (TException tex) {
                    if (retryCnt >= maxRetries) {
                        LOG.error("Failed to fail request", tex);
                        break;
                    }
                    reconnectSync((DRPCInvocationsClient) client);
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("args", "return-info"));
        }

        //……
    }

- open prepares the DRPCInvocationsClient instances.
- nextTuple fetches a DRPCRequest via DRPCInvocationsClient.fetchRequest(_function), builds returnInfo and emits the data. The msgId is a DRPCMessageId and the tuple is Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)).
- fail is overridden: for a failed request it calls failRequest on the DRPC server, making up to 3 attempts (maxRetries is hard-coded to 3).

PrepareRequest

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java

    public class PrepareRequest extends BaseBasicBolt {
        public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
        public static final String RETURN_STREAM = "ret";
        public static final String ID_STREAM = "id";

        Random rand;

        @Override
        public void prepare(Map<String, Object> map, TopologyContext context) {
            rand = new Random();
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String args = tuple.getString(0);
            String returnInfo = tuple.getString(1);
            long requestId = rand.nextLong();
            collector.emit(ARGS_STREAM, new Values(requestId, args));
            collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
            collector.emit(ID_STREAM, new Values(requestId));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
            declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
            declarer.declareStream(ID_STREAM, new Fields("request"));
        }
    }

- PrepareRequest reads args and returnInfo, generates a requestId, and emits to three streams: ARGS_STREAM, RETURN_STREAM and ID_STREAM.
- JoinResult receives PrepareRequest's RETURN_STREAM, and the first CoordinatedBolt receives ARGS_STREAM.

CoordinatedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java

    /**
     * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused in the case of retries.
     */
    public class CoordinatedBolt implements IRichBolt {
        private TimeCacheMap<Object, TrackingInfo> _tracked;

        //……

        public void execute(Tuple tuple) {
            Object id = tuple.getValue(0);
            TrackingInfo track;
            TupleType type = getTupleType(tuple);
            synchronized (_tracked) {
                track = _tracked.get(id);
                if (track == null) {
                    track = new TrackingInfo();
                    if (_idStreamSpec == null) {
                        track.receivedId = true;
                    }
                    _tracked.put(id, track);
                }
            }

            if (type == TupleType.ID) {
                synchronized (_tracked) {
                    track.receivedId = true;
                }
                checkFinishId(tuple, type);
            } else if (type == TupleType.COORD) {
                int count = (Integer) tuple.getValue(1);
                synchronized (_tracked) {
                    track.reportCount++;
                    track.expectedTupleCount += count;
                }
                checkFinishId(tuple, type);
            } else {
                synchronized (_tracked) {
                    _delegate.execute(tuple);
                }
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            _delegate.declareOutputFields(declarer);
            declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
        }

        //……

        public static class TrackingInfo {
            int reportCount = 0;
            int expectedTupleCount = 0;
            int receivedTuples = 0;
            boolean failed = false;
            Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
            boolean receivedId = false;
            boolean finished = false;
            List<Tuple> ackTuples = new ArrayList<>();

            @Override
            public String toString() {
                return "reportCount: " + reportCount + "\n" +
                       "expectedTupleCount: " + expectedTupleCount + "\n" +
                       "receivedTuples: " + receivedTuples + "\n" +
                       "failed: " + failed + "\n" +
                       taskEmittedTuples.toString();
            }
        }
    }

- In declareOutputFields, CoordinatedBolt calls the delegate bolt's declareOutputFields and additionally declares the direct stream Constants.COORDINATED_STREAM_ID with Fields("id", "count").
- execute first makes sure every requestId has a TrackingInfo. It records the expectedTupleCount and receivedTuples counters as well as taskEmittedTuples. The last name is slightly misleading: it holds the number of tuples this bolt has emitted to each downstream task. That number is sent via emitDirect to tell each downstream task how many tuples it should have received (this happens in checkFinishId, once the request is finished), and the downstream bolt uses it to update its own expectedTupleCount.
- execute distinguishes three kinds of tuple: TupleType.ID (only when _idStreamSpec is not null), TupleType.COORD (carrying Fields("id", "count"), after which checkFinishId decides whether the request is complete), and TupleType.REGULAR (passed to the delegate bolt's execute method).
- checkFinishId checks track.reportCount == _numSourceReports and track.expectedTupleCount == track.receivedTuples. If both hold it sets track.finished = true and tells the downstream bolt, if there is one, how many tuples it should have received.

JoinResult

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java

    public class JoinResult extends BaseRichBolt {
        public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);

        String returnComponent;
        Map<Object, Tuple> returns = new HashMap<>();
        Map<Object, Tuple> results = new HashMap<>();
        OutputCollector _collector;

        public JoinResult(String returnComponent) {
            this.returnComponent = returnComponent;
        }

        public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            Object requestId = tuple.getValue(0);
            if (tuple.getSourceComponent().equals(returnComponent)) {
                returns.put(requestId, tuple);
            } else {
                results.put(requestId, tuple);
            }

            if (returns.containsKey(requestId) && results.containsKey(requestId)) {
                Tuple result = results.remove(requestId);
                Tuple returner = returns.remove(requestId);
                LOG.debug(result.getValue(1).toString());
                List<Tuple> anchors = new ArrayList<>();
                anchors.add(result);
                anchors.add(returner);
                _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
                _collector.ack(result);
                _collector.ack(returner);
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result", "return-info"));
        }
    }

- A tuple sent by PrepareRequest is stored in returns; any other tuple is stored in results.
- JoinResult then checks whether both maps contain the requestId. If so, the request and its result have been matched and a tuple is emitted downstream.
- The first emitted field is result and the second is returnInfo.

ReturnResults

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java

    public class ReturnResults extends BaseRichBolt {
        public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
        //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
        static final long serialVersionUID = -774882142710631591L;
        OutputCollector _collector;
        boolean local;
        Map<String, Object> _conf;
        Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();

        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            _conf = topoConf;
            _collector = collector;
            local = topoConf.get(Config.STORM_CLUSTER_MODE).equals("local");
        }

        @Override
        public void execute(Tuple input) {
            String result = (String) input.getValue(0);
            String returnInfo = (String) input.getValue(1);
            if (returnInfo != null) {
                Map<String, Object> retMap;
                try {
                    retMap = (Map<String, Object>) JSONValue.parseWithException(returnInfo);
                } catch (ParseException e) {
                    LOG.error("Parseing returnInfo failed", e);
                    _collector.fail(input);
                    return;
                }
                final String host = (String) retMap.get("host");
                final int port = ObjectReader.getInt(retMap.get("port"));
                String id = (String) retMap.get("id");
                DistributedRPCInvocations.Iface client;
                if (local) {
                    client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
                } else {
                    List server = new ArrayList() {{
                        add(host);
                        add(port);
                    }};

                    if (!_clients.containsKey(server)) {
                        try {
                            _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
                        } catch (TTransportException ex) {
                            throw new RuntimeException(ex);
                        }
                    }
                    client = _clients.get(server);
                }

                int retryCnt = 0;
                int maxRetries = 3;
                while (retryCnt < maxRetries) {
                    retryCnt++;
                    try {
                        client.result(id, result);
                        _collector.ack(input);
                        break;
                    } catch (AuthorizationException aze) {
                        LOG.error("Not authorized to return results to DRPC server", aze);
                        _collector.fail(input);
                        throw new RuntimeException(aze);
                    } catch (TException tex) {
                        if (retryCnt >= maxRetries) {
                            LOG.error("Failed to return results to DRPC server", tex);
                            _collector.fail(input);
                        }
                        reconnectClient((DRPCInvocationsClient) client);
                    }
                }
            }
        }

        private void reconnectClient(DRPCInvocationsClient client) {
            if (client instanceof DRPCInvocationsClient) {
                try {
                    LOG.info("reconnecting… ");
                    client.reconnectClient(); //Blocking call
                } catch (TException e2) {
                    LOG.error("Failed to connect to DRPC server", e2);
                }
            }
        }

        @Override
        public void cleanup() {
            for (DRPCInvocationsClient c : _clients.values()) {
                c.close();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

- ReturnResults sends the result back through a DRPCInvocationsClient to the DRPC server that issued the request.
- returnInfo contains the target host and port for the result; a DRPCInvocationsClient is built (and cached) for that host and port.
- It then calls DRPCInvocationsClient.result(id, result) to return the result, making up to 3 attempts. An AuthorizationException fails the tuple immediately; on success the tuple is acked.

Summary

- LinearDRPCTopologyBuilder was marked @Deprecated in v0.9.1-incubating (2012), on the view that Trident's newDRPCStream replaced it. That meant anyone wanting DRPC had to adopt Trident, so the annotation was later removed (April 2018); in 2.0.0, 1.1.3, 1.0.7 and 1.2.2 the class is no longer deprecated.
- LinearDRPCTopologyBuilder wraps and combines DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults, exposing a simple API so that users do not have to wire these components themselves.
- DRPCSpout builds the args and returnInfo data. PrepareRequest splits the data across ARGS_STREAM, RETURN_STREAM and ID_STREAM. CoordinatedBolt makes sure the tuples passed between the bolts are delivered completely and acked. JoinResult matches results to requests by requestId and sends them downstream. ReturnResults uses returnInfo to send the data back to the client.
- With LinearDRPCTopologyBuilder, the first bolt receives Fields("request", "args") as input, and the last bolt must declare new Fields("id", "result") as output. Every bolt before the last must emit the id (the requestId) as its first output field, so that CoordinatedBolt can track counts and confirm that a bolt has received every tuple its upstream bolt sent.

doc

- Distributed RPC
- LinearDRPCTopologyBuilder Deprecated
- LinearDRPCTopologyBuilder is deprecated - what to use instead
- LinearDRPCTopologyBuilder shouldn't be deprecated
- Twitter Storm源代码分析之CoordinatedBolt (Twitter Storm source code analysis: CoordinatedBolt)

...
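The matching step that JoinResult performs in this post can be tried out without a Storm cluster. The following is a minimal sketch, assuming plain strings stand in for tuples; the class `JoinResultSketch` and its methods `offer` and `pending` are hypothetical names introduced for illustration and are not part of Storm's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, Storm-free model of the JoinResult matching step (not Storm's API).
class JoinResultSketch {
    // requestId -> return-info, as received from the prepare-request component
    private final Map<Long, String> returns = new HashMap<>();
    // requestId -> result, as received from the last user bolt
    private final Map<Long, String> results = new HashMap<>();

    /**
     * Records one half of a request. Returns {result, returnInfo} once both
     * halves for the same requestId have arrived, otherwise an empty Optional.
     */
    Optional<String[]> offer(boolean fromPrepareRequest, long requestId, String value) {
        if (fromPrepareRequest) {
            returns.put(requestId, value);
        } else {
            results.put(requestId, value);
        }
        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
            String result = results.remove(requestId);
            String returnInfo = returns.remove(requestId);
            return Optional.of(new String[] {result, returnInfo});
        }
        return Optional.empty();
    }

    /** Number of halves still waiting for their counterpart. */
    int pending() {
        return returns.size() + results.size();
    }

    public static void main(String[] args) {
        JoinResultSketch join = new JoinResultSketch();
        // Only the return-info has arrived, so nothing is emitted yet.
        System.out.println(join.offer(true, 42L, "return-info-42").isPresent()); // prints false
        // The result for the same request id arrives and the pair is emitted.
        String[] out = join.offer(false, 42L, "hello!").get();
        System.out.println(out[0] + " -> " + out[1]); // prints hello! -> return-info-42
    }
}
```

As in the real bolt, both maps are cleaned up as soon as a pair is matched, so only unmatched halves stay in memory.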

October 29, 2018 · 8 min · jiezi

A look at Storm's JoinBolt

序本文主要研究一下storm的JoinBolt实例 @Test public void testJoinBolt() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(“uuid-spout”, new RandomWordSpout(new String[]{“uuid”, “timestamp”}), 1); builder.setSpout(“word-spout”, new RandomWordSpout(new String[]{“word”, “timestamp”}), 1); JoinBolt joinBolt = new JoinBolt(“uuid-spout”, “timestamp”) //from priorStream inner join newStream on newStream.field = priorStream.field1 .join(“word-spout”, “timestamp”, “uuid-spout”) .select(“uuid,word,timestamp”) .withTumblingWindow(BaseWindowedBolt.Count.of(10)); builder.setBolt(“join”, joinBolt,1) .fieldsGrouping(“uuid-spout”,new Fields(“timestamp”)) .fieldsGrouping(“word-spout”,new Fields(“timestamp”)); builder.setBolt(“fileWriter”,new FilePrinterBolt(),1).globalGrouping(“join”); SubmitHelper.submitRemote(“windowTopology”,builder.createTopology()); }JoinBoltstorm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.javapublic class JoinBolt extends BaseWindowedBolt { protected final Selector selectorType; // Map[StreamName -> JoinInfo] protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>(); protected FieldSelector[] outputFields; // specified via bolt.select() … used in declaring Output fields // protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields protected String outputStreamName; // Map[StreamName -> Map[Key -> List<Tuple>] ] HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs = new HashMap<>(); // holds remaining streams private OutputCollector collector; /** * Calls JoinBolt(Selector.SOURCE, sourceId, fieldName) * * @param sourceId Id of source component (spout/bolt) from which this bolt is receiving data * @param fieldName the field to use for joining the stream (x.y.z format) / public JoinBolt(String sourceId, String fieldName) { this(Selector.SOURCE, sourceId, fieldName); } 
/* * Introduces the first stream to start the join with. Equivalent SQL … select …. from srcOrStreamId … * * @param type Specifies whether ‘srcOrStreamId’ refers to stream name/source component * @param srcOrStreamId name of stream OR source component * @param fieldName the field to use for joining the stream (x.y.z format) / public JoinBolt(Selector type, String srcOrStreamId, String fieldName) { selectorType = type; joinCriteria.put(srcOrStreamId, new JoinInfo(new FieldSelector(srcOrStreamId, fieldName))); } /* * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on ‘default’ stream. / public JoinBolt withOutputStream(String streamName) { this.outputStreamName = streamName; return this; } /* * Performs inner Join with the newStream. SQL : from priorStream inner join newStream on newStream.field = priorStream.field1 same * as: new WindowedQueryBolt(priorStream,field1). join(newStream, field, priorStream); * * Note: priorStream must be previously joined. Valid ex: new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2); Invalid ex: * new WindowedQueryBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1); * * @param newStream Either stream name or name of upstream component * @param field the field on which to perform the join / public JoinBolt join(String newStream, String field, String priorStream) { return joinCommon(newStream, field, priorStream, JoinType.INNER); } /* * Performs left Join with the newStream. SQL : from stream1 left join stream2 on stream2.field = stream1.field1 same as: new * WindowedQueryBolt(stream1, field1). leftJoin(stream2, field, stream1); * * Note: priorStream must be previously joined Valid ex: new WindowedQueryBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2); * Invalid ex: new WindowedQueryBolt(s1,k1). leftJoin(s3,k3, s2). 
leftJoin(s2,k2, s1); * * @param newStream Either a name of a stream or an upstream component * @param field the field on which to perform the join / public JoinBolt leftJoin(String newStream, String field, String priorStream) { return joinCommon(newStream, field, priorStream, JoinType.LEFT); } private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) { if (hashedInputs.containsKey(newStream)) { throw new IllegalArgumentException("’" + newStream + “’ is already part of join. Cannot join with it more than once.”); } hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>()); JoinInfo joinInfo = joinCriteria.get(priorStream); if (joinInfo == null) { throw new IllegalArgumentException(“Stream ‘” + priorStream + “’ was not previously declared”); } FieldSelector field = new FieldSelector(newStream, fieldDescriptor); joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType)); return this; } /* * Specify projection fields. i.e. Specifies the fields to include in the output. e.g: .select(“field1, stream2:field2, field3”) Nested * Key names are supported for nested types: e.g: .select(“outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3)” Inner * types (non leaf) must be Map<> in order to support nested lookup using this dot notation This selected fields implicitly declare the * output fieldNames for the bolt based. 
* * @param commaSeparatedKeys * @return */ public JoinBolt select(String commaSeparatedKeys) { String[] fieldNames = commaSeparatedKeys.split(","); outputFields = new FieldSelector[fieldNames.length]; for (int i = 0; i < fieldNames.length; i++) { outputFields[i] = new FieldSelector(fieldNames[i]); } return this; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { String[] outputFieldNames = new String[outputFields.length]; for (int i = 0; i < outputFields.length; ++i) { outputFieldNames[i] = outputFields[i].getOutputName(); } if (outputStreamName != null) { declarer.declareStream(outputStreamName, new Fields(outputFieldNames)); } else { declarer.declare(new Fields(outputFieldNames)); } } @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { this.collector = collector; // initialize the hashedInputs data structure int i = 0; for (String stream : joinCriteria.keySet()) { if (i > 0) { hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>()); } ++i; } if (outputFields == null) { throw new IllegalArgumentException(“Must specify output fields via .select() method.”); } } @Override public void execute(TupleWindow inputWindow) { // 1) Perform Join List<Tuple> currentWindow = inputWindow.get(); JoinAccumulator joinResult = hashJoin(currentWindow); // 2) Emit results for (ResultRecord resultRecord : joinResult.getRecords()) { ArrayList<Object> outputTuple = resultRecord.getOutputFields(); if (outputStreamName == null) { // explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor them to all // tuples in window collector.emit(resultRecord.tupleList, outputTuple); } else { // explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples // in window collector.emit(outputStreamName, resultRecord.tupleList, outputTuple); } } } //……}JoinBolt继承了BaseWindowedBolt,定义了Selector 
selectorType、LinkedHashMap<String, JoinInfo> joinCriteria、FieldSelector[] outputFields等属性,用于记录关联类型及关联关系join、leftJoin方法用于设置join关联关系,最后都是调用joinCommon方法,关联关系使用JoinInfo对象,存储在joinCriteria中select方法用于选择结果集的列,最后设置到outputFields,用于declareOutputFieldsexecute就是join的核心逻辑了,这里调用了hashJoinJoinBolt.hashJoinstorm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java protected JoinAccumulator hashJoin(List<Tuple> tuples) { clearHashedInputs(); JoinAccumulator probe = new JoinAccumulator(); // 1) Build phase - Segregate tuples in the Window into streams. // First stream’s tuples go into probe, rest into HashMaps in hashedInputs String firstStream = joinCriteria.keySet().iterator().next(); for (Tuple tuple : tuples) { String streamId = getStreamSelector(tuple); if (!streamId.equals(firstStream)) { Object field = getJoinField(streamId, tuple); ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field); if (recs == null) { recs = new ArrayList<Tuple>(); hashedInputs.get(streamId).put(field, recs); } recs.add(tuple); } else { ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1); probe.insert(probeRecord); // first stream’s data goes into the probe } } // 2) Join the streams in order of streamJoinOrder int i = 0; for (String streamName : joinCriteria.keySet()) { boolean finalJoin = (i == joinCriteria.size() - 1); if (i > 0) { probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin); } ++i; } return probe; }hashJoin方法先遍历一下tuples,把tuples分为两类,firstStream的数据存到JoinAccumulator probe中,其余的存到HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs之后对剩余的streamId,挨个遍历调用doJoin,把结果整合到JoinAccumulator probeJoinAccumulatorstorm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java protected class JoinAccumulator { ArrayList<ResultRecord> records = new ArrayList<>(); public void insert(ResultRecord tuple) { records.add(tuple); } public Collection<ResultRecord> getRecords() { return records; } 
}

JoinAccumulator is essentially an ArrayList<ResultRecord>.

ResultRecord

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // Join helper to concat fields to the record
    protected class ResultRecord {

        ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
        ArrayList<Object> outFields = null; // refs to fields that will be part of output fields

        // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
        public ResultRecord(Tuple tuple, boolean generateOutputFields) {
            tupleList.add(tuple);
            if (generateOutputFields) {
                outFields = doProjection(tupleList, outputFields);
            }
        }

        public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
            if (lhs != null) {
                tupleList.addAll(lhs.tupleList);
            }
            if (rhs != null) {
                tupleList.add(rhs);
            }
            if (generateOutputFields) {
                outFields = doProjection(tupleList, outputFields);
            }
        }

        public ArrayList<Object> getOutputFields() {
            return outFields;
        }

        // 'stream' cannot be null,
        public Object getField(FieldSelector fieldSelector) {
            for (Tuple tuple : tupleList) {
                Object result = lookupField(fieldSelector, tuple);
                if (result != null) {
                    return result;
                }
            }
            return null;
        }
    }

    // Performs projection on the tuples based on 'projectionFields'
    protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
        ArrayList<Object> result = new ArrayList<>(projectionFields.length);
        // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
        for (int i = 0; i < projectionFields.length; i++) {
            boolean missingField = true;
            for (Tuple tuple : tuples) {
                Object field = lookupField(projectionFields[i], tuple);
                if (field != null) {
                    result.add(field);
                    missingField = false;
                    break;
                }
            }
            if (missingField) { // add a null for missing fields (usually in case of outer joins)
                result.add(null);
            }
        }
        return result;
    }

    // Extract the field from tuple. Field may be nested field (x.y.z)
    protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
        // verify stream name matches, if stream name was specified
        if (fieldSelector.streamName != null
            && !fieldSelector.streamName.equalsIgnoreCase(getStreamSelector(tuple))) {
            return null;
        }

        Object curr = null;
        for (int i = 0; i < fieldSelector.field.length; i++) {
            if (i == 0) {
                if (tuple.contains(fieldSelector.field[i])) {
                    curr = tuple.getValueByField(fieldSelector.field[i]);
                } else {
                    return null;
                }
            } else {
                curr = ((Map) curr).get(fieldSelector.field[i]);
                if (curr == null) {
                    return null;
                }
            }
        }
        return curr;
    }

- ResultRecord stores the data produced by a join.
- When joinCriteria.size() == 1 or finalJoin is true, ResultRecord is created with generateOutputFields set to true, and doProjection is called to project the result set onto the output fields.
- While iterating over joinCriteria and calling doJoin, finalJoin is true only for the last entry.

JoinBolt.doJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType
    protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                     boolean finalJoin) {
        final JoinType joinType = joinInfo.getJoinType();
        switch (joinType) {
            case INNER:
                return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
            case LEFT:
                return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
            case RIGHT:
            case OUTER:
            default:
                throw new RuntimeException("Unsupported join type : " + joinType.name());
        }
    }

- doJoin dispatches on the join type. Only INNER and LEFT are implemented so far, handled by doInnerJoin and doLeftJoin respectively; RIGHT and OUTER throw a RuntimeException.

doInnerJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // inner join - core implementation
    protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                          boolean finalJoin) {
        String[] probeKeyName = joinInfo.getOtherField();
        JoinAccumulator result = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
        for (ResultRecord rec : probe.getRecords()) {
            Object probeKey = rec.getField(fieldSelector);
            if (probeKey != null) {
                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
                if (matchingBuildRecs != null) {
                    for (Tuple matchingRec : matchingBuildRecs) {
                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
                        result.insert(mergedRecord);
                    }
                }
            }
        }
        return result;
    }

- This iterates over the records of the probe JoinAccumulator one by one, uses probeKey to look up the matching records in buildInput, and merges each match into a new ResultRecord.

doLeftJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

    // left join - core implementation
    protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                         boolean finalJoin) {
        String[] probeKeyName = joinInfo.getOtherField();
        JoinAccumulator result = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
        for (ResultRecord rec : probe.getRecords()) {
            Object probeKey = rec.getField(fieldSelector);
            if (probeKey != null) {
                ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if its return null
                if (matchingBuildRecs != null && !matchingBuildRecs.isEmpty()) {
                    for (Tuple matchingRec : matchingBuildRecs) {
                        ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
                        result.insert(mergedRecord);
                    }
                } else {
                    ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
                    result.insert(mergedRecord);
                }
            }
        }
        return result;
    }

- The only difference between the left join and the inner join is that, when no matching record is found, the left-hand record is still kept.

Summary

- JoinBolt extends BaseWindowedBolt. It currently supports only inner join and left join, and it requires the join field to be the same as the fieldsGrouping field.
- To merge data from multiple streams, JoinBolt uses a divide-and-conquer approach: it keeps accumulating the result set in a JoinAccumulator and calls doJoin in a loop until all streams are joined.
- Because JoinBolt works in memory and has to match records, it consumes both CPU and memory. A few points deserve attention:
  - The time window should not be too large, otherwise too much data piles up in memory and an OOM becomes likely. Adjust the window to the workload, or set the worker heap size through Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB.
  - A sliding window causes the same data to be joined repeatedly, so use withTumblingWindow.
  - If tuple processing timeouts are enabled, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS must be larger than windowLength + slidingInterval + processing time, so that tuples are not mistakenly treated as timed out and replayed before processing finishes.
  - A windowed bolt automatically anchors emitted tuples to the data in the tupleWindow. With a lot of data, anchoring puts pressure on the whole topology; if acking is not needed, it can be turned off by setting Config.TOPOLOGY_ACKER_EXECUTORS to 0.
  - Set Config.TOPOLOGY_MAX_SPOUT_PENDING somewhat high to leave enough room for the window join and the operations after it; to some extent it also keeps the spout from emitting tuples faster than the downstream bolts can consume them.
  - In production, set Config.TOPOLOGY_DEBUG to false to turn off debug logging, and set Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to 0 to turn off the event logger.

doc

- Windowing Support in Core Storm
- Joining Streams in Storm Core

...
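The difference between doInnerJoin and doLeftJoin described above can be sketched without Storm. This is only a rough illustration under stated assumptions: plain maps stand in for Tuple, there is a single join key, and the names JoinSketch, join, record, sampleProbe and sampleBuild are hypothetical and not part of Storm's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; JoinSketch is not part of Storm.
final class JoinSketch {

    // Joins every probe record against the build-side index.
    // keepUnmatched = false mimics doInnerJoin, true mimics doLeftJoin.
    static List<Map<String, Object>> join(List<Map<String, Object>> probe,
                                          Map<Object, List<Map<String, Object>>> buildInput,
                                          String probeKeyField,
                                          boolean keepUnmatched) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> rec : probe) {
            Object probeKey = rec.get(probeKeyField);
            if (probeKey == null) {
                continue; // as in JoinBolt, records without a join key are dropped
            }
            List<Map<String, Object>> matches = buildInput.get(probeKey);
            if (matches != null && !matches.isEmpty()) {
                for (Map<String, Object> match : matches) {
                    Map<String, Object> merged = new HashMap<>(rec);
                    merged.putAll(match); // simplification: JoinBolt keeps the tuples side by side
                    result.add(merged);
                }
            } else if (keepUnmatched) {
                result.add(new HashMap<>(rec)); // left join: keep the left record on its own
            }
        }
        return result;
    }

    // Builds a record from alternating field names and values.
    static Map<String, Object> record(Object... fieldsAndValues) {
        Map<String, Object> rec = new HashMap<>();
        for (int i = 0; i + 1 < fieldsAndValues.length; i += 2) {
            rec.put((String) fieldsAndValues[i], fieldsAndValues[i + 1]);
        }
        return rec;
    }

    // Probe side: two users.
    static List<Map<String, Object>> sampleProbe() {
        List<Map<String, Object>> probe = new ArrayList<>();
        probe.add(record("userId", 1, "name", "alice"));
        probe.add(record("userId", 2, "name", "bob"));
        return probe;
    }

    // Build side: orders indexed by userId; only user 1 has orders.
    static Map<Object, List<Map<String, Object>>> sampleBuild() {
        List<Map<String, Object>> orders = new ArrayList<>();
        orders.add(record("userId", 1, "orderId", "o-1"));
        orders.add(record("userId", 1, "orderId", "o-2"));
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        index.put(1, orders);
        return index;
    }

    public static void main(String[] args) {
        System.out.println("inner: " + join(sampleProbe(), sampleBuild(), "userId", false).size());
        System.out.println("left:  " + join(sampleProbe(), sampleBuild(), "userId", true).size());
    }
}
```

With this sample data the inner join drops the user without orders, while the left join keeps that user as a record with no order fields, which is the same distinction the two JoinBolt methods make with `new ResultRecord(rec, null, finalJoin)`.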

October 26, 2018 · 8 min · jiezi

A Look at Storm's WindowedBoltExecutor

Preface

This article looks at Storm's WindowedBoltExecutor.

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    /**
     * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
     */
    public class WindowedBoltExecutor implements IRichBolt {
        public static final String LATE_TUPLE_FIELD = "late_tuple";
        private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
        private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
        private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
        private final IWindowedBolt bolt;
        // package level for unit tests
        transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
        private transient WindowedOutputCollector windowedOutputCollector;
        private transient WindowLifecycleListener<Tuple> listener;
        private transient WindowManager<Tuple> windowManager;
        private transient int maxLagMs;
        private TimestampExtractor timestampExtractor;
        private transient String lateTupleStream;
        private transient TriggerPolicy<Tuple, ?> triggerPolicy;
        private transient EvictionPolicy<Tuple, ?> evictionPolicy;
        private transient Duration windowLengthDuration;

        public WindowedBoltExecutor(IWindowedBolt bolt) {
            this.bolt = bolt;
            timestampExtractor = bolt.getTimestampExtractor();
        }

        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            doPrepare(topoConf, context, collector, new ConcurrentLinkedQueue<>(), false);
        }

        // NOTE: the queue has to be thread safe.
        protected void doPrepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
                                 Collection<Event<Tuple>> queue, boolean stateful) {
            Objects.requireNonNull(topoConf);
            Objects.requireNonNull(context);
            Objects.requireNonNull(collector);
            Objects.requireNonNull(queue);
            this.windowedOutputCollector = new WindowedOutputCollector(collector);
            bolt.prepare(topoConf, context, windowedOutputCollector);
            this.listener = newWindowLifecycleListener();
            this.windowManager = initWindowManager(listener, topoConf, context, queue, stateful);
            start();
            LOG.info("Initialized window manager {} ", windowManager);
        }

        @Override
        public void execute(Tuple input) {
            if (isTupleTs()) {
                long ts = timestampExtractor.extractTimestamp(input);
                if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                    windowManager.add(input, ts);
                } else {
                    if (lateTupleStream != null) {
                        windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                    } else {
                        LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                    }
                    windowedOutputCollector.ack(input);
                }
            } else {
                windowManager.add(input);
            }
        }

        @Override
        public void cleanup() {
            if (waterMarkEventGenerator != null) {
                waterMarkEventGenerator.shutdown();
            }
            windowManager.shutdown();
            bolt.cleanup();
        }

        // for unit tests
        WindowManager<Tuple> getWindowManager() {
            return windowManager;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
            }
            bolt.declareOutputFields(declarer);
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return bolt.getComponentConfiguration();
        }

        //……
    }

- WindowedBoltExecutor implements the IRichBolt interface. In prepare it initializes windowedOutputCollector, listener and windowManager and calls bolt.prepare; in cleanup it shuts down waterMarkEventGenerator, windowManager and the bolt. When TopologyBuilder.setBolt is called with an IWindowedBolt implementation, it wraps the original bolt in a WindowedBoltExecutor and uses that instead.
- declareOutputFields delegates to bolt.declareOutputFields(declarer), and getComponentConfiguration returns bolt.getComponentConfiguration().
- execute mainly adds the tuple to windowManager; tuples that are not admitted into the window are acked immediately.

WindowedOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    /**
     * Creates an {@link OutputCollector} wrapper that automatically anchors the tuples to inputTuples while emitting.
     */
    private static class WindowedOutputCollector extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        void setContext(List<Tuple> inputTuples) {
            this.inputTuples = inputTuples;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            return emit(streamId, inputTuples, tuple);
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            emitDirect(taskId, streamId, inputTuples, tuple);
        }
    }

- WindowedOutputCollector extends OutputCollector. It overrides emit and emitDirect so that emitted tuples are anchored to inputTuples by default.

WindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java

    /**
     * A callback for expiry, activation of events tracked by the {@link WindowManager}
     *
     * @param <T> The type of Event in the window (e.g. Tuple).
     */
    public interface WindowLifecycleListener<T> {
        /**
         * Called on expiry of events from the window due to {@link EvictionPolicy}
         *
         * @param events the expired events
         */
        void onExpiry(List<T> events);

        /**
         * Called on activation of the window due to the {@link TriggerPolicy}
         *
         * @param events        the list of current events in the window.
         * @param newEvents     the newly added events since last activation.
         * @param expired       the expired events since last activation.
         * @param referenceTime the reference (event or processing) time that resulted in activation
         */
        default void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long referenceTime) {
            throw new UnsupportedOperationException("Not implemented");
        }

        /**
         * Called on activation of the window due to the {@link TriggerPolicy}. This is typically invoked when the windows are persisted in
         * state and is huge to be loaded entirely in memory.
         *
         * @param eventsIt      a supplier of iterator over the list of current events in the window
         * @param newEventsIt   a supplier of iterator over the newly added events since the last activation
         * @param expiredIt     a supplier of iterator over the expired events since the last activation
         * @param referenceTime the reference (event or processing) time that resulted in activation
         */
        default void onActivation(Supplier<Iterator<T>> eventsIt, Supplier<Iterator<T>> newEventsIt, Supplier<Iterator<T>> expiredIt,
                                  Long referenceTime) {
            throw new UnsupportedOperationException("Not implemented");
        }
    }

- WindowLifecycleListener defines the callbacks onExpiry and onActivation.
- They are triggered by the two policies EvictionPolicy and TriggerPolicy, respectively.

EvictionPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java

    /**
     * Eviction policy tracks events and decides whether an event should be evicted from the window or not.
     *
     * @param <T> the type of event that is tracked.
     */
    public interface EvictionPolicy<T, S> {
        /**
         * Decides if an event should be expired from the window, processed in the current window or kept for later processing.
         *
         * @param event the input event
         * @return the {@link org.apache.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
         */
        Action evict(Event<T> event);

        /**
         * Tracks the event to later decide whether {@link EvictionPolicy#evict(Event)} should evict it or not.
         *
         * @param event the input event to be tracked
         */
        void track(Event<T> event);

        /**
         * Returns the current context that is part of this eviction policy.
         *
         * @return the eviction context
         */
        EvictionContext getContext();

        /**
         * Sets a context in the eviction policy that can be used while evicting the events. E.g. For TimeEvictionPolicy, this could be used to
         * set the reference timestamp.
         *
         * @param context the eviction context
         */
        void setContext(EvictionContext context);

        /**
         * Resets the eviction policy.
         */
        void reset();

        /**
         * Return runtime state to be checkpointed by the framework for restoring the eviction policy in case of failures.
         *
         * @return the state
         */
        S getState();

        /**
         * Restore the eviction policy from the state that was earlier checkpointed by the framework.
         *
         * @param state the state
         */
        void restoreState(S state);

        /**
         * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
         */
        public enum Action {
            /**
             * expire the event and remove it from the queue.
             */
            EXPIRE,
            /**
             * process the event in the current window of events.
             */
            PROCESS,
            /**
             * don't include in the current window but keep the event in the queue for evaluating as a part of future windows.
             */
            KEEP,
            /**
             * stop processing the queue, there cannot be anymore events satisfying the eviction policy.
             */
            STOP
        }
    }

- EvictionPolicy tracks events and decides whether an event should be removed from the window.
- Its implementations are CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy and WatermarkTimeEvictionPolicy.

TriggerPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/TriggerPolicy.java

    /**
     * Triggers the window calculations based on the policy.
     *
     * @param <T> the type of the event that is tracked
     */
    public interface TriggerPolicy<T, S> {
        /**
         * Tracks the event and could use this to invoke the trigger.
         *
         * @param event the input event
         */
        void track(Event<T> event);

        /**
         * resets the trigger policy.
         */
        void reset();

        /**
         * Starts the trigger policy. This can be used during recovery to start the triggers after recovery is complete.
         */
        void start();

        /**
         * Any clean up could be handled here.
         */
        void shutdown();

        /**
         * Return runtime state to be checkpointed by the framework for restoring the trigger policy in case of failures.
         *
         * @return the state
         */
        S getState();

        /**
         * Restore the trigger policy from the state that was earlier checkpointed by the framework.
         *
         * @param state the state
         */
        void restoreState(S state);
    }

- TriggerPolicy is responsible for triggering the window computation.
- Its implementations are CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy and WatermarkTimeTriggerPolicy.

WindowedBoltExecutor.newWindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() {
            @Override
            public void onExpiry(List<Tuple> tuples) {
                for (Tuple tuple : tuples) {
                    windowedOutputCollector.ack(tuple);
                }
            }

            @Override
            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                windowedOutputCollector.setContext(tuples);
                boltExecute(tuples, newTuples, expiredTuples, timestamp);
            }
        };
    }

    protected void boltExecute(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
        bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
    }

- This creates an anonymous WindowLifecycleListener implementation.
- onExpiry acks the tuples one by one; onActivation calls boltExecute, which builds a TupleWindowImpl and passes it to the bolt for execution.

WindowedBoltExecutor.initWindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    private EvictionPolicy<Tuple, ?> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
        if (windowLengthCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
            } else {
                return new CountEvictionPolicy<>(windowLengthCount.value);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
            } else {
                return new TimeEvictionPolicy<>(windowLengthDuration.value);
            }
        }
    }

    private TriggerPolicy<Tuple, ?> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
                                                     WindowManager<Tuple> manager, EvictionPolicy<Tuple, ?> evictionPolicy) {
        if (slidingIntervalCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
            } else {
                return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
            } else {
                return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
            }
        }
    }

- For WindowedBoltExecutor, stateful is false, so a plain WindowManager is created here.
- DEFAULT_MAX_LAG_MS is 0, meaning no lag; DEFAULT_WATERMARK_EVENT_INTERVAL_MS is 1000, i.e. one second.
- The EvictionPolicy and TriggerPolicy are chosen according to the parameter types given for windowLength and slidingInterval. When a timestamp field is configured and both parameters are Durations, WatermarkTimeEvictionPolicy and WatermarkTimeTriggerPolicy are created.

WindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

    /**
     * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks on expiry of events or activation of the window due to
     * {@link TriggerPolicy}.
     *
     * @param <T> the type of event in the window.
     */
    public class WindowManager<T> implements TriggerHandler {
        protected final Collection<Event<T>> queue;
        private final AtomicInteger eventsSinceLastExpiry;

        //……

        /**
         * Add an event into the window, with the given ts as the tracking ts.
         *
         * @param event the event to track
         * @param ts    the timestamp
         */
        public void add(T event, long ts) {
            add(new EventImpl<T>(event, ts));
        }

        /**
         * Tracks a window event
         *
         * @param windowEvent the window event to track
         */
        public void add(Event<T> windowEvent) {
            // watermark events are not added to the queue.
            if (!windowEvent.isWatermark()) {
                queue.add(windowEvent);
            } else {
                LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
            }
            track(windowEvent);
            compactWindow();
        }

        /**
         * feed the event to the eviction and trigger policies for bookkeeping and optionally firing the trigger.
         */
        private void track(Event<T> windowEvent) {
            evictionPolicy.track(windowEvent);
            triggerPolicy.track(windowEvent);
        }

        /**
         * expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
         */
        protected void compactWindow() {
            if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
                scanEvents(false);
            }
        }

        /**
         * Scan events in the queue, using the expiration policy to check if the event should be evicted or not.
         *
         * @param fullScan if set, will scan the entire queue; if not set, will stop as soon as an event not satisfying the expiration policy is
         *                 found
         * @return the list of events to be processed as a part of the current window
         */
        private List<Event<T>> scanEvents(boolean fullScan) {
            LOG.debug("Scan events, eviction policy {}", evictionPolicy);
            List<T> eventsToExpire = new ArrayList<>();
            List<Event<T>> eventsToProcess = new ArrayList<>();
            try {
                lock.lock();
                Iterator<Event<T>> it = queue.iterator();
                while (it.hasNext()) {
                    Event<T> windowEvent = it.next();
                    Action action = evictionPolicy.evict(windowEvent);
                    if (action == EXPIRE) {
                        eventsToExpire.add(windowEvent.get());
                        it.remove();
                    } else if (!fullScan || action == STOP) {
                        break;
                    } else if (action == PROCESS) {
                        eventsToProcess.add(windowEvent);
                    }
                }
                expiredEvents.addAll(eventsToExpire);
            } finally {
                lock.unlock();
            }
            eventsSinceLastExpiry.set(0);
            LOG.debug("[{}] events expired from window.", eventsToExpire.size());
            if (!eventsToExpire.isEmpty()) {
                LOG.debug("invoking windowLifecycleListener.onExpiry");
                windowLifecycleListener.onExpiry(eventsToExpire);
            }
            return eventsToProcess;
        }

        //……
    }

- The execute method of WindowedBoltExecutor mainly adds the tuple to windowManager.
- isWatermark returns false for an EventImpl, so the event is put into the queue, and then track and compactWindow are executed.
- track delegates to evictionPolicy and triggerPolicy. compactWindow triggers scanEvents once the number of events since the last expiry reaches the threshold. When fullScan is false, the scan stops at the first event that is not expired; afterwards, if eventsToExpire is not empty, windowLifecycleListener.onExpiry(eventsToExpire) is invoked.

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

    /**
     * Tracks tuples across input streams and periodically emits watermark events. Watermark event timestamp is the minimum of the latest tuple
     * timestamps across all the input streams (minus the lag). Once a watermark event is emitted any tuple coming with an earlier timestamp can
     * be considered as late events.
     */
    public class WaterMarkEventGenerator<T> implements Runnable {
        private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
        private final WindowManager<T> windowManager;
        private final int eventTsLag;
        private final Set<GlobalStreamId> inputStreams;
        private final Map<GlobalStreamId, Long> streamToTs;
        private final ScheduledExecutorService executorService;
        private final int interval;
        private ScheduledFuture<?> executorFuture;
        private volatile long lastWaterMarkTs;

        //……

        public void start() {
            this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            try {
                long waterMarkTs = computeWaterMarkTs();
                if (waterMarkTs > lastWaterMarkTs) {
                    this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                    lastWaterMarkTs = waterMarkTs;
                }
            } catch (Throwable th) {
                LOG.error("Failed while processing watermark event ", th);
                throw th;
            }
        }
    }

- When WindowedBoltExecutor starts, it calls the start method of WaterMarkEventGenerator.
- start schedules the WaterMarkEventGenerator task to run every watermarkInterval milliseconds.
- Its run method computes the watermark (the minimum of the latest tuple timestamps across the input streams, minus the lag). When it is greater than lastWaterMarkTs, a WaterMarkEvent (whose isWatermark is true) is added to windowManager and lastWaterMarkTs is updated.
- windowManager.add(new WaterMarkEvent<>(waterMarkTs)) triggers triggerPolicy.track(windowEvent) and the compactWindow operation.

WatermarkTimeTriggerPolicy.track

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java

    @Override
    public void track(Event<T> event) {
        if (started && event.isWatermark()) {
            handleWaterMarkEvent(event);
        }
    }

    /**
     * Invokes the trigger all pending windows up to the watermark timestamp. The end ts of the window is set in the eviction policy context
     * so that the events falling within that window can be processed.
     */
    private void handleWaterMarkEvent(Event<T> event) {
        long watermarkTs = event.getTimestamp();
        long windowEndTs = nextWindowEndTs;
        LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
        while (windowEndTs <= watermarkTs) {
            long currentCount = windowManager.getEventCount(windowEndTs);
            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (handler.onTrigger()) {
                windowEndTs += slidingIntervalMs;
            } else {
                /*
                 * No events were found in the previous window interval.
                 * Scan through the events in the queue to find the next
                 * window intervals based on event ts.
                 */
                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
                LOG.debug("Next aligned window end ts {}", ts);
                if (ts == Long.MAX_VALUE) {
                    LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
                    break;
                }
                windowEndTs = ts;
            }
        }
        nextWindowEndTs = windowEndTs;
    }

    /**
     * Computes the next window by scanning the events in the window and finds the next aligned window between the startTs and endTs. Return
     * the end ts of the next aligned window, i.e. the ts when the window should fire.
     *
     * @param startTs the start timestamp (excluding)
     * @param endTs   the end timestamp (including)
     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there are no more events to be processed.
     */
    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }

- handleWaterMarkEvent invokes handler.onTrigger().

WindowManager.onTrigger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime());
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }

- onTrigger computes three sets of data: events, expiredEvents and newEvents.
- When events is not empty, it fires windowLifecycleListener.onActivation, which in turn calls the bolt's execute method.

Summary

- WindowedBoltExecutor implements IRichBolt, so it is itself a bolt. When TopologyBuilder.setBolt is called, the user's IWindowedBolt implementation is wrapped in a WindowedBoltExecutor, which replaces it. The wrapper changes execute: tuples that belong in a window are added through windowManager.add, tuples that must be discarded are acked, and the real bolt's execute runs only when the window is triggered.
- WindowLifecycleListener has two callbacks: onExpiry, triggered by the EvictionPolicy, and onActivation, triggered by the TriggerPolicy.
- The windowLength and slidingInterval parameters each come in two flavors, Duration and Count, and so do EvictionPolicy and TriggerPolicy. Combined with the watermark variant, each policy has four implementations. EvictionPolicy: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, WatermarkTimeEvictionPolicy. TriggerPolicy: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, WatermarkTimeTriggerPolicy.
- Besides storing the tuple, windowManager.add calls track on both policies and then runs compactWindow. The track method of WatermarkTimeEvictionPolicy currently does nothing, while the track method of WatermarkTimeTriggerPolicy fires the window when the event is a WaterMarkEvent: it calls WindowManager.onTrigger, which selects the data in the window, fires windowLifecycleListener.onActivation, and finally calls the windowed bolt's execute method.
- Both onTrigger and add in WindowManager end up calling scanEvents; the former does a full scan and the latter does not. scanEvents calls evictionPolicy.evict to decide whether a tuple should be evicted, which leads to windowLifecycleListener.onExpiry, and that callback acks the tuple. In other words, a tuple is acked automatically when it expires. In theory every tuple eventually expires and is therefore acked automatically, which is why Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS must be larger than windowLength + slidingInterval, so that a tuple is not considered timed out before it is acked.
- When WindowedBoltExecutor starts and a timestamp extractor is configured, it starts the WaterMarkEventGenerator, which registers a scheduled task that computes the watermark every watermarkInterval milliseconds (the minimum of the latest tuple timestamps across the input streams, minus the lag). When the watermark is greater than lastWaterMarkTs, it updates lastWaterMarkTs and adds a WaterMarkEvent (whose isWatermark is true) to windowManager. For watermark-based windows, this is what drives WindowManager.onTrigger, and therefore windowLifecycleListener.onActivation.
- Regarding acks: in WindowedBoltExecutor.execute, a late tuple that cannot enter the window queue is acked right away; if Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured it is first emitted to that stream, otherwise it is only logged. Tuples inside the window are acked automatically when they expire. WindowedBoltExecutor uses WindowedOutputCollector, which extends OutputCollector and anchors emitted tuples to the input tuples of the window.

doc

- Windowing Support in Core Storm

...
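The timing arithmetic discussed in the summary above can be sketched in plain Java. This is an illustration under stated assumptions, not Storm code: the watermark rule follows the WaterMarkEventGenerator Javadoc quoted earlier (the body of computeWaterMarkTs is not shown in the article), the alignment rule copies the arithmetic of getNextAlignedWindowTs, and the names WatermarkSketch, computeWatermark, alignToSlidingInterval and timeoutCoversWindow are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; WatermarkSketch is not part of Storm.
final class WatermarkSketch {

    // Watermark = minimum of the latest timestamp seen on each input stream, minus the lag.
    // Assumes every stream has already reported at least one timestamp.
    static long computeWatermark(Map<String, Long> latestTsPerStream, long lagMs) {
        long min = Long.MAX_VALUE;
        for (long ts : latestTsPerStream.values()) {
            min = Math.min(min, ts);
        }
        return min - lagMs;
    }

    // Rounds an event timestamp up to the next multiple of the sliding interval,
    // using the same arithmetic as getNextAlignedWindowTs.
    static long alignToSlidingInterval(long nextTs, long slidingIntervalMs) {
        if (nextTs == Long.MAX_VALUE || nextTs % slidingIntervalMs == 0) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }

    // Checks the rule of thumb from the summary: the message timeout must exceed
    // windowLength + slidingInterval, otherwise tuples may time out before they expire.
    static boolean timeoutCoversWindow(long messageTimeoutSecs, long windowLengthMs, long slidingIntervalMs) {
        return messageTimeoutSecs * 1000L > windowLengthMs + slidingIntervalMs;
    }

    public static void main(String[] args) {
        Map<String, Long> latest = new HashMap<>();
        latest.put("streamA", 10_500L);
        latest.put("streamB", 9_800L);
        System.out.println("watermark: " + computeWatermark(latest, 300L));
        System.out.println("aligned window end: " + alignToSlidingInterval(12_345L, 5_000L));
        System.out.println("timeout ok: " + timeoutCoversWindow(30, 10_000L, 5_000L));
    }
}
```

The slowest stream determines the watermark, so one idle input stream holds back window activation for the whole bolt; the alignment step is what makes empty intervals skippable without shifting later windows off the sliding-interval grid.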

October 25, 2018 · 12 min · jiezi

Apache Storm demo example

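Translated from a foreign website; the task is to build a mobile call log analyzer.

Scenario - Mobile Call Log Analyzer

Mobile calls and their durations are given to Apache Storm as input. Storm processes them, groups the calls between the same caller and receiver, and computes the total number of calls for each pair.

Creating the Spout

A spout is the component used for data generation. Basically, a spout implements the IRichSpout interface. The IRichSpout interface has the following important methods:

- open - provides the spout with an environment to execute in. The executors run this method to initialize the spout.
- nextTuple - emits the generated data through the collector.
- close - called when the spout is about to shut down.
- declareOutputFields - declares the output schema of the tuple.
- ack - acknowledges that a specific tuple has been processed.
- fail - signals that a specific tuple was not processed and is not to be reprocessed.

open

The signature of the open method is as follows:

    open(Map conf, TopologyContext context, SpoutOutputCollector collector)

- conf - provides the Storm configuration for this spout.
- context - provides complete information about the spout's place within the topology, its task id, and its input and output information.
- collector - enables us to emit the tuples that will be processed by the bolts.

nextTuple

The signature of the nextTuple method is as follows:

    nextTuple()

nextTuple() is called periodically from the same loop as the ack() and fail() methods. When there is no work to do, it must release control of the thread so that the other methods get a chance to be called. For that reason the first line of nextTuple checks whether processing has finished. If so, it should sleep for at least one millisecond before returning, to reduce the load on the processor. ...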
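The grouping described in the scenario above (count the calls between the same caller and receiver) can be sketched in plain Java, independent of Storm. This is only a minimal illustration: the class name CallLogCounterSketch, the key format and the sample phone numbers are all made up for the example, and in the real topology this logic would live in a bolt rather than a static method.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not taken from the tutorial's code.
final class CallLogCounterSketch {

    // Counts calls per "caller - receiver" pair. Each input row is {caller, receiver}.
    static Map<String, Integer> countCalls(String[][] calls) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String[] call : calls) {
            String key = call[0] + " - " + call[1];
            counts.merge(key, 1, Integer::sum); // start at 1, otherwise add 1
        }
        return counts;
    }

    public static void main(String[] args) {
        String[][] calls = {
            {"1234123401", "1234123402"},
            {"1234123401", "1234123403"},
            {"1234123401", "1234123402"},
        };
        for (Map.Entry<String, Integer> entry : countCalls(calls).entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }
}
```

Note that the pair is directional here: a call from A to B and a call from B to A are counted under different keys.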

May 7, 2018 · 4 min · jiezi