
[case45] A look at storm-kafka-client's ProcessingGuarantee


This article takes a look at the ProcessingGuarantee options in storm-kafka-client.
ProcessingGuarantee
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
/**
* This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
* i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
* The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
* NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
*/
@InterfaceStability.Unstable
public enum ProcessingGuarantee {
/**
* An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
* times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
* interval.
*/
AT_LEAST_ONCE,
/**
* Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
* components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
* ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
*/
AT_MOST_ONCE,
/**
* The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
* be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
* spout to control when commits occur. Commits asynchronously on the defined interval.
*/
NO_GUARANTEE,
}

One of the differences between storm-kafka-client and the older storm-kafka is the introduction of ProcessingGuarantee, which makes the code much clearer.
ProcessingGuarantee.AT_LEAST_ONCE is the ack-based mode; similar to the Kafka client's auto commit, it commits periodically at the configured interval.
ProcessingGuarantee.AT_MOST_ONCE ignores acks and commits synchronously right after records are polled (the interval setting is ignored), so each record is processed at most once.
ProcessingGuarantee.NO_GUARANTEE also ignores acks; like ProcessingGuarantee.AT_LEAST_ONCE it commits periodically at the configured interval, but the commit is asynchronous. A configuration sketch follows this list.
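The following is a minimal, hedged sketch of selecting a guarantee when building the spout. The broker address, topic and group id are placeholders, and setProcessingGuarantee/setProp are KafkaSpoutConfig.Builder methods as I understand the 1.2.x API; treat the exact signatures as assumptions.

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class GuaranteeConfigSketch {
    public static KafkaSpout<String, String> buildSpout() {
        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")              // placeholder broker and topic
            // AT_LEAST_ONCE is the default; AT_MOST_ONCE / NO_GUARANTEE are set the same way
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            .setProp("group.id", "my-spout-group")               // placeholder consumer group id
            .build();
        return new KafkaSpout<>(conf);
    }
}

Switching to AT_MOST_ONCE or NO_GUARANTEE only changes the enum value passed to setProcessingGuarantee.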

KafkaSpout.open
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
public class KafkaSpout<K, V> extends BaseRichSpout {

//Initial delay for the commit and subscription refresh timers
public static final long TIMER_DELAY_MS = 500;

// timer == null only if the processing guarantee is at-most-once
private transient Timer commitTimer;

// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
// or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
private transient Map<TopicPartition, OffsetManager> offsetManagers;

// Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;

//……

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;

// Spout internals
this.collector = collector;

// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();

// Retries management
retryService = kafkaSpoutConfig.getRetryService();

tupleListener = kafkaSpoutConfig.getTupleListener();

if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
// In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
}
refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);

offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = new HashMap<>();
commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());

tupleListener.open(conf, context);
if (canRegisterMetrics()) {
registerMetric();
}

LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}

//……

}
In open, as long as the processing guarantee is not ProcessingGuarantee.AT_MOST_ONCE, a commitTimer is initialized with a period of kafkaSpoutConfig.getOffsetsCommitPeriodMs() (30000 ms by default); the separate refreshSubscriptionTimer uses kafkaSpoutConfig.getPartitionRefreshPeriodMs() (2000 ms by default). A builder sketch for both periods follows below.
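A small sketch (assumed 1.2.x Builder methods) of explicitly configuring the two periods behind these timers; the values shown are the defaults as I understand them, not requirements.

import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class TimerPeriodsSketch {
    public static KafkaSpoutConfig.Builder<String, String> configure() {
        return KafkaSpoutConfig.builder("localhost:9092", "my-topic") // placeholder broker and topic
            .setOffsetCommitPeriodMs(30_000)        // commitTimer period (AT_LEAST_ONCE / NO_GUARANTEE)
            .setPartitionRefreshPeriodMs(2_000);    // refreshSubscriptionTimer period
    }
}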
Timer.isExpiredResetOnTrue
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/internal/Timer.java
public class Timer {
private final long delay;
private final long period;
private final TimeUnit timeUnit;
private final long periodNanos;
private long start;

//……

/**
* Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
* case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
* (re-initiated) and a new cycle will start.
*
* @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
* otherwise.
*/
public boolean isExpiredResetOnTrue() {
final boolean expired = Time.nanoTime() - start >= periodNanos;
if (expired) {
start = Time.nanoTime();
}
return expired;
}
}
Timer's key method is isExpiredResetOnTrue, which checks whether the scheduled period has elapsed; it is called from nextTuple. A standalone sketch of the same pattern follows below.
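This is a standalone sketch of the same "expired? then reset" pattern that Timer implements, to make the semantics of isExpiredResetOnTrue() concrete; it is not the Storm class itself.

import java.util.concurrent.TimeUnit;

class ResetOnTrueTimerSketch {
    private final long periodNanos;
    private long start = System.nanoTime();

    ResetOnTrueTimerSketch(long period, TimeUnit unit) {
        this.periodNanos = unit.toNanos(period);
    }

    boolean isExpiredResetOnTrue() {
        boolean expired = System.nanoTime() - start >= periodNanos;
        if (expired) {
            start = System.nanoTime(); // a new cycle starts only when the caller observes expiry
        }
        return expired;
    }

    public static void main(String[] args) throws InterruptedException {
        ResetOnTrueTimerSketch timer = new ResetOnTrueTimerSketch(100, TimeUnit.MILLISECONDS);
        System.out.println(timer.isExpiredResetOnTrue()); // false: period has not elapsed yet
        Thread.sleep(150);
        System.out.println(timer.isExpiredResetOnTrue()); // true: elapsed, timer resets
        System.out.println(timer.isExpiredResetOnTrue()); // false: the new cycle just started
    }
}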
KafkaSpout.nextTuple
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== Next Tuple =======
@Override
public void nextTuple() {
try {
if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
kafkaSpoutConfig.getSubscription().refreshAssignment();
}

if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(kafkaConsumer.assignment());
kafkaConsumer.commitAsync(offsetsToCommit, null);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
}

PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
if (pollablePartitionsInfo.shouldPoll()) {
try {
setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
} catch (RetriableException e) {
LOG.error("Failed to poll from kafka.", e);
}
}

emitIfWaitingNotEmitted();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}

nextTuple first checks whether the subscription needs to be refreshed, then consults the commitTimer via commitTimer.isExpiredResetOnTrue() to decide whether a commit is due.
If the ProcessingGuarantee is NO_GUARANTEE, createFetchedOffsetsMetadata builds the offset and partition information to commit, and kafkaConsumer.commitAsync commits it asynchronously;
if it is AT_LEAST_ONCE, commitOffsetsForAckedTuples performs the commit.
After the offset commit is handled, getPollablePartitionsInfo returns a PollablePartitionsInfo; if shouldPoll is true, pollKafkaBroker fetches records, which setWaitingToEmit then places into waitingToEmit.
Finally emitIfWaitingNotEmitted is called: if there is pending data it emits (or retries) one tuple; otherwise it simply drains the empty entries and returns.

createFetchedOffsetsMetadata
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignedPartitions) {
offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
}
return offsetsToCommit;
}
For each partition in kafkaConsumer.assignment(), kafkaConsumer.position(tp) gives the offset of the next record to be fetched, and commitMetadataManager.getCommitMetadata() supplies the CommitMetadata JSON string used as the commit metadata. A sketch for inspecting that committed metadata follows below.
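A minimal sketch (plain Kafka consumer API, outside the spout) for inspecting what the spout has committed, including the CommitMetadata JSON stored in the metadata field. Broker address, group id and topic/partition are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class InspectCommittedOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-spout-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null) {
                // metadata() holds the CommitMetadata JSON written by the spout
                System.out.println(committed.offset() + " " + committed.metadata());
            }
        }
    }
}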
commitOffsetsForAckedTuples
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
// Find offsets that are ready to be committed for every assigned topic partition
final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
for (Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
if (assignedPartitions.contains(entry.getKey())) {
assignedOffsetManagers.put(entry.getKey(), entry.getValue());
}
}

final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
}
}

// Commit offsets that are ready to be committed for every topic partition
if (!nextCommitOffsets.isEmpty()) {
kafkaConsumer.commitSync(nextCommitOffsets);
LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
// Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
// in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
//Update the OffsetManager for each committed partition, and update numUncommittedOffsets
final TopicPartition tp = tpOffset.getKey();
long position = kafkaConsumer.position(tp);
long committedOffset = tpOffset.getValue().offset();
if (position < committedOffset) {
/*
* The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
* than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
* part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
* to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
*/
LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
position, committedOffset);
kafkaConsumer.seek(tp, committedOffset);
List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
if (waitingToEmitForTp != null) {
//Discard the pending records that are already committed
List<ConsumerRecord<K, V>> filteredRecords = new ArrayList<>();
for (ConsumerRecord<K, V> record : waitingToEmitForTp) {
if (record.offset() >= committedOffset) {
filteredRecords.add(record);
}
}
waitingToEmit.put(tp, filteredRecords);
}
}

final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
offsetManager.commit(tpOffset.getValue());
LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
}
} else {
LOG.trace("No offsets to commit. {}", this);
}
}

First, offsetManagers is used to collect, per assigned partition, the acked offsets that are waiting to be committed; with ProcessingGuarantee.AT_MOST_ONCE or NO_GUARANTEE this map is always empty.
Then OffsetManager.findNextCommitOffset, given the CommitMetadata, computes the next offset to commit for each partition in this batch.
Finally kafkaConsumer.commitSync commits the offsets synchronously, and the local OffsetManager state is updated to reflect the committed positions. A simplified sketch of the offset roll-up idea follows below.
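The following is a simplified illustration (not the actual OffsetManager implementation) of how contiguous acked offsets roll up into the next offset to commit; Kafka expects the committed offset to be the offset of the next record to consume.

import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

class NextCommitOffsetSketch {
    // committedOffset is the currently committed position (next record to consume)
    static Long findNextCommitOffset(long committedOffset, SortedSet<Long> ackedOffsets) {
        long nextCommit = committedOffset;
        for (long offset : ackedOffsets) {       // ackedOffsets is sorted ascending
            if (offset == nextCommit) {
                nextCommit = offset + 1;         // contiguous: advance the commit point
            } else if (offset > nextCommit) {
                break;                           // gap: a tuple in between is not acked yet
            }
        }
        return nextCommit > committedOffset ? nextCommit : null; // null means nothing new to commit
    }

    public static void main(String[] args) {
        SortedSet<Long> acked = new TreeSet<>(Arrays.asList(5L, 6L, 8L));
        // Offsets 5 and 6 are acked contiguously, 7 is not, so the next commit is 7
        System.out.println(findNextCommitOffset(5, acked)); // prints 7
    }
}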

getPollablePartitionsInfo
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private PollablePartitionsInfo getPollablePartitionsInfo() {
if (isWaitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted.");
return new PollablePartitionsInfo(Collections.<TopicPartition>emptySet(), Collections.<TopicPartition, Long>emptyMap());
}

Set<TopicPartition> assignment = kafkaConsumer.assignment();
if (!isAtLeastOnceProcessing()) {
return new PollablePartitionsInfo(assignment, Collections.<TopicPartition, Long>emptyMap());
}

Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
Set<TopicPartition> pollablePartitions = new HashSet<>();
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
for (TopicPartition tp : assignment) {
OffsetManager offsetManager = offsetManagers.get(tp);
int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
if (numUncommittedOffsets < maxUncommittedOffsets) {
//Allow poll if the partition is not at the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
//Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
numUncommittedOffsets, maxUncommittedOffsets);
}
}
}
return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
}

If the guarantee is not ProcessingGuarantee.AT_LEAST_ONCE, the method simply returns all partitions from kafkaConsumer.assignment().
For ProcessingGuarantee.AT_LEAST_ONCE it also fetches retryService.earliestRetriableOffsets(), so that the offsets of failed tuples are taken into account.
The maxUncommittedOffsets parameter gates polling: a partition is pollable while numUncommittedOffsets < maxUncommittedOffsets; once the limit is reached, the partition is still pollable only if its earliestRetriableOffset is less than or equal to offsetAtLimit, i.e. a failed tuple falls within the limit window. A small illustrative predicate follows below.
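Below is a simplified, illustrative predicate for "is this partition pollable?" under at-least-once processing; the real logic lives in KafkaSpout.getPollablePartitionsInfo() and OffsetManager, and the parameter names here are only for illustration.

class PollGateSketch {
    static boolean isPollable(int numUncommittedOffsets, int maxUncommittedOffsets,
                              Long earliestRetriableOffset, long offsetAtLimit) {
        if (numUncommittedOffsets < maxUncommittedOffsets) {
            return true; // still below the uncommitted-offsets cap
        }
        // at or above the cap: poll only if a failed (retriable) offset falls within the cap window
        return earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit;
    }

    public static void main(String[] args) {
        // maxUncommittedOffsets = 10, 12 uncommitted offsets, offsetAtLimit = 110: normally paused...
        System.out.println(isPollable(12, 10, null, 110L));  // false
        // ...unless a failed tuple at offset <= 110 is ready to retry
        System.out.println(isPollable(12, 10, 105L, 110L));  // true
    }
}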

pollKafkaBroker
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== poll =========
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
Iterator<TopicPartition> pausedIter = pausedPartitions.iterator();
while (pausedIter.hasNext()) {
if (pollablePartitionsInfo.pollablePartitions.contains(pausedIter.next())) {
pausedIter.remove();
}
}
try {
kafkaConsumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(kafkaConsumer.assignment());
kafkaConsumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
return consumerRecords;
} finally {
kafkaConsumer.resume(pausedPartitions);
}
}

private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
//Seek directly to the earliest retriable message for each retriable topic partition
kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
}
}

private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
ConsumerRecords<K, V> consumerRecords) {
for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
if (!records.isEmpty()) {
ConsumerRecord<K, V> record = records.get(0);
long seekOffset = entry.getValue();
long earliestReceivedOffset = record.offset();
if (seekOffset < earliestReceivedOffset) {
//Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
//Ack up to the first offset received if the record is not already acked or currently in the topology
for (long i = seekOffset; i < earliestReceivedOffset; i++) {
KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null));
if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
retryService.remove(msgId);
emitted.add(msgId);
ack(msgId);
}
}
}
}
}
}

If PollablePartitionsInfo.pollablePartitions is non-empty, pollKafkaBroker is called to fetch records.
It first calls doSeekRetriableTopicPartitions, which seeks each retriable partition to its earliest retriable offset.
Before polling, the partitions that do not satisfy the maxUncommittedOffsets (and related) conditions are paused; kafkaConsumer.poll is then called, and if the guarantee is ProcessingGuarantee.AT_MOST_ONCE the polled offsets are immediately committed synchronously via kafkaConsumer.commitSync; the fetched records are returned (and later placed into waitingToEmit), and finally the previously paused partitions are resumed, which prevents fetching from partitions that are not eligible.
Note that pollablePartitionsInfo comes from getPollablePartitionsInfo(), which filters kafkaConsumer.assignment() using the OffsetManager state and maxUncommittedOffsets, so pollablePartitionsInfo.pollablePartitions is a subset of kafkaConsumer.assignment(); pausedPartitions is kafkaConsumer.assignment() minus pollablePartitionsInfo.pollablePartitions, i.e. exactly the partitions that getPollablePartitionsInfo() filtered out. Those partitions are paused, poll is executed, and then they are resumed, so this poll does not fetch from pausedPartitions. A standalone sketch of this pause/poll/resume pattern follows after this list.
After polling, ackRetriableOffsetsIfCompactedAway acks retriable offsets whose records have been compacted away.
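A minimal sketch (plain Kafka consumer API, outside the spout) of the pause/poll/resume pattern used by pollKafkaBroker: partitions that must not make progress are paused, poll() then only returns records for the remaining assigned partitions, and the paused partitions are resumed afterwards so later polls can include them again.

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class PausedPollSketch {
    // pollablePartitions: the subset of the assignment we are allowed to make progress on
    static <K, V> ConsumerRecords<K, V> pollOnlyPollable(KafkaConsumer<K, V> consumer,
                                                         Set<TopicPartition> pollablePartitions,
                                                         long pollTimeoutMs) {
        Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
        paused.removeAll(pollablePartitions);   // pause everything that is not pollable
        try {
            consumer.pause(paused);             // paused partitions return no records from poll()
            return consumer.poll(pollTimeoutMs);
        } finally {
            consumer.resume(paused);            // re-enable them for subsequent polls
        }
    }
}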

emitIfWaitingNotEmitted
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private void emitIfWaitingNotEmitted() {
Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
outerLoop:
while (waitingToEmitIter.hasNext()) {
List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
while (!waitingToEmitForTp.isEmpty()) {
final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
if (emittedTuple) {
break outerLoop;
}
}
waitingToEmitIter.remove();
}
}
emitIfWaitingNotEmitted checks whether waitingToEmit contains data; if so it takes records and calls emitOrRetryTuple, breaking out as soon as one tuple is emitted; otherwise it just removes the empty entries and returns without emitting anything.
emitOrRetryTuple
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
/**
* Creates a tuple from the kafka record and emits it if it was never emitted or it is ready to be retried.
*
* @param record to be emitted
* @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
*/
private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = retryService.getMessageId(record);

if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {// has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) {// has been emitted and it is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
if (isAtLeastOnceProcessing()
&& committedOffset != null
&& committedOffset.offset() > record.offset()
&& commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
// Ensures that after a topology with this id is started, the consumer fetch
// position never falls behind the committed offset (STORM-2844)
throw new IllegalStateException("Attempting to emit a message that has already been committed."
+ " This should never occur when using the at-least-once processing guarantee.");
}

final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
if (isEmitTuple(tuple)) {
final boolean isScheduled = retryService.isScheduled(msgId);
// not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
if (!isScheduled || retryService.isReady(msgId)) {
final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;

if (!isAtLeastOnceProcessing()) {
if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
collector.emit(stream, tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
} else {
collector.emit(stream, tuple);
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
} else {
emitted.add(msgId);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) {// Was scheduled for retry and re-emitted, so remove from schedule.
retryService.remove(msgId);
}
collector.emit(stream, tuple, msgId);
tupleListener.onEmit(tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
}
return true;
}
} else {
/*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
* to allow its offset to be commited to Kafka*/
LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
if (isAtLeastOnceProcessing()) {
msgId.setNullTuple(true);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
ack(msgId);
}
}
}
return false;
}

emitOrRetryTuple is the core of nextTuple; it handles both emitting and retrying.
Because failed records are re-fetched via seek, deduplication is needed against offsetManagers (acked, awaiting commit) and emitted (emitted, awaiting ack); a record is emitted or retried only if it appears in neither.
Before emitting, retryService.isScheduled(msgId) determines whether the record is a failed retry; the tuple is emitted if it is not a scheduled retry, or if it is scheduled and the retry is now due.
For ProcessingGuarantee.AT_LEAST_ONCE, emitted and offsetManagers are maintained, the tuple is emitted, and tupleListener.onEmit(tuple, msgId) is invoked; for the other guarantees only collector.emit is called. A hedged sketch of a stream-routing RecordTranslator, which is where the KafkaTuple stream seen here comes from, follows below.
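The following is a hedged sketch of a translator that routes records to named streams, to show where the KafkaTuple.getStream() value checked in emitOrRetryTuple comes from. The interface and method names follow my reading of the 1.2.x API (RecordTranslator with apply/getFieldsFor/streams, and KafkaTuple.routedTo); treat them as assumptions.

import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.KafkaTuple;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.tuple.Fields;

public class TopicStreamTranslator<K, V> implements RecordTranslator<K, V> {
    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        // Returning a KafkaTuple lets the spout emit to a specific stream instead of the default one
        return new KafkaTuple(record.topic(), record.partition(), record.offset(), record.key(), record.value())
            .routedTo(record.topic());  // hypothetical choice: one stream per topic
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("topic", "partition", "offset", "key", "value");
    }

    @Override
    public List<String> streams() {
        return Arrays.asList("topic-a", "topic-b"); // the streams this translator may route to
    }
}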

KafkaSpout.ack
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== Ack =======
@Override
public void ack(Object messageId) {
if (!isAtLeastOnceProcessing()) {
return;
}

// Only need to keep track of acked tuples if commits to Kafka are controlled by
// tuple acks, which happens only for at-least-once processing semantics
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;

if (msgId.isNullTuple()) {
//a null tuple should be added to the ack list since by definition is a direct ack
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
tupleListener.onAck(msgId);
return;
}

if (!emitted.contains(msgId)) {
LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+ "came from a topic-partition that this consumer group instance is no longer tracking "
+ "due to rebalance/partition reassignment. No action taken.", msgId);
} else {
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
emitted.remove(msgId);
}
tupleListener.onAck(msgId);
}

In ack, if the guarantee is not ProcessingGuarantee.AT_LEAST_ONCE the method returns immediately.
Otherwise the acked msgId is added to the offsetManagers map, to be committed later in nextTuple, and removed from emitted.
There is also a check against emitted: if the msgId was never emitted by this spout instance, the ack is ignored, which typically happens after a rebalance/partition reassignment.

KafkaSpout.fail
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== Fail =======
@Override
public void fail(Object messageId) {
if (!isAtLeastOnceProcessing()) {
return;
}
// Only need to keep track of failed tuples if commits to Kafka are controlled by
// tuple acks, which happens only for at-least-once processing semantics
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
LOG.debug("Received fail for tuple this spout is no longer tracking."
+ " Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");

msgId.incrementNumFails();

if (!retryService.schedule(msgId)) {
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
// this tuple should be removed from emitted only inside the ack() method. This is to ensure
// that the OffsetManager for that TopicPartition is updated and allows commit progression
tupleListener.onMaxRetryReached(msgId);
ack(msgId);
} else {
tupleListener.onRetry(msgId);
emitted.remove(msgId);
}
}

fail likewise returns immediately if the guarantee is not ProcessingGuarantee.AT_LEAST_ONCE.
It then checks emitted; if the msgId is not present it returns immediately, which usually indicates a partition reassignment.
Finally it calls retryService.schedule(msgId); if scheduling fails, tupleListener.onMaxRetryReached is invoked and the tuple is acked; if it succeeds, tupleListener.onRetry is invoked and the msgId is removed from emitted. A sketch of configuring the retry service follows below.
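A hedged sketch of tuning the retry service that fail() relies on. The constructor arguments follow my reading of KafkaSpoutRetryExponentialBackoff(initialDelay, delayPeriod, maxRetries, maxDelay) and of Builder.setRetry; the values are illustrative only.

import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

class RetryConfigSketch {
    static KafkaSpoutConfig.Builder<String, String> withRetry(KafkaSpoutConfig.Builder<String, String> builder) {
        KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
            TimeInterval.milliSeconds(0),   // initial delay before the first retry
            TimeInterval.milliSeconds(2),   // base period that grows exponentially with each failure
            5,                              // maxRetries: after this, fail() acks the tuple and gives up
            TimeInterval.seconds(10));      // upper bound on the backoff delay
        return builder.setRetry(retry);     // install the retry service on the spout config
    }
}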

KafkaSpoutRetryExponentialBackoff.schedule
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();

//This class assumes that there is at most one retry schedule per message id in this set at a time.
private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);

/**
* Comparator ordering by timestamp
*/
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
@Override
public int compare(RetrySchedule entry1, RetrySchedule entry2) {
int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());

if(result == 0) {
//TreeSet uses compareTo instead of equals() for the Set contract
//Ensure that we can save two retry schedules with the same timestamp
result = entry1.hashCode() - entry2.hashCode();
}
return result;
}
}

@Override
public boolean schedule(KafkaSpoutMessageId msgId) {
if (msgId.numFails() > maxRetries) {
LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
return false;
} else {
//Remove existing schedule for the message id
remove(msgId);
final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
retrySchedules.add(retrySchedule);
toRetryMsgs.add(msgId);
LOG.debug("Scheduled. {}", retrySchedule);
LOG.trace("Current state {}", retrySchedules);
return true;
}
}

@Override
public Map<TopicPartition, Long> earliestRetriableOffsets() {
final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
final KafkaSpoutMessageId msgId = retrySchedule.msgId;
final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
if(currentLowestOffset != null) {
tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
} else {
tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
}
} else {
break; // Stop searching as soon as passed current time
}
}
LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
return tpToEarliestRetriableOffset;
}

@Override
public boolean isReady(KafkaSpoutMessageId msgId) {
boolean retry = false;
if (isScheduled(msgId)) {
final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
if (retrySchedule.msgId.equals(msgId)) {
retry = true;
LOG.debug("Found entry to retry {}", retrySchedule);
break; //Stop searching if the message is known to be ready for retry
}
} else {
LOG.debug("Entry to retry not found {}", retrySchedule);
break; // Stop searching as soon as passed current time
}
}
}
return retry;
}

schedule first checks whether the failure count exceeds maxRetries; if so it returns false, meaning the message will not be rescheduled, and KafkaSpout.fail then invokes tupleListener.onMaxRetryReached and acks the tuple so it is not processed again.
Otherwise a RetrySchedule entry is created and added to retrySchedules, a TreeSet ordered by RetryEntryTimeStampComparator, i.e. by nextRetryTimeNanos and, on ties, by hashCode.
Both earliestRetriableOffsets and isReady work off the retrySchedules set.

Summary

storm-kafka-client targets Kafka 0.10 and later. It introduces the ProcessingGuarantee enum, which has three values:

ProcessingGuarantee.AT_LEAST_ONCE is the ack-based mode; similar to the Kafka client's auto commit, it commits periodically at the configured interval. It maintains emitted (emitted but not yet acked), offsetManagers (acked but not yet committed), and the retrySchedules for failed tuples that need retrying.
ProcessingGuarantee.AT_MOST_ONCE ignores acks and commits synchronously right after polling (the interval setting is ignored), so each record is processed at most once.
ProcessingGuarantee.NO_GUARANTEE also ignores acks; like ProcessingGuarantee.AT_LEAST_ONCE it commits periodically at the configured interval (both rely on commitTimer), but asynchronously.

ProcessingGuarantee.AT_LEAST_ONCE ties into Storm's ack mechanism: the spout's ack method maintains emitted (emitted but not yet acked), and the fail method hands the msgId to the retryService for retrying (something ProcessingGuarantee.NO_GUARANTEE does not do). Like ProcessingGuarantee.NO_GUARANTEE it relies on commitTimer to commit offsets at the configured interval, but it uses commitSync, i.e. synchronous commits, and it commits only acked offsets; ProcessingGuarantee.NO_GUARANTEE commits asynchronously, and the offsets it commits are based on the consumer's poll position, regardless of whether the tuples have been acked in the Storm spout.
ProcessingGuarantee.AT_MOST_ONCE commits inside pollKafkaBroker, right after kafkaConsumer.poll, via a synchronous kafkaConsumer.commitSync; it does not depend on commitTimer, i.e. its offsets are not committed on an interval.
ProcessingGuarantee.NO_GUARANTEE commits asynchronously via kafkaConsumer.commitAsync when nextTuple decides a commit is due; like ProcessingGuarantee.AT_LEAST_ONCE it relies on commitTimer and commits at the configured interval, but asynchronously, whereas ProcessingGuarantee.AT_LEAST_ONCE commits synchronously.
nextTuple() calls pollKafkaBroker, which calls kafkaConsumer.poll to fetch records, puts them into waitingToEmit, and then calls emitIfWaitingNotEmitted to emit (via emitOrRetryTuple) or return. Because pollKafkaBroker seeks each partition back to the earliest failed offset and re-fetches from there (and ProcessingGuarantee.AT_MOST_ONCE commits synchronously via kafkaConsumer.commitSync right after this poll), emitOrRetryTuple must deduplicate against offsetManagers (acked, awaiting commit) and emitted (emitted, awaiting ack) before deciding whether to call collector.emit. For ProcessingGuarantee.AT_LEAST_ONCE it not only emits but also maintains offsetManagers, emitted, and the retry state, and then calls back tupleListener.onEmit; for the other guarantees it only emits.

doc
Storm Apache Kafka integration using the kafka-client jar
