Preface

This article looks at tick tuples in Storm.

Example

TickWordCountBolt

    public class TickWordCountBolt extends BaseBasicBolt {

        private static final Logger LOGGER = LoggerFactory.getLogger(TickWordCountBolt.class);

        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public Map<String, Object> getComponentConfiguration() {
            Config conf = new Config();
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return conf;
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            if (TupleUtils.isTick(input)) {
                // execute tick logic
                LOGGER.info("execute tick tuple, emit and clear counts");
                counts.entrySet().stream()
                        .forEach(entry -> collector.emit(new Values(entry.getKey(), entry.getValue())));
                counts.clear();
            } else {
                String word = input.getString(0);
                Integer count = counts.get(word);
                if (count == null) count = 0;
                count++;
                counts.put(word, count);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

When tick tuples are used, the execute method has to check the tuple type itself and handle tick tuples and ordinary tuples separately.

This example overrides getComponentConfiguration, creates a new Config, and sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS on it.

tickTopology

        @Test
        public void testTickTuple() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            TopologyBuilder builder = new TopologyBuilder();
            // parallelism of 10
            builder.setSpout("spout", new TestWordSpout(), 10);
            builder.setBolt("count", new TickWordCountBolt(), 5)
    //                .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3)
                    .fieldsGrouping("spout", new Fields("word"));
            builder.setBolt("print", new PrintBolt(), 1)
                    .shuffleGrouping("count");
            SubmitHelper.submitRemote("tickDemo", builder);
        }

Besides overriding getComponentConfiguration to set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, you can also call addConfiguration after TopologyBuilder.setBolt. That setting overrides the one returned by getComponentConfiguration.

The parameter can also be set on the conf passed to StormSubmitter.submitTopology, but that setting is global and applies to every bolt in the topology. When both a global setting and a bolt-level setting exist, the one with the narrower scope wins.

Source code analysis

TupleUtils.isTick

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

        public static boolean isTick(Tuple tuple) {
            return tuple != null
                   && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                   && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }

isTick decides whether a tuple is a tick tuple from its sourceComponent and sourceStreamId.

TopologyBuilder.setBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

        /**
         * Define a new bolt in this topology with the specified amount of parallelism.
         *
         * @param id               the id of this component. This id is referenced by other components that want to consume this bolt's
         *                         outputs.
         * @param bolt             the bolt
         * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
         *                         somewhere around the cluster.
         * @return use the returned object to declare the inputs to this component
         *
         * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
         */
        public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
            validateUnusedId(id);
            initCommon(id, bolt, parallelism_hint);
            _bolts.put(id, bolt);
            return new BoltGetter(id);
        }

        private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
            ComponentCommon common = new ComponentCommon();
            common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
            if (parallelism != null) {
                int dop = parallelism.intValue();
                if (dop < 1) {
                    throw new IllegalArgumentException("Parallelism must be positive.");
                }
                common.set_parallelism_hint(dop);
            }
            Map<String, Object> conf = component.getComponentConfiguration();
            if (conf != null) {
                common.set_json_conf(JSONValue.toJSONString(conf));
            }
            commons.put(id, common);
        }

setBolt calls initCommon, which calls the bolt's getComponentConfiguration and stores the result, serialized as JSON, in commons.

BoltGetter.addConfiguration

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

        protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
            //……
        }

The addConfiguration method is inherited from BaseConfigurationDeclarer.

BaseConfigurationDeclarer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java

    public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer> implements ComponentConfigurationDeclarer<T> {
        @Override
        public T addConfiguration(String config, Object value) {
            Map<String, Object> configMap = new HashMap<>();
            configMap.put(config, value);
            return addConfigurations(configMap);
        }

        //……
    }

This creates a new map and then calls the subclass's addConfigurations. The subclass here is ConfigGetter.

ConfigGetter.addConfigurations

        protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
            String id;

            public ConfigGetter(String id) {
                this.id = id;
            }

            @SuppressWarnings("unchecked")
            @Override
            public T addConfigurations(Map<String, Object> conf) {
                if (conf != null) {
                    if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
                        throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
                    }
                    if (!conf.isEmpty()) {
                        String currConf = commons.get(id).get_json_conf();
                        commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                    }
                }
                return (T) this;
            }

            //……
        }

        private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
            Map<String, Object> res = new HashMap<>(into);
            res.putAll(newMap);
            return JSONValue.toJSONString(res);
        }

Here the component's current configuration is read from commons and the new entries are merged into it with putAll, so entries passed to addConfiguration override what the bolt returns from getComponentConfiguration.

Executor.normalizedComponentConf

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

        private Map<String, Object> normalizedComponentConf(
            Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
            List<String> keysToRemove = retrieveAllConfigKeys();
            keysToRemove.remove(Config.TOPOLOGY_DEBUG);
            keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
            keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
            keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
            keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
            keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
            keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
            keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
            keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
            keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
            keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
            keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
            keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
            keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
            keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
            keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);

            Map<String, Object> componentConf;
            String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
            if (specJsonConf != null) {
                try {
                    componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
                } catch (ParseException e) {
                    throw new RuntimeException(e);
                }
                for (Object p : keysToRemove) {
                    componentConf.remove(p);
                }
            } else {
                componentConf = new HashMap<>();
            }

            Map<String, Object> ret = new HashMap<>();
            ret.putAll(topoConf);
            ret.putAll(componentConf);
            return ret;
        }

The Executor constructor calls normalizedComponentConf to merge the configurations.

The method strips from componentConf every config key still left in keysToRemove, that is, every known key except the ones removed from that list above (Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS among them). For the return value it calls putAll(topoConf) first and putAll(componentConf) second, so if both define Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, the componentConf value overrides the topoConf value.

Executor.setupTicks

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

        protected void setupTicks(boolean isSpout) {
            final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
            if (tickTimeSecs != null) {
                boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
                if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                    || (!enableMessageTimeout && isSpout)) {
                    LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
                } else {
                    StormTimer timerTask = workerData.getUserTimer();
                    timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
                        () -> {
                            TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
                                                            Constants.SYSTEM_COMPONENT_ID,
                                                            (int) Constants.SYSTEM_TASK_ID,
                                                            Constants.SYSTEM_TICK_STREAM_ID);
                            AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                            try {
                                receiveQueue.publish(tickTuple);
                                receiveQueue.flush(); // avoid buffering
                            } catch (InterruptedException e) {
                                LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    );
                }
            }
        }

The topoConf used here is the result of merging topoConf with componentConf. A timerTask is set up for every component that satisfies the condition.

The TupleImpl created here has its srcComponent set to Constants.SYSTEM_COMPONENT_ID (__system), its taskId set to Constants.SYSTEM_TASK_ID (-1), and its streamId set to Constants.SYSTEM_TICK_STREAM_ID (__tick).

Each time the timerTask fires, it calls publish(tickTuple) on receiveQueue, which is a JCQueue.

JCQueue.publish

        private final DirectInserter directInserter = new DirectInserter(this);

        /**
         * Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
         */
        public void publish(Object obj) throws InterruptedException {
            Inserter inserter = getInserter();
            inserter.publish(obj);
        }

        private Inserter getInserter() {
            Inserter inserter;
            if (producerBatchSz > 1) {
                inserter = thdLocalBatcher.get();
                if (inserter == null) {
                    BatchInserter b = new BatchInserter(this, producerBatchSz);
                    inserter = b;
                    thdLocalBatcher.set(b);
                }
            } else {
                inserter = directInserter;
            }
            return inserter;
        }

        private static class DirectInserter implements Inserter {
            private JCQueue q;

            public DirectInserter(JCQueue q) {
                this.q = q;
            }

            /**
             * Blocking call, that can be interrupted via Thread.interrupt
             */
            @Override
            public void publish(Object obj) throws InterruptedException {
                boolean inserted = q.tryPublishInternal(obj);
                int idleCount = 0;
                while (!inserted) {
                    q.metrics.notifyInsertFailure();
                    if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
                        LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
                    }

                    idleCount = q.backPressureWaitStrategy.idle(idleCount);
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    inserted = q.tryPublishInternal(obj);
                }
            }

            //……
        }

        // Non Blocking. returns true/false indicating success/failure. Fails if full.
        private boolean tryPublishInternal(Object obj) {
            if (recvQueue.offer(obj)) {
                metrics.notifyArrivals(1);
                return true;
            }
            return false;
        }

JCQueue.publish calls inserter.publish, where the inserter is either a BatchInserter or a DirectInserter. Only DirectInserter's publish method is examined here.

DirectInserter.publish calls JCQueue.tryPublishInternal, which calls recvQueue.offer(obj) to put the object into the recvQueue.

JCQueue.consume

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java

        /**
         * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
         * elements consumed from Q
         */
        public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
            try {
                return consumeImpl(consumer, exitCond);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
         *
         * @param consumer
         * @param exitCond
         */
        private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
            int drainCount = 0;
            while (exitCond.keepRunning()) {
                Object tuple = recvQueue.poll();
                if (tuple == null) {
                    break;
                }
                consumer.accept(tuple);
                ++drainCount;
            }

            int overflowDrainCount = 0;
            int limit = overflowQ.size();
            while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
                Object tuple = overflowQ.poll();
                ++overflowDrainCount;
                consumer.accept(tuple);
            }
            int total = drainCount + overflowDrainCount;
            if (total > 0) {
                consumer.flush();
            }
            return total;
        }

As shown in the article 聊聊storm worker的executor与task, the executor's asyncLoop mainly calls Executor.call().call(). For a bolt, BoltExecutor.call calls JCQueue.consume, which in turn calls recvQueue.poll().

So tick tuples and the bolt's business tuples share the same queue.

Summary

The tick frequency can be configured at three levels: on the topology, on the BoltDeclarer, and in the bolt's getComponentConfiguration. The BoltDeclarer setting has the highest priority, followed by the bolt's getComponentConfiguration, and the global topology-level setting comes last.

Tick tuples are scheduled by a StormTimer. On each run the timer calls publish on the bolt's JCQueue, which ends up calling recvQueue.offer(obj). The executor's asyncLoop calls Executor.call().call(); for a bolt, BoltExecutor.call calls JCQueue.consume, which calls recvQueue.poll().

The timer is therefore only responsible for putting tick tuples into the queue. The moment at which a tick is actually handled is not guaranteed to be exact: it depends on how long the recvQueue is and how fast the executor consumes it.

doc

关于Storm tick (About Storm tick)
Tick tuples within Storm
storm定时的三种方式及tick详解 (Three ways to schedule work in Storm, and tick tuples in detail)
Apache Storm Design Pattern—Micro Batching
聊聊storm worker的executor与task (On Storm worker executors and tasks)
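
The precedence described in the summary can be reproduced with plain maps. The sketch below is not Storm code. It only imitates the two putAll steps seen in mergeIntoJson and normalizedComponentConf, and the class TickConfigPrecedence and the helper effectiveTickFreq are hypothetical names introduced for illustration. The key string is the value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: plain maps stand in for Storm's topology and component configs.
final class TickConfigPrecedence {

    // String value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.
    static final String TICK_KEY = "topology.tick.tuple.freq.secs";

    // Same idea as mergeIntoJson: copy the base map, then let the newer entries overwrite it.
    static Map<String, Object> merge(Map<String, Object> base, Map<String, Object> overrides) {
        Map<String, Object> res = new HashMap<>(base);
        res.putAll(overrides);
        return res;
    }

    // Hypothetical helper: resolves the tick frequency from the three configuration levels.
    static Object effectiveTickFreq(Map<String, Object> topoConf,
                                    Map<String, Object> boltConf,
                                    Map<String, Object> declarerConf) {
        // addConfiguration entries overwrite getComponentConfiguration entries.
        Map<String, Object> componentConf = merge(boltConf, declarerConf);
        // The component config then overwrites the topology config.
        return merge(topoConf, componentConf).get(TICK_KEY);
    }

    public static void main(String[] args) {
        Map<String, Object> topo = new HashMap<>();
        topo.put(TICK_KEY, 30);
        Map<String, Object> bolt = new HashMap<>();
        bolt.put(TICK_KEY, 10);
        Map<String, Object> declarer = new HashMap<>();
        declarer.put(TICK_KEY, 3);
        Map<String, Object> none = new HashMap<>();

        System.out.println(effectiveTickFreq(topo, bolt, declarer)); // prints 3
        System.out.println(effectiveTickFreq(topo, bolt, none));     // prints 10
        System.out.println(effectiveTickFreq(topo, none, none));     // prints 30
    }
}
```

With all three levels set, the BoltDeclarer value wins. Without it, the bolt's own value is used. The topology-level value only applies when the component sets nothing.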