聊聊storm的OpaquePartitionedTridentSpoutExecutor

69次阅读

共计 20443 个字符,预计需要花费 52 分钟才能阅读完成。


本文主要研究一下 storm 的 OpaquePartitionedTridentSpoutExecutor
TridentTopology.newStream
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}
TridentTopology.newStream 方法,对于 IOpaquePartitionedTridentSpout 类型的 spout 会使用 OpaquePartitionedTridentSpoutExecutor 来包装;而 KafkaTridentSpoutOpaque 则实现了 IOpaquePartitionedTridentSpout 接口
TridentTopologyBuilder.buildTopology
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
TopologyBuilder builder = new TopologyBuilder();
Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

Map<String, List<String>> batchesToCommitIds = new HashMap<>();
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();

for(String id: _spouts.keySet()) {
TransactionalSpoutComponent c = _spouts.get(id);
if(c.spout instanceof IRichSpout) {

//TODO: wrap this to set the stream name
builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
} else {
String batchGroup = c.batchGroupId;
if(!batchesToCommitIds.containsKey(batchGroup)) {
batchesToCommitIds.put(batchGroup, new ArrayList<String>());
}
batchesToCommitIds.get(batchGroup).add(c.commitStateId);

if(!batchesToSpouts.containsKey(batchGroup)) {
batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
}
batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);

BoltDeclarer scd =
builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
.globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
.globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);

for(Map<String, Object> m: c.componentConfs) {
scd.addConfigurations(m);
}

Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
specs.put(c.batchGroupId, new CoordSpec());
BoltDeclarer bd = builder.setBolt(id,
new TridentBoltExecutor(
new TridentSpoutExecutor(
c.commitStateId,
c.streamName,
((ITridentSpout) c.spout)),
batchIdsForSpouts,
specs),
c.parallelism);
bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
if(c.spout instanceof ICommitterTridentSpout) {
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
}
for(Map<String, Object> m: c.componentConfs) {
bd.addConfigurations(m);
}
}
}

//……

return builder.createTopology();
}
TridentTopologyBuilder.buildTopology 会将 IOpaquePartitionedTridentSpout(OpaquePartitionedTridentSpoutExecutor) 使用 TridentSpoutExecutor 包装,然后再使用 TridentBoltExecutor 包装为 bolt
OpaquePartitionedTridentSpoutExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);

IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;

//……

public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
_spout = spout;
}

@Override
public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return new Coordinator(conf, context);
}

@Override
public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
return new Emitter(txStateId, conf, context);
}

@Override
public Fields getOutputFields() {
return _spout.getOutputFields();
}

@Override
public Map<String, Object> getComponentConfiguration() {
return _spout.getComponentConfiguration();
}

}
OpaquePartitionedTridentSpoutExecutor 实现了 ICommitterTridentSpout,这里 getCoordinator 返回的是 ITridentSpout.BatchCoordinator,getEmitter 返回的是 ICommitterTridentSpout.Emitter
ITridentSpout.BatchCoordinator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
IOpaquePartitionedTridentSpout.Coordinator _coordinator;

public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
}

@Override
public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
LOG.debug(“Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]”, txid, prevMetadata, currMetadata);
return _coordinator.getPartitionsForBatch();
}

@Override
public void close() {
LOG.debug(“Closing”);
_coordinator.close();
LOG.debug(“Closed”);
}

@Override
public void success(long txid) {
LOG.debug(“Success [txid = {}]”, txid);
}

@Override
public boolean isReady(long txid) {
boolean ready = _coordinator.isReady(txid);
LOG.debug(“[isReady = {}], [txid = {}]”, ready, txid);
return ready;
}
}
包装了 spout 的_coordinator,它的类型 IOpaquePartitionedTridentSpout.Coordinator,这里仅仅是多了 debug 日志
ICommitterTridentSpout.Emitter
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
public class Emitter implements ICommitterTridentSpout.Emitter {
IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
TransactionalState _state;
TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
int _index;
int _numTasks;

public Emitter(String txStateId, Map conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
_state = TransactionalState.newUserState(conf, txStateId);
LOG.debug(“Created {}”, this);
}

Object _savedCoordinatorMeta = null;
boolean _changedMeta = false;

@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
LOG.debug(“Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]”,
tx, coordinatorMeta, collector, this);

