聊聊flink的AsyncWaitOperator

29次阅读

共计 27747 个字符,预计需要花费 70 分钟才能阅读完成。


本文主要研究一下 flink 的 AsyncWaitOperator
AsyncWaitOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
private static final long serialVersionUID = 1L;

private static final String STATE_NAME = “_async_wait_operator_state_”;

/** Capacity of the stream element queue. */
private final int capacity;

/** Output mode for this operator. */
private final AsyncDataStream.OutputMode outputMode;

/** Timeout for the async collectors. */
private final long timeout;

protected transient Object checkpointingLock;

/** {@link TypeSerializer} for inputs while making snapshots. */
private transient StreamElementSerializer<IN> inStreamElementSerializer;

/** Recovered input stream elements. */
private transient ListState<StreamElement> recoveredStreamElements;

/** Queue to store the currently in-flight stream elements into. */
private transient StreamElementQueue queue;

/** Pending stream element which could not yet added to the queue. */
private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

private transient ExecutorService executor;

/** Emitter for the completed stream element queue entries. */
private transient Emitter<OUT> emitter;

/** Thread running the emitter. */
private transient Thread emitterThread;

public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
super(asyncFunction);
chainingStrategy = ChainingStrategy.ALWAYS;

Preconditions.checkArgument(capacity > 0, “The number of concurrent async operation should be greater than 0.”);
this.capacity = capacity;

this.outputMode = Preconditions.checkNotNull(outputMode, “outputMode”);

this.timeout = timeout;
}

@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);

this.checkpointingLock = getContainingTask().getCheckpointLock();

this.inStreamElementSerializer = new StreamElementSerializer<>(
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

// create the operators executor for the complete operations of the queue entries
this.executor = Executors.newSingleThreadExecutor();

switch (outputMode) {
case ORDERED:
queue = new OrderedStreamElementQueue(
capacity,
executor,
this);
break;
case UNORDERED:
queue = new UnorderedStreamElementQueue(
capacity,
executor,
this);
break;
default:
throw new IllegalStateException(“Unknown async mode: ” + outputMode + ‘.’);
}
}

@Override
public void open() throws Exception {
super.open();

// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

// start the emitter thread
this.emitterThread = new Thread(emitter, “AsyncIO-Emitter-Thread (” + getOperatorName() + ‘)’);
emitterThread.setDaemon(true);
emitterThread.start();

// process stream elements from state, since the Emit thread will start as soon as all
// elements from previous state are in the StreamElementQueue, we have to make sure that the
// order to open all operators in the operator chain proceeds from the tail operator to the
// head operator.
if (recoveredStreamElements != null) {
for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
processElement(element.<IN>asRecord());
}
else if (element.isWatermark()) {
processWatermark(element.asWatermark());
}
else if (element.isLatencyMarker()) {
processLatencyMarker(element.asLatencyMarker());
}
else {
throw new IllegalStateException(“Unknown record type ” + element.getClass() +
” encountered while opening the operator.”);
}
}
recoveredStreamElements = null;
}

}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
timeoutTimestamp,
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
}
});

// Cancel the timer once we’ve completed the stream record buffer entry. This will remove
// the register trigger task
streamRecordBufferEntry.onComplete(
(StreamElementQueueEntry<Collection<OUT>> value) -> {
timerFuture.cancel(true);
},
executor);
}

addAsyncBufferEntry(streamRecordBufferEntry);

userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}

@Override
public void processWatermark(Watermark mark) throws Exception {
WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);

addAsyncBufferEntry(watermarkBufferEntry);
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);

ListState<StreamElement> partitionableState =
getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();

Collection<StreamElementQueueEntry<?>> values = queue.values();

try {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}

// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();

throw new Exception(“Could not add stream element queue entries to operator state ” +
“backend of operator ” + getOperatorName() + ‘.’, e);
}
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
recoveredStreamElements = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

}

@Override
public void close() throws Exception {
try {
assert(Thread.holdsLock(checkpointingLock));

while (!queue.isEmpty()) {
// wait for the emitter thread to output the remaining elements
// for that he needs the checkpointing lock and thus we have to free it
checkpointingLock.wait();
}
}
finally {
Exception exception = null;

try {
super.close();
} catch (InterruptedException interrupted) {
exception = interrupted;

Thread.currentThread().interrupt();
} catch (Exception e) {
exception = e;
}

try {
// terminate the emitter, the emitter thread and the executor
stopResources(true);
} catch (InterruptedException interrupted) {
exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

Thread.currentThread().interrupt();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
LOG.warn(“Errors occurred while closing the AsyncWaitOperator.”, exception);
}
}
}

@Override
public void dispose() throws Exception {
Exception exception = null;

try {
super.dispose();
} catch (InterruptedException interrupted) {
exception = interrupted;

Thread.currentThread().interrupt();
} catch (Exception e) {
exception = e;
}

try {
stopResources(false);
} catch (InterruptedException interrupted) {
exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

Thread.currentThread().interrupt();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
throw exception;
}
}

private void stopResources(boolean waitForShutdown) throws InterruptedException {
emitter.stop();
emitterThread.interrupt();

executor.shutdown();

if (waitForShutdown) {
try {
if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();

Thread.currentThread().interrupt();
}

/*
* FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
* that the emitter thread can complete/react to the interrupt signal.
*/
if (Thread.holdsLock(checkpointingLock)) {
while (emitterThread.isAlive()) {
checkpointingLock.wait(100L);
}
}

emitterThread.join();
} else {
executor.shutdownNow();
}
}

private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));

pendingStreamElementQueueEntry = streamElementQueueEntry;

while (!queue.tryPut(streamElementQueueEntry)) {
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
}

pendingStreamElementQueueEntry = null;
}

@Override
public void failOperator(Throwable throwable) {
getContainingTask().getEnvironment().failExternally(throwable);
}
}

AsyncWaitOperator 继承了 AbstractUdfStreamOperator,覆盖了 setup、open、snapshotState、initializeState、close、dispose 方法;实现了 OneInputStreamOperator 接口定义的 processElement、processWatermark 方法(processLatencyMarker 在上面的代码中没有重写,open 方法里调用的是从父类继承下来的实现);实现了 OperatorActions 定义的 failOperator 方法
setup 方法使用 Executors.newSingleThreadExecutor() 创建了 ExecutorService,之后根据不同的 outputMode 创建不同的 StreamElementQueue(OrderedStreamElementQueue 或者 UnorderedStreamElementQueue);open 方法使用 Emitter 创建并启动 AsyncIO-Emitter-Thread,另外就是处理 recoveredStreamElements,根据不同的类型分别调用 processElement、processWatermark、processLatencyMarker 方法
processElement 方法在 timeout 大于 0 时首先注册一个 timer,在 ProcessingTimeCallback 的 onProcessingTime 方法里头执行 userFunction.timeout,并在该元素完成时取消这个 timer;之后将 StreamRecordQueueEntry 添加到 StreamElementQueue 中,最后触发 userFunction.asyncInvoke;close 和 dispose 方法会调用 stopResources 方法来关闭资源,不同的是 waitForShutdown 参数传值不同,close 方法传 true,而 dispose 方法传 false

Emitter
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java
@Internal
public class Emitter<OUT> implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

/** Lock to hold before outputting. */
private final Object checkpointLock;

/** Output for the watermark elements. */
private final Output<StreamRecord<OUT>> output;

/** Queue to consume the async results from. */
private final StreamElementQueue streamElementQueue;

private final OperatorActions operatorActions;

/** Output for stream records. */
private final TimestampedCollector<OUT> timestampedCollector;

private volatile boolean running;

public Emitter(
final Object checkpointLock,
final Output<StreamRecord<OUT>> output,
final StreamElementQueue streamElementQueue,
final OperatorActions operatorActions) {

this.checkpointLock = Preconditions.checkNotNull(checkpointLock, “checkpointLock”);
this.output = Preconditions.checkNotNull(output, “output”);
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, “streamElementQueue”);
this.operatorActions = Preconditions.checkNotNull(operatorActions, “operatorActions”);

this.timestampedCollector = new TimestampedCollector<>(this.output);
this.running = true;
}

@Override
public void run() {
try {
while (running) {
LOG.debug(“Wait for next completed async stream element result.”);
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

output(streamElementEntry);
}
} catch (InterruptedException e) {
if (running) {
operatorActions.failOperator(e);
} else {
// Thread got interrupted which means that it should shut down
LOG.debug(“Emitter thread got interrupted, shutting down.”);
}
} catch (Throwable t) {
operatorActions.failOperator(new Exception(“AsyncWaitOperator’s emitter caught an ” +
“unexpected throwable.”, t));
}
}

private void output(AsyncResult asyncResult) throws InterruptedException {
if (asyncResult.isWatermark()) {
synchronized (checkpointLock) {
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

LOG.debug(“Output async watermark.”);
output.emitWatermark(asyncWatermarkResult.getWatermark());

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
} else {
AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

if (streamRecordResult.hasTimestamp()) {
timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
} else {
timestampedCollector.eraseTimestamp();
}

synchronized (checkpointLock) {
LOG.debug(“Output async stream element collection result.”);

try {
Collection<OUT> resultCollection = streamRecordResult.get();

if (resultCollection != null) {
for (OUT result : resultCollection) {
timestampedCollector.collect(result);
}
}
} catch (Exception e) {
operatorActions.failOperator(
new Exception(“An async function call terminated with an exception. ” +
“Failing the AsyncWaitOperator.”, e));
}

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
}
}

public void stop() {
running = false;
}
}

Emitter 实现了 Runnable 接口,它主要负责从 StreamElementQueue 取出 element,然后输出到 TimestampedCollector
Emitter 的 run 方法就是不断循环调用 streamElementQueue.peekBlockingly() 阻塞获取 AsyncResult,获取到之后就调用 output 方法将 result 输出出去
Emitter 的 output 方法根据 asyncResult 是否是 watermark 做不同处理,不是 watermark 的话,就会将 result 通过 timestampedCollector.collect 输出,如果出现异常则调用 operatorActions.failOperator 传递异常,最后调用 streamElementQueue.poll() 来移除队首的元素

StreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
/**
 * Queue of {@link StreamElementQueueEntry} elements whose completed results are retrieved as
 * {@link AsyncResult}. The behavior described on the methods below is that of the two
 * implementations quoted in this article (ordered and unordered).
 */
@Internal
public interface StreamElementQueue {

/**
 * Adds the entry, waiting while the queue is at capacity.
 *
 * @throws InterruptedException if interrupted while acquiring the lock or waiting for space
 */
<T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

/**
 * Adds the entry only if the queue is below capacity.
 *
 * @return {@code true} if the entry was added, {@code false} if the queue was full
 */
<T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

/**
 * Waits until a completed entry is available and returns it without removing it.
 */
AsyncResult peekBlockingly() throws InterruptedException;

/**
 * Waits until a completed entry is available, then removes and returns it.
 */
AsyncResult poll() throws InterruptedException;

/**
 * Returns the entries currently held by the queue, completed or not.
 */
Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;

/** Returns whether the queue currently holds no entries. */
boolean isEmpty();

/** Returns the number of entries currently held by the queue. */
int size();
}
StreamElementQueue 接口主要定义了 AsyncWaitOperator 所要用的 blocking stream element queue 的接口;它定义了 put、tryPut、peekBlockingly、poll、values、isEmpty、size 方法;StreamElementQueue 接口有两个实现类,分别是 UnorderedStreamElementQueue 及 OrderedStreamElementQueue;队列元素类型为 StreamElementQueueEntry
UnorderedStreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {

private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

/** Capacity of this queue. */
private final int capacity;

/** Executor to run the onComplete callbacks. */
private final Executor executor;

/** OperatorActions to signal the owning operator a failure. */
private final OperatorActions operatorActions;

/** Queue of uncompleted stream element queue entries segmented by watermarks. */
private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

/** Queue of completed stream element queue entries. */
private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

/** First (chronologically oldest) uncompleted set of stream element queue entries. */
private Set<StreamElementQueueEntry<?>> firstSet;

// Last (chronologically youngest) uncompleted set of stream element queue entries. New
// stream element queue entries are inserted into this set.
private Set<StreamElementQueueEntry<?>> lastSet;
private volatile int numberEntries;

/** Locks and conditions for the blocking queue. */
private final ReentrantLock lock;
private final Condition notFull;
private final Condition hasCompletedEntries;

public UnorderedStreamElementQueue(
int capacity,
Executor executor,
OperatorActions operatorActions) {

Preconditions.checkArgument(capacity > 0, “The capacity must be larger than 0.”);
this.capacity = capacity;

this.executor = Preconditions.checkNotNull(executor, “executor”);

this.operatorActions = Preconditions.checkNotNull(operatorActions, “operatorActions”);

this.uncompletedQueue = new ArrayDeque<>(capacity);
this.completedQueue = new ArrayDeque<>(capacity);

this.firstSet = new HashSet<>(capacity);
this.lastSet = firstSet;

this.numberEntries = 0;

this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.hasCompletedEntries = lock.newCondition();
}

@Override
public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();

try {
while (numberEntries >= capacity) {
notFull.await();
}

addEntry(streamElementQueueEntry);
} finally {
lock.unlock();
}
}

@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();

try {
if (numberEntries < capacity) {
addEntry(streamElementQueueEntry);

LOG.debug(“Put element into unordered stream element queue. New filling degree ” +
“({}/{}).”, numberEntries, capacity);

return true;
} else {
LOG.debug(“Failed to put element into unordered stream element queue because it ” +
“was full ({}/{}).”, numberEntries, capacity);

return false;
}
} finally {
lock.unlock();
}
}

@Override
public AsyncResult peekBlockingly() throws InterruptedException {
lock.lockInterruptibly();

try {
while (completedQueue.isEmpty()) {
hasCompletedEntries.await();
}

LOG.debug(“Peeked head element from unordered stream element queue with filling degree ” +
“({}/{}).”, numberEntries, capacity);

return completedQueue.peek();
} finally {
lock.unlock();
}
}

@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();

try {
while (completedQueue.isEmpty()) {
hasCompletedEntries.await();
}

numberEntries–;
notFull.signalAll();

LOG.debug(“Polled element from unordered stream element queue. New filling degree ” +
“({}/{}).”, numberEntries, capacity);

return completedQueue.poll();
} finally {
lock.unlock();
}
}

@Override
public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
lock.lockInterruptibly();

try {
StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];

array = completedQueue.toArray(array);

int counter = completedQueue.size();

for (StreamElementQueueEntry<?> entry: firstSet) {
array[counter] = entry;
counter++;
}

for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {

for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
array[counter] = streamElementQueueEntry;
counter++;
}
}

return Arrays.asList(array);
} finally {
lock.unlock();
}
}

@Override
public boolean isEmpty() {
return numberEntries == 0;
}

@Override
public int size() {
return numberEntries;
}

public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();

try {
if (firstSet.remove(streamElementQueueEntry)) {
completedQueue.offer(streamElementQueueEntry);

while (firstSet.isEmpty() && firstSet != lastSet) {
firstSet = uncompletedQueue.poll();

Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();

while (it.hasNext()) {
StreamElementQueueEntry<?> bufferEntry = it.next();

if (bufferEntry.isDone()) {
completedQueue.offer(bufferEntry);
it.remove();
}
}
}

LOG.debug(“Signal unordered stream element queue has completed entries.”);
hasCompletedEntries.signalAll();
}
} finally {
lock.unlock();
}
}

private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
assert(lock.isHeldByCurrentThread());

if (streamElementQueueEntry.isWatermark()) {
lastSet = new HashSet<>(capacity);

if (firstSet.isEmpty()) {
firstSet.add(streamElementQueueEntry);
} else {
Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
watermarkSet.add(streamElementQueueEntry);
uncompletedQueue.offer(watermarkSet);
}
uncompletedQueue.offer(lastSet);
} else {
lastSet.add(streamElementQueueEntry);
}

streamElementQueueEntry.onComplete(
(StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
// The accept executor thread got interrupted. This is probably cause by
// the shutdown of the executor.
LOG.debug(“AsyncBufferEntry could not be properly completed because the ” +
“executor thread has been interrupted.”, e);
} catch (Throwable t) {
operatorActions.failOperator(new Exception(“Could not complete the ” +
“stream element queue entry: ” + value + ‘.’, t));
}
},
executor);

numberEntries++;
}
}

UnorderedStreamElementQueue 实现了 StreamElementQueue 接口,它 emit 结果的顺序是无序的,其内部使用了两个 ArrayDeque,一个是 uncompletedQueue,一个是 completedQueue
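peekBlockingly 方法首先判断 completedQueue 是否有元素,没有的话则执行 hasCompletedEntries.await(),有则执行 completedQueue.peek();put 及 tryPut 都会调用 addEntry 方法,该方法对普通元素会将其加入 lastSet;对 watermark 则新建一个 lastSet,并把 watermark 放入 firstSet(firstSet 为空时)或者单独包装成一个 set 放入 uncompletedQueue(firstSet 非空时),再把新的 lastSet 放入 uncompletedQueue;然后同时给每个 streamElementQueueEntry 的 onComplete 方法注册一个 onCompleteHandler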
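onCompleteHandler 方法会将执行完成的 streamElementQueueEntry 从 firstSet 移除,然后添加到 completedQueue(如果该元素不在 firstSet 中则什么都不做);如果 firstSet 因此变空且它还不是 lastSet,则不断从 uncompletedQueue 取出下一个 set 作为新的 firstSet,并把其中已经完成的元素移到 completedQueue,最后执行 hasCompletedEntries.signalAll()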

OrderedStreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {

private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

/** Capacity of this queue. */
private final int capacity;

/** Executor to run the onCompletion callback. */
private final Executor executor;

/** Operator actions to signal a failure to the operator. */
private final OperatorActions operatorActions;

/** Lock and conditions for the blocking queue. */
private final ReentrantLock lock;
private final Condition notFull;
private final Condition headIsCompleted;

/** Queue for the inserted StreamElementQueueEntries. */
private final ArrayDeque<StreamElementQueueEntry<?>> queue;

public OrderedStreamElementQueue(
int capacity,
Executor executor,
OperatorActions operatorActions) {

Preconditions.checkArgument(capacity > 0, “The capacity must be larger than 0.”);
this.capacity = capacity;

this.executor = Preconditions.checkNotNull(executor, “executor”);

this.operatorActions = Preconditions.checkNotNull(operatorActions, “operatorActions”);

this.lock = new ReentrantLock(false);
this.headIsCompleted = lock.newCondition();
this.notFull = lock.newCondition();

this.queue = new ArrayDeque<>(capacity);
}

@Override
public AsyncResult peekBlockingly() throws InterruptedException {
lock.lockInterruptibly();

try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}

LOG.debug(“Peeked head element from ordered stream element queue with filling degree ” +
“({}/{}).”, queue.size(), capacity);

return queue.peek();
} finally {
lock.unlock();
}
}

@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();

try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}

notFull.signalAll();

LOG.debug(“Polled head element from ordered stream element queue. New filling degree ” +
“({}/{}).”, queue.size() – 1, capacity);

return queue.poll();
} finally {
lock.unlock();
}
}

@Override
public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
lock.lockInterruptibly();

try {
StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];

array = queue.toArray(array);

return Arrays.asList(array);
} finally {
lock.unlock();
}
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}

@Override
public int size() {
return queue.size();
}

@Override
public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();

try {
while (queue.size() >= capacity) {
notFull.await();
}

addEntry(streamElementQueueEntry);
} finally {
lock.unlock();
}
}

@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();

try {
if (queue.size() < capacity) {
addEntry(streamElementQueueEntry);

LOG.debug(“Put element into ordered stream element queue. New filling degree ” +
“({}/{}).”, queue.size(), capacity);

return true;
} else {
LOG.debug(“Failed to put element into ordered stream element queue because it ” +
“was full ({}/{}).”, queue.size(), capacity);

return false;
}
} finally {
lock.unlock();
}
}

private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
assert(lock.isHeldByCurrentThread());

queue.addLast(streamElementQueueEntry);

streamElementQueueEntry.onComplete(
(StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
// we got interrupted. This indicates a shutdown of the executor
LOG.debug(“AsyncBufferEntry could not be properly completed because the ” +
“executor thread has been interrupted.”, e);
} catch (Throwable t) {
operatorActions.failOperator(new Exception(“Could not complete the ” +
“stream element queue entry: ” + value + ‘.’, t));
}
},
executor);
}

private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();

try {
if (!queue.isEmpty() && queue.peek().isDone()) {
LOG.debug(“Signal ordered stream element queue has completed head element.”);
headIsCompleted.signalAll();
}
} finally {
lock.unlock();
}
}
}

OrderedStreamElementQueue 实现了 StreamElementQueue 接口,它有序地 emit 结果,它内部有一个 ArrayDeque 类型的 queue
peekBlockingly 方法首先判断 queue 是否有元素而且是执行完成的,没有就执行 headIsCompleted.await(),有则执行 queue.peek();put 及 tryPut 都会调用 addEntry 方法,该方法会执行 queue.addLast(streamElementQueueEntry),然后同时给每个 streamElementQueueEntry 的 onComplete 方法注册一个 onCompleteHandler
onCompleteHandler 方法会在某个元素执行完成时检测队列的第一个元素是否已经执行完成,如果是则执行 headIsCompleted.signalAll()

AsyncResult
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
/**
 * Result handed out by a {@link StreamElementQueue}: either a watermark or the collection of
 * output values produced for one input record.
 */
@Internal
public interface AsyncResult {

/** Returns {@code true} if this result is an {@link AsyncWatermarkResult}. */
boolean isWatermark();

/** Returns {@code true} if this result is an {@link AsyncCollectionResult}. */
boolean isResultCollection();

/** Returns this result viewed as a watermark result. */
AsyncWatermarkResult asWatermark();

/**
 * Returns this result viewed as a collection result.
 *
 * @param <T> element type of the result collection
 */
<T> AsyncCollectionResult<T> asResultCollection();
}
AsyncResult 接口定义了 StreamElementQueue 的元素异步返回的结果要实现的方法,该 async result 可能是 watermark,可能是真正的结果
StreamElementQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
/**
 * Base class of the entries held by a {@link StreamElementQueue}. It keeps the originating
 * {@link StreamElement} and exposes completion through the future supplied by the subclass.
 *
 * @param <T> type of the value the entry's future completes with
 */
@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {

    /** The stream element this entry was created for. */
    private final StreamElement streamElement;

    /**
     * @param streamElement element to wrap; must not be null
     */
    public StreamElementQueueEntry(StreamElement streamElement) {
        this.streamElement = Preconditions.checkNotNull(streamElement);
    }

    /** Returns the wrapped stream element. */
    public StreamElement getStreamElement() {
        return this.streamElement;
    }

    /** Returns whether the entry's future has completed. */
    public boolean isDone() {
        final CompletableFuture<T> future = getFuture();
        return future.isDone();
    }

    /**
     * Registers a callback that receives this entry once its future completes.
     *
     * @param completeFunction callback to invoke with this entry
     * @param executor executor on which the callback runs
     */
    public void onComplete(
            final Consumer<StreamElementQueueEntry<T>> completeFunction,
            Executor executor) {
        // call the complete function for normal completion as well as exceptional completion
        // see FLINK-6435
        getFuture().whenCompleteAsync(
            (ignoredValue, ignoredFailure) -> completeFunction.accept(this),
            executor);
    }

    /** Returns the future that tracks completion of this entry. */
    protected abstract CompletableFuture<T> getFuture();

    @Override
    public final boolean isWatermark() {
        return this instanceof AsyncWatermarkResult;
    }

    @Override
    public final boolean isResultCollection() {
        return this instanceof AsyncCollectionResult;
    }

    @Override
    public final AsyncWatermarkResult asWatermark() {
        return (AsyncWatermarkResult) this;
    }

    @Override
    public final <T> AsyncCollectionResult<T> asResultCollection() {
        return (AsyncCollectionResult<T>) this;
    }
}
StreamElementQueueEntry 实现了 AsyncResult 接口,它定义了 onComplete 方法用于结果完成时的回调处理,同时它还定义了抽象方法 getFuture 供子类实现;它有两个子类,分别是 WatermarkQueueEntry 及 StreamRecordQueueEntry
WatermarkQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
/**
 * Queue entry for a {@link Watermark}. Its future is already completed when the entry is
 * created.
 */
@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {

    /** Future that is completed with the watermark at construction time. */
    private final CompletableFuture<Watermark> completedFuture;

    /**
     * @param watermark watermark to wrap
     */
    public WatermarkQueueEntry(Watermark watermark) {
        super(watermark);

        completedFuture = CompletableFuture.completedFuture(watermark);
    }

    /** Returns the wrapped stream element cast to a watermark. */
    @Override
    public Watermark getWatermark() {
        return (Watermark) getStreamElement();
    }

    @Override
    protected CompletableFuture<Watermark> getFuture() {
        return completedFuture;
    }
}
WatermarkQueueEntry 继承了 StreamElementQueueEntry,其元素类型为 Watermark,同时实现了 AsyncWatermarkResult 接口
StreamRecordQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
/**
 * Queue entry for a stream record. It doubles as the {@link ResultFuture} through which the
 * result collection, or a failure, is delivered.
 *
 * @param <OUT> element type of the result collection
 */
@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
        implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

    /** Whether the originating record carried a timestamp. */
    private final boolean hasTimestamp;

    /** Timestamp read from the originating record. */
    private final long timestamp;

    /** Future containing the collection result. */
    private final CompletableFuture<Collection<OUT>> pendingResult;

    /**
     * @param streamRecord record this entry is created for
     */
    public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
        super(streamRecord);

        this.hasTimestamp = streamRecord.hasTimestamp();
        this.timestamp = streamRecord.getTimestamp();

        this.pendingResult = new CompletableFuture<>();
    }

    @Override
    public boolean hasTimestamp() {
        return this.hasTimestamp;
    }

    @Override
    public long getTimestamp() {
        return this.timestamp;
    }

    /** Returns the result collection, blocking until the future has completed. */
    @Override
    public Collection<OUT> get() throws Exception {
        return this.pendingResult.get();
    }

    @Override
    protected CompletableFuture<Collection<OUT>> getFuture() {
        return this.pendingResult;
    }

    /** Completes the future with the given result collection. */
    @Override
    public void complete(Collection<OUT> result) {
        this.pendingResult.complete(result);
    }

    /** Completes the future exceptionally with the given error. */
    @Override
    public void completeExceptionally(Throwable error) {
        this.pendingResult.completeExceptionally(error);
    }
}
StreamRecordQueueEntry 继承了 StreamElementQueueEntry,同时实现了 AsyncCollectionResult、ResultFuture 接口
小结

AsyncWaitOperator 继承了 AbstractUdfStreamOperator,覆盖了 setup、open、snapshotState、initializeState、close、dispose 方法;实现了 OneInputStreamOperator 接口定义的 processElement、processWatermark 方法(processLatencyMarker 在上面的代码中没有重写,open 方法里调用的是从父类继承下来的实现);实现了 OperatorActions 定义的 failOperator 方法;open 方法使用 Emitter 创建并启动 AsyncIO-Emitter-Thread
Emitter 实现了 Runnable 接口,它主要负责从 StreamElementQueue 取出 element,然后输出到 TimestampedCollector;其 run 方法就是不断循环调用 streamElementQueue.peekBlockingly() 阻塞获取 AsyncResult,获取到之后就调用 output 方法将 result 输出出去
StreamElementQueue 接口主要定义了 AsyncWaitOperator 所要用的 blocking stream element queue 的接口;它定义了 put、tryPut、peekBlockingly、poll、values、isEmpty、size 方法;StreamElementQueue 接口有两个实现类,分别是 UnorderedStreamElementQueue 及 OrderedStreamElementQueue;队列元素类型为 StreamElementQueueEntry,StreamElementQueueEntry 实现了 AsyncResult 接口,它定义了 onComplete 方法用于结果完成时的回调处理,同时它还定义了抽象方法 getFuture 供子类实现;它有两个子类,分别是 WatermarkQueueEntry 及 StreamRecordQueueEntry

doc
AsyncWaitOperator

正文完
 0