A look at Storm's tickTuple


This article takes a look at Storm's tickTuple.
Example
TickWordCountBolt
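import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TickWordCountBolt extends BaseBasicBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(TickWordCountBolt.class);

    // Word counts accumulated since the last tick
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to send this bolt a tick tuple every 10 seconds
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (TupleUtils.isTick(input)) {
            // Tick logic: emit every accumulated count, then start over
            LOGGER.info("execute tick tuple, emit and clear counts");
            counts.entrySet().stream()
                    .forEach(entry -> collector.emit(new Values(entry.getKey(), entry.getValue())));
            counts.clear();
        } else {
            // Business tuple: increment the count for this word
            String word = input.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}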

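When using tick tuples, the execute method has to check the tuple type itself and handle each case accordingly.
This example overrides getComponentConfiguration, creates a new Config directly, and sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS on it.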

tickTopology
@Test
public void testTickTuple() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();
    // parallelism 10
    builder.setSpout("spout", new TestWordSpout(), 10);
    builder.setBolt("count", new TickWordCountBolt(), 5)
//            .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3)
            .fieldsGrouping("spout", new Fields("word"));
    builder.setBolt("print", new PrintBolt(), 1)
            .shuffleGrouping("count");
    // SubmitHelper and PrintBolt are helper classes not shown in this article
    SubmitHelper.submitRemote("tickDemo", builder);
}

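Besides overriding getComponentConfiguration to set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, you can also call addConfiguration on the declarer returned by TopologyBuilder.setBolt; that setting overrides the one from getComponentConfiguration.
Apart from configuring it on a bolt, you can also set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS on the conf passed to StormSubmitter.submitTopology. That setting is global and applies to every bolt in the topology; when both a global setting and a bolt-level setting exist, the one with the narrower scope wins.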

Source code analysis
TupleUtils.isTick
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java
public static boolean isTick(Tuple tuple) {
return tuple != null
&& Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
isTick decides based on the tuple's sourceComponent and sourceStreamId.
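The check can be tried out without Storm on the classpath. The following is a minimal sketch, not Storm code: `TickCheckSketch` and `SimpleTuple` are hypothetical stand-ins, and the literals `__system` and `__tick` are the values of `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`.

```java
// Stand-alone sketch; TickCheckSketch and SimpleTuple are hypothetical names, not Storm classes.
final class TickCheckSketch {
    static final String SYSTEM_COMPONENT_ID = "__system"; // value of Constants.SYSTEM_COMPONENT_ID
    static final String SYSTEM_TICK_STREAM_ID = "__tick"; // value of Constants.SYSTEM_TICK_STREAM_ID

    /** Minimal stand-in for Storm's Tuple: only the two fields the check reads. */
    static final class SimpleTuple {
        final String sourceComponent;
        final String sourceStreamId;

        SimpleTuple(String sourceComponent, String sourceStreamId) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
        }
    }

    /** Same three conditions as TupleUtils.isTick: non-null, system component, tick stream. */
    static boolean isTick(SimpleTuple tuple) {
        return tuple != null
                && SYSTEM_COMPONENT_ID.equals(tuple.sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(tuple.sourceStreamId);
    }

    public static void main(String[] args) {
        System.out.println(isTick(new SimpleTuple("__system", "__tick")));
        System.out.println(isTick(new SimpleTuple("spout", "default")));
        System.out.println(isTick(null));
    }
}
```

Both fields have to match: a tuple from the system component on another stream is not a tick.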
TopologyBuilder.setBolt
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
/**
* Define a new bolt in this topology with the specified amount of parallelism.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt the bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}

private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if (parallelism != null) {
int dop = parallelism.intValue();
if (dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop);
}
Map<String, Object> conf = component.getComponentConfiguration();
if (conf != null) {
common.set_json_conf(JSONValue.toJSONString(conf));
}
commons.put(id, common);
}
setBolt calls initCommon, which calls the bolt's getComponentConfiguration and writes the resulting configuration into commons.
BoltGetter.addConfiguration
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
//……
}
The addConfiguration method is inherited from BaseConfigurationDeclarer.
BaseConfigurationDeclarer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer> implements ComponentConfigurationDeclarer<T> {
@Override
public T addConfiguration(String config, Object value) {
Map<String, Object> configMap = new HashMap<>();
configMap.put(config, value);
return addConfigurations(configMap);
}
//……
}
This creates a new map and then calls the subclass's addConfigurations; the subclass here is ConfigGetter.
ConfigGetter.addConfigurations
protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
String id;

public ConfigGetter(String id) {
this.id = id;
}

@SuppressWarnings("unchecked")
@Override
public T addConfigurations(Map<String, Object> conf) {
if (conf != null) {
if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
}
if (!conf.isEmpty()) {
String currConf = commons.get(id).get_json_conf();
commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
}
}
return (T) this;
}
//……
}

private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
Map<String, Object> res = new HashMap<>(into);
res.putAll(newMap);
return JSONValue.toJSONString(res);
}
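As shown here, the current configuration is read from commons and the new entries are merged into the component's own configuration. In other words, entries set through addConfiguration override what the bolt returns from getComponentConfiguration.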
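The override order can be reproduced with plain maps. This is a simplified sketch that skips the JSON round trip; `ComponentConfMergeSketch` is a hypothetical name, and the two string keys are the values of `Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS` and `Config.TOPOLOGY_KRYO_REGISTER`.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch; ComponentConfMergeSketch is a hypothetical name, not a Storm class.
final class ComponentConfMergeSketch {
    static final String TICK_FREQ = "topology.tick.tuple.freq.secs"; // Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
    static final String KRYO_REGISTER = "topology.kryo.register";    // Config.TOPOLOGY_KRYO_REGISTER

    /** Overlays declarerConf on boltConf, in the spirit of addConfigurations and mergeIntoJson. */
    static Map<String, Object> addConfigurations(Map<String, Object> boltConf,
                                                 Map<String, Object> declarerConf) {
        if (declarerConf == null || declarerConf.isEmpty()) {
            return new HashMap<>(boltConf); // nothing to merge
        }
        if (declarerConf.containsKey(KRYO_REGISTER)) {
            throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
        }
        Map<String, Object> res = new HashMap<>(boltConf);
        res.putAll(declarerConf); // entries added later win on key clashes
        return res;
    }

    public static void main(String[] args) {
        Map<String, Object> boltConf = new HashMap<>();
        boltConf.put(TICK_FREQ, 10);    // as returned by getComponentConfiguration
        Map<String, Object> declarerConf = new HashMap<>();
        declarerConf.put(TICK_FREQ, 3); // as passed to addConfiguration
        System.out.println(addConfigurations(boltConf, declarerConf).get(TICK_FREQ));
    }
}
```

Because the merge copies the existing map before calling putAll, the value passed last is the one that survives.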
Executor.normalizedComponentConf
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
private Map<String, Object> normalizedComponentConf(
Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
List<String> keysToRemove = retrieveAllConfigKeys();
keysToRemove.remove(Config.TOPOLOGY_DEBUG);
keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);

Map<String, Object> componentConf;
String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
if (specJsonConf != null) {
try {
componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
} catch (ParseException e) {
throw new RuntimeException(e);
}
for (Object p : keysToRemove) {
componentConf.remove(p);
}
} else {
componentConf = new HashMap<>();
}

Map<String, Object> ret = new HashMap<>();
ret.putAll(topoConf);
ret.putAll(componentConf);

return ret;
}

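The Executor constructor calls normalizedComponentConf to merge the configurations.
It removes from componentConf every topology config key still listed in keysToRemove, then builds the return value with putAll(topoConf) followed by putAll(componentConf). So if both define Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, the componentConf value overrides the topoConf value.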
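A simplified model of this step, using plain maps, may make the filtering easier to follow. `EffectiveConfSketch`, `effectiveConf`, `allConfigKeys` and `overridable` are hypothetical names: `allConfigKeys` plays the role of `retrieveAllConfigKeys()`, and `overridable` stands for the keys that the method takes back out of keysToRemove.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-alone sketch; all names here are hypothetical, not Storm classes.
final class EffectiveConfSketch {
    static final String TICK_FREQ = "topology.tick.tuple.freq.secs"; // Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
    static final String WORKERS = "topology.workers";                // Config.TOPOLOGY_WORKERS

    static Map<String, Object> effectiveConf(Map<String, Object> topoConf,
                                             Map<String, Object> componentConf,
                                             Set<String> allConfigKeys,
                                             Set<String> overridable) {
        Map<String, Object> filtered = new HashMap<>(componentConf);
        for (String key : allConfigKeys) {
            if (!overridable.contains(key)) {
                filtered.remove(key); // a component may not override this key
            }
        }
        Map<String, Object> ret = new HashMap<>(topoConf);
        ret.putAll(filtered); // component-level values win over topology-level ones
        return ret;
    }

    public static void main(String[] args) {
        Set<String> allKeys = new HashSet<>(Arrays.asList(TICK_FREQ, WORKERS));
        Set<String> overridable = new HashSet<>(Arrays.asList(TICK_FREQ));
        Map<String, Object> topoConf = new HashMap<>();
        topoConf.put(TICK_FREQ, 30);
        topoConf.put(WORKERS, 2);
        Map<String, Object> componentConf = new HashMap<>();
        componentConf.put(TICK_FREQ, 10);
        componentConf.put(WORKERS, 8); // dropped: not overridable per component
        System.out.println(effectiveConf(topoConf, componentConf, allKeys, overridable));
    }
}
```

In this model the tick frequency comes from the component, while the worker count stays at the topology-level value.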

Executor.setupTicks
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
protected void setupTicks(boolean isSpout) {
final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
if (tickTimeSecs != null) {
boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
|| (!enableMessageTimeout && isSpout)) {
LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
} else {
StormTimer timerTask = workerData.getUserTimer();
timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
() -> {
TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
Constants.SYSTEM_COMPONENT_ID,
(int) Constants.SYSTEM_TASK_ID,
Constants.SYSTEM_TICK_STREAM_ID);
AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
try {
receiveQueue.publish(tickTuple);
receiveQueue.flush(); // avoid buffering
} catch (InterruptedException e) {
LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
Thread.currentThread().interrupt();
return;
}
}
);
}
}
}

The topoConf here is the merged result of topoConf and componentConf; a timerTask is set up for each component that satisfies the condition.
The TupleImpl created here has srcComponent set to Constants.SYSTEM_COMPONENT_ID (__system), taskId set to Constants.SYSTEM_TASK_ID (-1), and streamId set to Constants.SYSTEM_TICK_STREAM_ID (__tick).
Each time the timerTask runs, it calls publish(tickTuple) on receiveQueue, which is a JCQueue.
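The same pattern, a recurring timer that only enqueues a marker message, can be sketched with the JDK alone. This is an illustration under assumptions, not Storm's implementation: `ScheduledExecutorService` stands in for StormTimer (its exact rescheduling semantics are not modeled), a `BlockingQueue` stands in for the JCQueue, and `TickTimerSketch` and `Msg` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-alone sketch; TickTimerSketch and Msg are hypothetical names, not Storm classes.
final class TickTimerSketch {
    /** Minimal stand-in for a tuple: source component, stream id and one value. */
    static final class Msg {
        final String sourceComponent;
        final String streamId;
        final Object value;

        Msg(String sourceComponent, String streamId, Object value) {
            this.sourceComponent = sourceComponent;
            this.streamId = streamId;
            this.value = value;
        }
    }

    /** Builds the task the timer runs: it only enqueues a tick message, nothing more. */
    static Runnable tickTask(BlockingQueue<Msg> receiveQueue, int tickSecs) {
        return () -> {
            try {
                receiveQueue.put(new Msg("__system", "__tick", tickSecs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag, as setupTicks does
            }
        };
    }

    /** Schedules the task with initial delay and period both equal to tickSecs. */
    static void setupTicks(ScheduledExecutorService timer, BlockingQueue<Msg> receiveQueue, int tickSecs) {
        timer.scheduleWithFixedDelay(tickTask(receiveQueue, tickSecs), tickSecs, tickSecs, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Msg> queue = new LinkedBlockingQueue<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        setupTicks(timer, queue, 1);
        Msg first = queue.take(); // blocks until the first tick arrives
        timer.shutdownNow();
        System.out.println(first.streamId + " payload=" + first.value);
    }
}
```

As in setupTicks, the tick's payload is the configured frequency, and the first tick arrives only after one full interval.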

JCQueue.publish
private final DirectInserter directInserter = new DirectInserter(this);

/**
* Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
*/
public void publish(Object obj) throws InterruptedException {
Inserter inserter = getInserter();
inserter.publish(obj);
}

private Inserter getInserter() {
Inserter inserter;
if (producerBatchSz > 1) {
inserter = thdLocalBatcher.get();
if (inserter == null) {
BatchInserter b = new BatchInserter(this, producerBatchSz);
inserter = b;
thdLocalBatcher.set(b);
}
} else {
inserter = directInserter;
}
return inserter;
}

private static class DirectInserter implements Inserter {
private JCQueue q;

public DirectInserter(JCQueue q) {
this.q = q;
}

/**
* Blocking call, that can be interrupted via Thread.interrupt
*/
@Override
public void publish(Object obj) throws InterruptedException {
boolean inserted = q.tryPublishInternal(obj);
int idleCount = 0;
while (!inserted) {
q.metrics.notifyInsertFailure();
if (idleCount == 0) {// check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
}

idleCount = q.backPressureWaitStrategy.idle(idleCount);
if (Thread.interrupted()) {
throw new InterruptedException();
}
inserted = q.tryPublishInternal(obj);
}

}
//……
}

// Non Blocking. returns true/false indicating success/failure. Fails if full.
private boolean tryPublishInternal(Object obj) {
if (recvQueue.offer(obj)) {
metrics.notifyArrivals(1);
return true;
}
return false;
}

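JCQueue.publish calls inserter.publish, where the inserter is either a BatchInserter or a DirectInserter; here we look at DirectInserter's publish method.
DirectInserter.publish calls JCQueue.tryPublishInternal, which in turn calls recvQueue.offer(obj) to put the object into recvQueue. If the offer fails, it waits and retries until the insert succeeds or the thread is interrupted.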
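The retry loop can be illustrated with a bounded JDK queue. This is a rough sketch, not JCQueue itself: `BlockingPublishSketch` is a hypothetical name, `ArrayBlockingQueue` stands in for recvQueue, and a fixed `Thread.sleep(1)` is an assumed replacement for `backPressureWaitStrategy.idle(...)`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Stand-alone sketch; BlockingPublishSketch is a hypothetical name, not a Storm class.
final class BlockingPublishSketch {
    /** Non-blocking attempt, like tryPublishInternal: returns false if the queue is full. */
    static <T> boolean tryPublish(BlockingQueue<T> queue, T obj) {
        return queue.offer(obj);
    }

    /**
     * Blocking publish in the style of DirectInserter.publish: retry until the insert succeeds.
     * Returns the number of failed attempts so the caller can observe back pressure.
     */
    static <T> int publish(BlockingQueue<T> queue, T obj) throws InterruptedException {
        int failures = 0;
        while (!tryPublish(queue, obj)) {
            failures++;
            Thread.sleep(1); // assumed wait; throws InterruptedException if interrupted
        }
        return failures;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("business tuple"); // the queue is now full
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(50);
                queue.take(); // frees one slot after a short delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        int failures = publish(queue, "tick");
        consumer.join();
        System.out.println("published after " + failures + " failed attempts");
    }
}
```

A publisher facing a full queue, including the tick timer, is held up until the consumer makes room.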

JCQueue.consume
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
/**
* Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
* elements consumed from Q
*/
public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
try {
return consumeImpl(consumer, exitCond);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
*
* @param consumer
* @param exitCond
*/
private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
int drainCount = 0;
while (exitCond.keepRunning()) {
Object tuple = recvQueue.poll();
if (tuple == null) {
break;
}
consumer.accept(tuple);
++drainCount;
}

int overflowDrainCount = 0;
int limit = overflowQ.size();
while (exitCond.keepRunning() && (overflowDrainCount < limit)) {// 2nd cond prevents staying stuck with consuming overflow
Object tuple = overflowQ.poll();
++overflowDrainCount;
consumer.accept(tuple);
}
int total = drainCount + overflowDrainCount;
if (total > 0) {
consumer.flush();
}
return total;
}

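As seen in the article 聊聊 storm worker 的 executor 与 task (on the Storm worker's executor and task), the executor's asyncLoop mainly calls Executor.call().call(); for BoltExecutor.call that means calling JCQueue.consume, which calls recvQueue.poll().
So tick tuples and the bolt's business tuples share the same queue.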
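A small model shows what sharing one FIFO queue means for a tick: it is handled only after everything queued ahead of it. This is a sketch with hypothetical names (`SharedQueueSketch`, `drain`, `tuplesBeforeTick`), using strings in place of tuples and an `ArrayDeque` in place of recvQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stand-alone sketch; SharedQueueSketch is a hypothetical name, not a Storm class.
final class SharedQueueSketch {
    static final String TICK = "__tick";

    /** Drains the queue in FIFO order, like the poll loop in consumeImpl, recording the order. */
    static List<String> drain(Queue<String> recvQueue) {
        List<String> processed = new ArrayList<>();
        String item;
        while ((item = recvQueue.poll()) != null) {
            processed.add(item);
        }
        return processed;
    }

    /** Number of items handled before the first tick, or -1 if no tick was seen. */
    static int tuplesBeforeTick(List<String> processed) {
        return processed.indexOf(TICK);
    }

    public static void main(String[] args) {
        Queue<String> recvQueue = new ArrayDeque<>();
        recvQueue.add("apple");
        recvQueue.add("banana");
        recvQueue.add("cherry");
        recvQueue.add(TICK); // the timer publishes here, behind the backlog
        recvQueue.add("date");
        List<String> processed = drain(recvQueue);
        System.out.println(processed + ", handled before tick: " + tuplesBeforeTick(processed));
    }
}
```

The longer the backlog in front of the tick, the later the bolt's tick logic runs relative to when the timer fired.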

Summary

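The tick parameter can be configured at three levels: the topology, the BoltDeclarer, and the bolt's getComponentConfiguration. BoltDeclarer has the highest priority, followed by the bolt's getComponentConfiguration, and finally the global topology-level setting.
Tick tuples are scheduled by StormTimer. On each run, the timer calls publish on the bolt's JCQueue, which ends up calling recvQueue.offer(obj); the executor's asyncLoop calls Executor.call().call(), and for BoltExecutor.call that means JCQueue.consume, which calls recvQueue.poll().
The timer is therefore only responsible for putting tick tuples into the queue. The moment at which the tick logic actually runs is not guaranteed to be exact; it depends on the length of recvQueue and on how fast the executor consumes it.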

doc

关于 Storm tick (About Storm tick)
Tick tuples within Storm
storm 定时的三种方式及 tick 详解 (Three ways to run scheduled work in Storm, with tick explained in detail)
Apache Storm Design Pattern—Micro Batching
聊聊 storm worker 的 executor 与 task (A look at the Storm worker's executor and task)