if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
_partitionStates.clear();
final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
for (ISpoutPartition partition : taskPartitions) {
_partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
}

// refresh all partitions for backwards compatibility with old spout
_emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
_savedCoordinatorMeta = coordinatorMeta;
_changedMeta = true;
}
Map<String, Object> metas = new HashMap<>();
_cachedMetas.put(tx.getTransactionId(), metas);

Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
Map<String, Object> prevCached;
if(entry!=null) {
prevCached = entry.getValue();
} else {
prevCached = new HashMap<>();
}

for(Entry<String, EmitterPartitionState> e: _partitionStates.entrySet()) {
String id = e.getKey();
EmitterPartitionState s = e.getValue();
s.rotatingState.removeState(tx.getTransactionId());
Object lastMeta = prevCached.get(id);
if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
metas.put(id, meta);
}
LOG.debug(“Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]”,
tx, coordinatorMeta, collector, this);
}

@Override
public void success(TransactionAttempt tx) {
for(EmitterPartitionState state: _partitionStates.values()) {
state.rotatingState.cleanupBefore(tx.getTransactionId());
}
LOG.debug(“Success transaction {}. [{}]”, tx, this);
}

@Override
public void commit(TransactionAttempt attempt) {
LOG.debug(“Committing transaction {}. [{}]”, attempt, this);
// this code here handles a case where a previous commit failed, and the partitions
// changed since the last commit. This clears out any state for the removed partitions
// for this txid.
// we make sure only a single task ever does this. we’re also guaranteed that
// it’s impossible for there to be another writer to the directory for that partition
// because only a single commit can be happening at once. this is because in order for
// another attempt of the batch to commit, the batch phase must have succeeded in between.
// hence, all tasks for the prior commit must have finished committing (whether successfully or not)
if(_changedMeta && _index==0) {
Set<String> validIds = new HashSet<>();
for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
validIds.add(p.getId());
}
for(String existingPartition: _state.list(“”)) {
if(!validIds.contains(existingPartition)) {
RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition);
s.removeState(attempt.getTransactionId());
}
}
_changedMeta = false;
}

Long txid = attempt.getTransactionId();
Map<String, Object> metas = _cachedMetas.remove(txid);
for(Entry<String, Object> entry: metas.entrySet()) {
_partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
}
LOG.debug(“Exiting commit method for transaction {}. [{}]”, attempt, this);
}

@Override
public void close() {
LOG.debug(“Closing”);
_emitter.close();
LOG.debug(“Closed”);
}

@Override
public String toString() {
return “Emitter{” +
“, _state=” + _state +
“, _cachedMetas=” + _cachedMetas +
“, _partitionStates=” + _partitionStates +
“, _index=” + _index +
“, _numTasks=” + _numTasks +
“, _savedCoordinatorMeta=” + _savedCoordinatorMeta +
“, _changedMeta=” + _changedMeta +
‘}’;
}
}

static class EmitterPartitionState {
public RotatingTransactionalState rotatingState;
public ISpoutPartition partition;

public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
rotatingState = s;
partition = p;
}
}

这里对 spout 的 IOpaquePartitionedTridentSpout.Emitter 进行了封装,_partitionStates 使用了 EmitterPartitionState
emitBatch 方法首先计算_partitionStates,然后计算 prevCached,最后调用_emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta)
success 方法调用 state.rotatingState.cleanupBefore(tx.getTransactionId()),清空该 txid 之前的状态信息;commit 方法主要是更新_partitionStates

KafkaTridentSpoutOpaque
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
KafkaTridentSpoutTopicPartition, Map<String, Object>> {
private static final long serialVersionUID = -8003272486566259640L;

private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);

private final KafkaTridentSpoutManager<K, V> kafkaManager;

public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
this(new KafkaTridentSpoutManager<>(conf));
}

