序
本文主要研究一下 flink 的 ScheduledExecutor
Executor
java.base/java/util/concurrent/Executor.java
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
jdk 的 Executor 接口定义了 execute 方法,接收参数类型为 Runnable
ScheduledExecutor
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
public interface ScheduledExecutor extends Executor {
/**
* Executes the given command after the given delay.
*
* @param command the task to execute in the future
* @param delay the time from now to delay the execution
* @param unit the time unit of the delay parameter
* @return a ScheduledFuture representing the completion of the scheduled task
*/
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* Executes the given callable after the given delay. The result of the callable is returned
* as a {@link ScheduledFuture}.
*
* @param callable the callable to execute
* @param delay the time from now to delay the execution
* @param unit the time unit of the delay parameter
* @param <V> result type of the callable
* @return a ScheduledFuture which holds the future value of the given callable
*/
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* Executes the given command periodically. The first execution is started after the
* {@code initialDelay}, the second execution is started after {@code initialDelay + period},
* the third after {@code initialDelay + 2*period} and so on.
* The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
* is cancelled.
*
* @param command the task to be executed periodically
* @param initialDelay the time from now until the first execution is triggered
* @param period the time after which the next execution is triggered
* @param unit the time unit of the delay and period parameter
* @return a ScheduledFuture representing the periodic task. This future never completes
* unless an execution of the given task fails or if the future is cancelled
*/
ScheduledFuture<?> scheduleAtFixedRate(
Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* Executed the given command repeatedly with the given delay between the end of an execution
* and the start of the next execution.
* The task is executed repeatedly until either an exception occurs or if the returned
* {@link ScheduledFuture} is cancelled.
*
* @param command the task to execute repeatedly
* @param initialDelay the time from now until the first execution is triggered
* @param delay the time between the end of the current and the start of the next execution
* @param unit the time unit of the initial delay and the delay parameter
* @return a ScheduledFuture representing the repeatedly executed task. This future never
* completes unless the execution of the given task fails or if the future is cancelled
*/
ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ScheduledExecutor 接口继承了 Executor,它定义了 schedule、scheduleAtFixedRate、scheduleWithFixedDelay 方法,其中 schedule 方法可以接收 Runnable 或者 Callable,这些方法返回的都是 ScheduledFuture;该接口有两个实现类,分别是 ScheduledExecutorServiceAdapter 及 ActorSystemScheduledExecutorAdapter
ScheduledExecutorServiceAdapter
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {
private final ScheduledExecutorService scheduledExecutorService;
public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return scheduledExecutorService.schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return scheduledExecutorService.schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public void execute(Runnable command) {
scheduledExecutorService.execute(command);
}
}
ScheduledExecutorServiceAdapter 实现了 ScheduledExecutor 接口,它使用的是 jdk 的 ScheduledExecutorService 来实现,使用了 scheduledExecutorService 的 schedule、scheduleAtFixedRate、
scheduleWithFixedDelay、execute 方法
ActorSystemScheduledExecutorAdapter
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java
public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
private final ActorSystem actorSystem;
public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
this.actorSystem = Preconditions.checkNotNull(actorSystem, “rpcService”);
}
@Override
@Nonnull
public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);
Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
scheduledFutureTask.setCancellable(cancellable);
return scheduledFutureTask;
}
@Override
@Nonnull
public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);
Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
scheduledFutureTask.setCancellable(cancellable);
return scheduledFutureTask;
}
@Override
@Nonnull
public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
command,
triggerTime(unit.toNanos(initialDelay)),
unit.toNanos(period));
Cancellable cancellable = actorSystem.scheduler().schedule(
new FiniteDuration(initialDelay, unit),
new FiniteDuration(period, unit),
scheduledFutureTask,
actorSystem.dispatcher());
scheduledFutureTask.setCancellable(cancellable);
return scheduledFutureTask;
}
@Override
@Nonnull
public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
command,
triggerTime(unit.toNanos(initialDelay)),
unit.toNanos(-delay));
Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);
scheduledFutureTask.setCancellable(cancellable);
return scheduledFutureTask;
}
@Override
public void execute(@Nonnull Runnable command) {
actorSystem.dispatcher().execute(command);
}
private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
return actorSystem.scheduler().scheduleOnce(
new FiniteDuration(delay, unit),
runnable,
actorSystem.dispatcher());
}
private long now() {
return System.nanoTime();
}
private long triggerTime(long delay) {
return now() + delay;
}
private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private long time;
private final long period;
private volatile Cancellable cancellable;
ScheduledFutureTask(Callable<V> callable, long time, long period) {
super(callable);
this.time = time;
this.period = period;
}
ScheduledFutureTask(Runnable runnable, long time, long period) {
super(runnable, null);
this.time = time;
this.period = period;
}
public void setCancellable(Cancellable newCancellable) {
this.cancellable = newCancellable;
}
@Override
public void run() {
if (!isPeriodic()) {
super.run();
} else if (runAndReset()){
if (period > 0L) {
time += period;
} else {
cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);
// check whether we have been cancelled concurrently
if (isCancelled()) {
cancellable.cancel();
} else {
time = triggerTime(-period);
}
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);
return result && cancellable.cancel();
}
@Override
public long getDelay(@Nonnull TimeUnit unit) {
return unit.convert(time – now(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(@Nonnull Delayed o) {
if (o == this) {
return 0;
}
long diff = getDelay(TimeUnit.NANOSECONDS) – o.getDelay(TimeUnit.NANOSECONDS);
return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
}
@Override
public boolean isPeriodic() {
return period != 0L;
}
}
}
ActorSystemScheduledExecutorAdapter 实现了 ScheduledExecutor 接口,它使用的是 actorSystem 来实现;其中 execute 方法使用的是 actorSystem.dispatcher().execute 方法
schedule 及 scheduleWithFixedDelay 方法调用的是 internalSchedule 方法,它使用的是 actorSystem.scheduler().scheduleOnce 方法,只是它们的 ScheduledFutureTask 不同,其中 schedule 方法的 ScheduledFutureTask 的 period 为 0,而 scheduleWithFixedDelay 方法的 ScheduledFutureTask 的 period 为 unit.toNanos(-delay);ScheduledFutureTask 的 run 方法会对 period 进行判断,小于等于 0 的,会再次调用 internalSchedule 方法,来实现以 FixedDelay 进行调度的效果
scheduleAtFixedRate 方法,它使用的是 actorSystem.scheduler().schedule 方法,其 ScheduledFutureTask 的 period 即为方法参数的 period,没有像 scheduleWithFixedDelay 方法那样用 unit.toNanos(-delay) 作为 period
小结
ScheduledExecutor 接口继承了 Executor,它定义了 schedule、scheduleAtFixedRate、scheduleWithFixedDelay 方法,其中 schedule 方法可以接收 Runnable 或者 Callable,这些方法返回的都是 ScheduledFuture;该接口有两个实现类,分别是 ScheduledExecutorServiceAdapter 及 ActorSystemScheduledExecutorAdapter
ScheduledExecutorServiceAdapter 实现了 ScheduledExecutor 接口,它使用的是 jdk 的 ScheduledExecutorService 来实现,使用了 scheduledExecutorService 的 schedule、scheduleAtFixedRate、scheduleWithFixedDelay、execute 方法
ActorSystemScheduledExecutorAdapter 实现了 ScheduledExecutor 接口,它使用的是 actorSystem 来实现;其中 execute 方法使用的是 actorSystem.dispatcher().execute 方法;schedule 及 scheduleWithFixedDelay 方法调用的是 internalSchedule 方法,它使用的是 actorSystem.scheduler().scheduleOnce 方法,只是它们的 ScheduledFutureTask 不同,其中 schedule 方法的 ScheduledFutureTask 的 period 为 0,而 scheduleWithFixedDelay 方法的 ScheduledFutureTask 的 period 为 unit.toNanos(-delay);ScheduledFutureTask 的 run 方法会对 period 进行判断,小于等于 0 的,会再次调用 internalSchedule 方法,来实现以 FixedDelay 进行调度的效果;scheduleAtFixedRate 方法,它使用的是 actorSystem.scheduler().schedule 方法,其 ScheduledFutureTask 的 period 即为方法参数的 period,没有像 scheduleWithFixedDelay 方法那样用 unit.toNanos(-delay) 作为 period
doc
ScheduledExecutor
ScheduledExecutorServiceAdapter
ActorSystemScheduledExecutorAdapter