public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
this.kafkaManager = kafkaManager;
LOG.debug(“Created {}”, this.toString());
}

@Override
public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
Map conf, TopologyContext context) {
return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
}

@Override
public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}

@Override
public Fields getOutputFields() {
final Fields outputFields = kafkaManager.getFields();
LOG.debug(“OutputFields = {}”, outputFields);
return outputFields;
}

@Override
public final String toString() {
return super.toString() +
“{kafkaManager=” + kafkaManager + ‘}’;
}
}
KafkaTridentSpoutOpaque 的 getCoordinator 返回的是 KafkaTridentSpoutOpaqueCoordinator;getEmitter 返回的是 KafkaTridentSpoutEmitter
KafkaTridentSpoutOpaqueCoordinator
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);

private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
private final KafkaTridentSpoutManager<K,V> kafkaManager;

public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
this.kafkaManager = kafkaManager;
LOG.debug(“Created {}”, this.toString());
}

@Override
public boolean isReady(long txid) {
LOG.debug(“isReady = true”);
return true; // the “old” trident kafka spout always returns true, like this
}

@Override
public List<Map<String, Object>> getPartitionsForBatch() {
final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
LOG.debug(“TopicPartitions for batch {}”, topicPartitions);
List<Map<String, Object>> tps = new ArrayList<>();
for(TopicPartition tp : topicPartitions) {
tps.add(tpSerializer.toMap(tp));
}
return tps;
}

@Override
public void close() {
LOG.debug(“Closed”); // the “old” trident kafka spout is no op like this
}

@Override
public final String toString() {
return super.toString() +
“{kafkaManager=” + kafkaManager +
‘}’;
}
}
这里的 isReady 始终返回 true,getPartitionsForBatch 方法主要是将 kafkaManager.getTopicPartitions() 信息转换为 map 结构
KafkaTridentSpoutEmitter
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
List<Map<String, Object>>,
KafkaTridentSpoutTopicPartition,
Map<String, Object>>,
Serializable {

private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

// Kafka
private final KafkaConsumer<K, V> kafkaConsumer;

// Bookkeeping
private final KafkaTridentSpoutManager<K, V> kafkaManager;
// set of topic-partitions for which first poll has already occurred, and the first polled txid
private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>();

// Declare some KafkaTridentSpoutManager references for convenience
private final long pollTimeoutMs;
private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
private final RecordTranslator<K, V> translator;
private final Timer refreshSubscriptionTimer;
private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();

private TopologyContext topologyContext;

/**
* Create a new Kafka spout emitter.
* @param kafkaManager The Kafka consumer manager to use
* @param topologyContext The topology context
* @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
*/
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
this.kafkaManager = kafkaManager;
this.topologyContext = topologyContext;
this.refreshSubscriptionTimer = refreshSubscriptionTimer;
this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();

final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
LOG.debug(“Created {}”, this.toString());
}

/**
* Creates instance of this class with default 500 millisecond refresh subscription timer
*/
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
this(kafkaManager, topologyContext, new Timer(500,
kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
}

//……

@Override
public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {

LOG.debug(“Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]”,
tx, currBatchPartition, lastBatch, collector);

final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
final Set<TopicPartition> assignments = kafkaConsumer.assignment();
KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
LOG.warn(“SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], ” +
“[collector = {}] because it is not part of the assignments {} of consumer instance [{}] ” +
“of consumer group [{}]”, tx, currBatchPartition, lastBatch, collector, assignments,
kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
} else {
try {
// pause other topic-partitions to only poll from current topic-partition
pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

seek(currBatchTp, lastBatchMeta, tx.getTransactionId());

// poll
if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
}

final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
LOG.debug(“Polled [{}] records from Kafka.”, records.count());

if (!records.isEmpty()) {
emitTuples(collector, records);
// build new metadata
currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
}
} finally {
kafkaConsumer.resume(pausedTopicPartitions);
LOG.trace(“Resumed topic-partitions {}”, pausedTopicPartitions);
}
LOG.debug(“Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], ” +
“[currBatchMetadata = {}], [collector = {}]”, tx, currBatchPartition, lastBatch, currentBatch, collector);
}

return currentBatch == null ? null : currentBatch.toMap();
}

private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
for (ConsumerRecord<K, V> record : records) {
final List<Object> tuple = translator.apply(record);
collector.emit(tuple);
LOG.debug(“Emitted tuple {} for record [{}]”, tuple, record);
}
}

@Override
public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
LOG.trace(“Refreshing of topic-partitions handled by Kafka. ” +
“No action taken by this method for topic partitions {}”, partitionResponsibilities);
}

/**
* Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
* for this task must be assigned to the Kafka consumer running on this task.
*
* @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
* @return ordered list of topic partitions for this task
*/
@Override
public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
List<TopicPartition> allTopicPartitions = new ArrayList<>();
for(Map<String, Object> map : allPartitionInfo) {
allTopicPartitions.add(tpSerializer.fromMap(map));
}
final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
LOG.debug(“Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] “,
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
return allPartitions;
}

@Override
public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
List<Map<String, Object>> allPartitionInfo) {
final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
LOG.debug(“Consumer [{}], running on task with index [{}], has assigned topic-partitions {}”, kafkaConsumer, taskId, assignedTps);
final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
LOG.debug(“Returning topic-partitions {} for task with index [{}]”, taskTps, taskId);
return taskTps;
}

@Override
public void close() {
kafkaConsumer.close();
LOG.debug(“Closed”);
}

@Override
public final String toString() {
return super.toString() +
“{kafkaManager=” + kafkaManager +
‘}’;
}
}

这里的 refreshSubscriptionTimer 的 interval 取的是 kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(),默认是 2000
emitPartitionBatch 方法没调用一次都会判断 refreshSubscriptionTimer.isExpiredResetOnTrue(),如果时间到了,就会调用 kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment() 刷新 assignment
emitPartitionBatch 方法主要是找到与该 batch 关联的 partition,停止从其他 parition 拉取消息,然后根据 firstPollOffsetStrategy 以及 lastBatchMeta 信息,调用 kafkaConsumer 的 seek 相关方法 seek 到指定位置
之后就是用 kafkaConsumer.poll(pollTimeoutMs) 拉取数据,然后 emitTuples;emitTuples 方法会是用 translator 转换数据,然后调用 collector.emit 发射出去
refreshPartitions 方法目前仅仅是 trace 下日志;getOrderedPartitions 方法先将 allPartitionInfo 的数据从 map 结构反序列化回来,然后转换为 KafkaTridentSpoutTopicPartition 返回;getPartitionsForTask 方法主要是通过 kafkaConsumer.assignment() 的信息转换为 KafkaTridentSpoutTopicPartition 返回

小结

storm-kafka-client 提供了 KafkaTridentSpoutOpaque 这个 spout 作为 trident 的 kafka spout(旧版的为 OpaqueTridentKafkaSpout,在 storm-kafka 类库中),它实现了 IOpaquePartitionedTridentSpout 接口
TridentTopology.newStream 方法,对于 IOpaquePartitionedTridentSpout 类型的 spout 会使用 OpaquePartitionedTridentSpoutExecutor 来包装;TridentTopologyBuilder.buildTopology 会将 IOpaquePartitionedTridentSpout(OpaquePartitionedTridentSpoutExecutor) 先使用 TridentSpoutExecutor 包装,然后再使用 TridentBoltExecutor 包装为 bolt
OpaquePartitionedTridentSpoutExecutor 的 getCoordinator 返回的是 ITridentSpout.BatchCoordinator,getEmitter 返回的是 ICommitterTridentSpout.Emitter;他们分别对 KafkaTridentSpoutOpaque 这个原始 spout 返回的 KafkaTridentSpoutOpaqueCoordinator 以及 KafkaTridentSpoutEmitter 进行包装再处理;其中对 coordinator 加了 debug 日志,对 emitter 则主要多了对 EmitterPartitionState 的存取

doc
Storm Kafka Integration (0.10.x+)

正文完
 0