A look at Flink's MemoryStateBackend

Preface

This article takes a look at Flink's MemoryStateBackend.

StateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBackend.java

@PublicEvolving
public interface StateBackend extends java.io.Serializable {

    // ------------------------------------------------------------------------
    //  Checkpoint storage - the durable persistence of checkpoint data
    // ------------------------------------------------------------------------

    /**
     * Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location
     * supports reading the checkpoint metadata, or disposing the checkpoint storage location.
     *
     * <p>If the state backend cannot understand the format of the pointer (for example because it
     * was created by a different state backend) this method should throw an {@code IOException}.
     *
     * @param externalPointer The external checkpoint pointer to resolve.
     * @return The checkpoint location handle.
     *
     * @throws IOException Thrown, if the state backend does not understand the pointer, or if
     *                     the pointer could not be resolved due to an I/O error.
     */
    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    /**
     * Creates a storage for checkpoints for the given job. The checkpoint storage is
     * used to write checkpoint data and metadata.
     *
     * @param jobId The job to store checkpoint data for.
     * @return A checkpoint storage for the given job.
     *
     * @throws IOException Thrown if the checkpoint storage cannot be initialized.
     */
    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

    // ------------------------------------------------------------------------
    //  Structure Backends
    // ------------------------------------------------------------------------

    /**
     * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
     * and checkpointing it. Uses default TTL time provider.
     *
     * <p><i>Keyed State</i> is state where each value is bound to a key.
     *
     * @param <K> The type of the keys by which the state is organized.
     *
     * @return The Keyed State Backend for the given job, operator, and key group range.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry) throws Exception {
        return createKeyedStateBackend(
                env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups,
                keyGroupRange, kvStateRegistry, TtlTimeProvider.DEFAULT);
    }

    /**
     * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
     * and checkpointing it.
     *
     * <p><i>Keyed State</i> is state where each value is bound to a key.
     *
     * @param <K> The type of the keys by which the state is organized.
     *
     * @return The Keyed State Backend for the given job, operator, and key group range.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider) throws Exception {
        return createKeyedStateBackend(
                env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups,
                keyGroupRange, kvStateRegistry, ttlTimeProvider, new UnregisteredMetricsGroup());
    }

    /**
     * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
     * and checkpointing it.
     *
     * <p><i>Keyed State</i> is state where each value is bound to a key.
     *
     * @param <K> The type of the keys by which the state is organized.
     *
     * @return The Keyed State Backend for the given job, operator, and key group range.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) throws Exception;

    /**
     * Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
     *
     * <p>Operator state is state that is associated with parallel operator (or function) instances,
     * rather than with keys.
     *
     * @param env The runtime environment of the executing task.
     * @param operatorIdentifier The identifier of the operator whose state should be stored.
     *
     * @return The OperatorStateBackend for operator identified by the job and operator identifier.
     *
     * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
     */
    OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
}

The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed.

Flink currently ships three built-in backends: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. If nothing is configured, MemoryStateBackend is the default. A global default can be set in flink-conf.yaml, and each job can override it with StreamExecutionEnvironment.setStateBackend.

MemoryStateBackend accepts a maximum state size in its constructor. The default is 5 MB; it can be raised, but not beyond the akka frame size. FsStateBackend keeps the TaskManager's state in memory and writes checkpoint state to a file system such as HDFS. RocksDBStateBackend keeps working state in RocksDB and writes checkpoint state to a file system.

The StateBackend interface declares resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend, and createOperatorStateBackend, and it extends Serializable. Implementations of StateBackend are required to be thread-safe.

AbstractStateBackend is the abstract class that implements StateBackend directly. AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, and MemoryStateBackend and FsStateBackend in turn extend AbstractFileStateBackend.

AbstractStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractStateBackend.java

/**
 * An abstract base implementation of the {@link StateBackend} interface.
 *
 * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
 */
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {

    private static final long serialVersionUID = 4620415814639230247L;

    // ------------------------------------------------------------------------
    //  State Backend - State-Holding Backends
    // ------------------------------------------------------------------------

    @Override
    public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) throws IOException;

    @Override
    public abstract OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws Exception;
}

AbstractStateBackend declares that it implements StateBackend and Serializable; it adds nothing else.

AbstractFileStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java

@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------
    //  State Backend Properties
    // ------------------------------------------------------------------------

    /** The path where checkpoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseCheckpointPath;

    /** The path where savepoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseSavepointPath;

    //……

    // ------------------------------------------------------------------------
    //  Initialization and metadata storage
    // ------------------------------------------------------------------------

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Checks the validity of the path's scheme and path.
     *
     * @param path The path to check.
     * @return The URI as a Path.
     *
     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
     */
    private static Path validatePath(Path path) {
        final URI uri = path.toUri();
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        // some validity checks
        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
                    "Please specify the file system scheme explicitly in the URI.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
                    "Please specify a directory path for the checkpoint data.");
        }
        if (pathPart.length() == 0 || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }

        return path;
    }

    @Nullable
    private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
        if (path != null) {
            return path;
        }
        else {
            String configValue = config.getString(option);
            try {
                return configValue == null ? null : new Path(configValue);
            }
            catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
                        " : " + configValue + " . Not a valid path.");
            }
        }
    }
}

AbstractFileStateBackend extends AbstractStateBackend. It has two properties, baseCheckpointPath and baseSavepointPath, both of which may be null; a configured path must carry an explicit scheme such as hdfs:// or file://. The resolveCheckpoint method resolves the location of a checkpoint or savepoint by delegating to AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer).

MemoryStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryStateBackend.java

@PublicEvolving
public class MemoryStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

    private static final long serialVersionUID = 4109305377809414635L;

    /** The default maximal size that the snapshotted memory state may have (5 MiBytes).
     */
    public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    /** The maximal size that the snapshotted memory state may have. */
    private final int maxStateSize;

    /** Switch to chose between synchronous and asynchronous snapshots.
     * A value of 'UNDEFINED' means not yet configured, in which case the default will be used. */
    private final TernaryBoolean asynchronousSnapshots;

    // ------------------------------------------------------------------------

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the default state size (5 MB).
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     */
    public MemoryStateBackend() {
        this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the default state size (5 MB). The state backend uses asynchronous snapshots
     * or synchronous snapshots as configured.
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     *
     * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
     */
    public MemoryStateBackend(boolean asynchronousSnapshots) {
        this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.fromBoolean(asynchronousSnapshots));
    }

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the given number of bytes.
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     *
     * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
     * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
     * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
     * and the JobManager needs to be able to hold all aggregated state in its memory.
     *
     * @param maxStateSize The maximal size of the serialized state
     */
    public MemoryStateBackend(int maxStateSize) {
        this(null, null, maxStateSize, TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a new memory state backend that accepts states whose serialized forms are
     * up to the given number of bytes and that uses asynchronous snashots as configured.
     *
     * <p>Checkpoint and default savepoint locations are used as specified in the
     * runtime configuration.
     *
     * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
     * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
     * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
     * and the JobManager needs to be able to hold all aggregated state in its memory.
     *
     * @param maxStateSize The maximal size of the serialized state
     * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
     */
    public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
        this(null, null, maxStateSize, TernaryBoolean.fromBoolean(asynchronousSnapshots));
    }

    /**
     * Creates a new MemoryStateBackend, setting optionally the path to persist checkpoint metadata
     * to, and to persist savepoints to.
     *
     * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
     *                       the runtime configuration will be used.
     * @param savepointPath  The path to write savepoints to. If null, the value from
     *                       the runtime configuration will be used.
     */
    public MemoryStateBackend(@Nullable String checkpointPath, @Nullable String savepointPath) {
        this(checkpointPath, savepointPath, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
    }

    /**
     * Creates a new MemoryStateBackend, setting optionally the paths to persist checkpoint metadata
     * and savepoints to, as well as configuring state thresholds and asynchronous operations.
     *
     * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
     * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
     * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
     * and the JobManager needs to be able to hold all aggregated state in its memory.
     *
     * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
     *                       the runtime configuration will be used.
     * @param savepointPath  The path to write savepoints to. If null, the value from
     *                       the runtime configuration will be used.
     * @param maxStateSize The maximal size of the serialized state.
     * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous
     *                              snapshot mode. If null, the value configured in the
     *                              runtime configuration will be used.
     */
    public MemoryStateBackend(
            @Nullable String checkpointPath,
            @Nullable String savepointPath,
            int maxStateSize,
            TernaryBoolean asynchronousSnapshots) {

        super(checkpointPath == null ? null : new Path(checkpointPath),
                savepointPath == null ? null : new Path(savepointPath));

        checkArgument(maxStateSize > 0, "maxStateSize must be > 0");
        this.maxStateSize = maxStateSize;

        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Private constructor that creates a re-configured copy of the state backend.
     *
     * @param original The state backend to re-configure
     * @param configuration The configuration
     */
    private MemoryStateBackend(MemoryStateBackend original, Configuration configuration) {
        super(original.getCheckpointPath(), original.getSavepointPath(), configuration);

        this.maxStateSize = original.maxStateSize;

        // if asynchronous snapshots were configured, use that setting,
        // else check the configuration
        this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
                configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
    }

    // ------------------------------------------------------------------------
    //  Properties
    // ------------------------------------------------------------------------

    /**
     * Gets the maximum size that an individual state can have, as configured in the
     * constructor (by default {@value #DEFAULT_MAX_STATE_SIZE}).
     *
     * @return The maximum size that an individual state can have
     */
    public int getMaxStateSize() {
        return maxStateSize;
    }

    /**
     * Gets whether the key/value data structures are asynchronously snapshotted.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
     */
    public boolean isUsingAsynchronousSnapshots() {
        return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
    }

    // ------------------------------------------------------------------------
    //  Reconfiguration
    // ------------------------------------------------------------------------

    /**
     * Creates a copy of this state backend that uses the values defined in the configuration
     * for fields where that were not specified in this state backend.
     *
     * @param config the configuration
     * @return The re-configured variant of the state backend
     */
    @Override
    public MemoryStateBackend configure(Configuration config) {
        return new MemoryStateBackend(this, config);
    }

    // ------------------------------------------------------------------------
    //  checkpoint state persistence
    // ------------------------------------------------------------------------

    @Override
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
    }

    // ------------------------------------------------------------------------
    //  state holding structures
    // ------------------------------------------------------------------------

    @Override
    public OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws Exception {

        return new DefaultOperatorStateBackend(
                env.getUserClassLoader(),
                env.getExecutionConfig(),
                isUsingAsynchronousSnapshots());
    }

    @Override
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) {

        TaskStateManager taskStateManager = env.getTaskStateManager();
        HeapPriorityQueueSetFactory priorityQueueSetFactory = new
                HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

        return new HeapKeyedStateBackend<>(
                kvStateRegistry,
                keySerializer,
                env.getUserClassLoader(),
                numberOfKeyGroups,
                keyGroupRange,
                isUsingAsynchronousSnapshots(),
                env.getExecutionConfig(),
                taskStateManager.createLocalRecoveryConfig(),
                priorityQueueSetFactory,
                ttlTimeProvider);
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "MemoryStateBackend (data in heap memory / checkpoints to JobManager) " +
                "(checkpoints: '" + getCheckpointPath() +
                "', savepoints: '" + getSavepointPath() +
                "', asynchronous: " + asynchronousSnapshots +
                ", maxStateSize: " + maxStateSize + ")";
    }
}

MemoryStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface (the configure method). It keeps both the TaskManager's working state and the JobManager's checkpoint state on the JVM heap, although for high availability the checkpoint state can also be written to a file system. MemoryStateBackend is meant for experimentation only, for example local runs or jobs with very little state. Production jobs should switch to FsStateBackend, which keeps the TaskManager's working state in memory but writes checkpoint state to a file system and therefore supports larger state.

MemoryStateBackend has a maxStateSize property (DEFAULT_MAX_STATE_SIZE is 5 MB). Three limits apply: each individual state must not exceed maxStateSize; all state of one task must fit within the RPC system's limit (10 MB by default, adjustable but not recommended); and the combined size of all retained checkpoints must fit in the JobManager's JVM heap. If checkpointPath and savepointPath are not passed when the backend is created, the global defaults from flink-conf.yaml are used. The asynchronousSnapshots property is a TernaryBoolean (TRUE, FALSE, UNDEFINED); UNDEFINED means it has not been configured, in which case the default value is used.

createCheckpointStorage returns a MemoryBackendCheckpointStorage, createOperatorStateBackend returns a DefaultOperatorStateBackend, and createKeyedStateBackend returns a HeapKeyedStateBackend.

Summary

The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed. The built-in backends are MemoryStateBackend, FsStateBackend, and RocksDBStateBackend, with MemoryStateBackend as the default when nothing is configured. A global default can be set in flink-conf.yaml, and each job can override it with StreamExecutionEnvironment.setStateBackend.

The StateBackend interface declares resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend, and createOperatorStateBackend, extends Serializable, and requires implementations to be thread-safe. AbstractStateBackend implements it directly; AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend; MemoryStateBackend and FsStateBackend extend AbstractFileStateBackend.

MemoryStateBackend extends AbstractFileStateBackend and implements ConfigurableStateBackend (the configure method). It keeps the TaskManager's working state and the JobManager's checkpoint state on the JVM heap. Its createCheckpointStorage returns a MemoryBackendCheckpointStorage, createOperatorStateBackend returns a DefaultOperatorStateBackend, and createKeyedStateBackend returns a HeapKeyedStateBackend.

doc

State Backends ...
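The way MemoryStateBackend handles its asynchronousSnapshots flag can be tried out without Flink on the classpath. The enum below is only a sketch: TriState is a hypothetical stand-in for Flink's TernaryBoolean, and it mirrors just the three methods the listing above calls (fromBoolean, resolveUndefined, getOrDefault), with behaviour inferred from how they are used there.

```java
/**
 * Hypothetical, simplified stand-in for Flink's TernaryBoolean.
 * It is NOT the library class; it only models the calls seen in MemoryStateBackend.
 */
enum TriState {
    TRUE, FALSE, UNDEFINED;

    /** Wraps an explicitly chosen boolean, as the boolean constructors do. */
    static TriState fromBoolean(boolean value) {
        return value ? TRUE : FALSE;
    }

    /**
     * Keeps an explicit TRUE/FALSE and replaces UNDEFINED with the given value.
     * This is the step the private "re-configured copy" constructor performs.
     */
    TriState resolveUndefined(boolean valueForUndefined) {
        return this != UNDEFINED ? this : fromBoolean(valueForUndefined);
    }

    /** Reads the flag, falling back to a default while it is still UNDEFINED. */
    boolean getOrDefault(boolean defaultValue) {
        return this == UNDEFINED ? defaultValue : this == TRUE;
    }
}
```

Under these assumptions, a backend built with the no-argument constructor starts as UNDEFINED and picks up whatever the configuration says, while a backend built with an explicit boolean keeps that choice even after configure is called.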

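The scheme and path checks in AbstractFileStateBackend.validatePath can also be reproduced with the JDK alone. The following is a rough sketch, not the Flink method: CheckpointPathCheck and its two methods are made-up names, and java.net.URI is used in place of Flink's Path, so edge cases may differ from the real class.

```java
/** Hypothetical helper that mimics the three validity checks with java.net.URI. */
final class CheckpointPathCheck {

    private CheckpointPathCheck() {}

    /** Returns the parsed URI, or throws IllegalArgumentException if a check fails. */
    static java.net.URI validate(String location) {
        final java.net.URI uri = java.net.URI.create(location);
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null.");
        }
        if (pathPart.isEmpty() || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        return uri;
    }

    /** Convenience wrapper: true if the location passes all checks. */
    static boolean isValid(String location) {
        try {
            validate(location);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

With this sketch, a location such as hdfs://namenode:9000/flink-checkpoints is accepted, while a bare /tmp/checkpoints (no scheme) and file:/// (root directory) are rejected.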
December 10, 2018 · 10 min · jiezi

[case49] A look at Flink's checkpoint configuration

Preface

This article takes a look at Flink's checkpoint configuration.

Example

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

Checkpointing is enabled with StreamExecutionEnvironment.enableCheckpointing, either as enableCheckpointing(long interval) or as enableCheckpointing(long interval, CheckpointingMode mode). The interval is the time between checkpoint triggers in milliseconds. CheckpointingMode defaults to CheckpointingMode.EXACTLY_ONCE and can also be set to CheckpointingMode.AT_LEAST_ONCE.

The mode can also be set through StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode. AT_LEAST_ONCE is generally only worth considering for applications with very low latency requirements (a few milliseconds); EXACTLY_ONCE is fine for most other applications.

checkpointTimeout is the maximum time in milliseconds that a checkpoint may take; a checkpoint that has not completed by then is aborted.

minPauseBetweenCheckpoints is the minimum time the checkpoint coordinator waits after one checkpoint completes before it may trigger the next. Setting it implies that maxConcurrentCheckpoints is 1.

maxConcurrentCheckpoints is the maximum number of checkpoints that may be in progress at the same time, which keeps the topology from spending too much time on checkpoints. If minPauseBetweenCheckpoints is set, values greater than 1 have no effect.

enableExternalizedCheckpoints turns on external persistence of checkpoints. Externalized checkpoints are not cleaned up automatically when the job fails, so that state has to be removed by hand. ExternalizedCheckpointCleanup controls what happens when the job is canceled: with DELETE_ON_CANCELLATION the externalized state is deleted on cancellation but kept if the job ends in the FAILED state, and with RETAIN_ON_CANCELLATION the externalized checkpoint state is kept on cancellation as well.

failOnCheckpointingErrors decides whether a task should fail when an error occurs during its checkpoint. The default is true; if set to false, the task declines the checkpoint and keeps running.

flink-conf.yaml settings

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false

state.backend selects the backend used to store checkpoint state; the default is none.

state.backend.async selects whether the backend uses asynchronous snapshots (default true). Backends that do not support asynchronous snapshots, or that only support them, may ignore this option.

state.backend.fs.memory-threshold (default 1024) is the size threshold for state written to files; state smaller than this is stored in the root checkpoint metadata file instead.

state.backend.incremental (default false) selects incremental checkpoints; backends that do not support them ignore the option.

state.backend.local-recovery defaults to false.

state.checkpoints.dir (default none) is the directory where checkpoint data files and metadata are stored. It must be accessible to all participating TaskManagers and JobManagers.

state.checkpoints.num-retained (default 1) is the number of completed checkpoints to retain.

state.savepoints.dir (default none) is the default directory for savepoints.

taskmanager.state.local.root-dirs defaults to none.

Summary

Checkpointing is enabled with StreamExecutionEnvironment.enableCheckpointing, either as enableCheckpointing(long interval) or as enableCheckpointing(long interval, CheckpointingMode mode).

The advanced options are checkpointTimeout (the maximum time a checkpoint may take, in milliseconds), minPauseBetweenCheckpoints (the minimum wait after one checkpoint completes before the checkpoint coordinator may trigger the next), maxConcurrentCheckpoints (the maximum number of checkpoints in progress; values greater than 1 have no effect when minPauseBetweenCheckpoints is set), and enableExternalizedCheckpoints (external persistence of checkpoints; externalized checkpoint state is not cleaned up automatically when the job fails, while for cancellation you can choose whether to delete or retain it).

flink-conf.yaml also carries checkpoint-related settings, mostly for the state backend, such as state.backend.async, state.backend.incremental, state.checkpoints.dir, and state.savepoints.dir.

doc

Checkpointing ...
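Two of the rules described in this post are easy to misread, so here is a toy model of them in plain Java. It is only an illustration of the rules as stated in the text: JobEnd, CleanupSketch, and CheckpointLimitsSketch are invented names, and the logic is not taken from Flink's CheckpointCoordinator or CheckpointConfig.

```java
/** How the job ended; a made-up two-value enum for this sketch only. */
enum JobEnd { CANCELED, FAILED }

/** Toy counterpart of the two cleanup modes named in the article. */
enum CleanupSketch {
    DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION;

    /** True if the externalized checkpoint still exists after the job ends this way. */
    boolean keepsCheckpoint(JobEnd end) {
        // A failed job always keeps its externalized checkpoint;
        // a canceled job keeps it only in RETAIN_ON_CANCELLATION mode.
        return this == RETAIN_ON_CANCELLATION || end == JobEnd.FAILED;
    }
}

/** Toy model of the interplay between min pause and max concurrent checkpoints. */
final class CheckpointLimitsSketch {

    private CheckpointLimitsSketch() {}

    /** A positive min pause caps concurrency at 1, whatever was requested. */
    static int effectiveMaxConcurrent(long minPauseMillis, int requestedMaxConcurrent) {
        if (minPauseMillis < 0 || requestedMaxConcurrent < 1) {
            throw new IllegalArgumentException("min pause must be >= 0 and max concurrent >= 1");
        }
        return minPauseMillis > 0 ? 1 : requestedMaxConcurrent;
    }
}
```

Read against the example at the top of the post, a 500 ms minimum pause combined with a requested concurrency of 3 would behave like a concurrency of 1, and a job canceled under DELETE_ON_CANCELLATION would lose its externalized checkpoint while a failed one would keep it.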

December 9, 2018 · 1 min · jiezi

A look at the initializeState method of Flink's StreamOperator

Preface

This article takes a look at the initializeState method of Flink's StreamOperator.

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {

    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
            else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            }
            else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------

            //……

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------

            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------

            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {
            //……
        }
        finally {
            //……
        }
    }

    //……
}

Task.run calls invokable.invoke(); the invokable here is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool();

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            // make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();

                // make sure no new timers can come
                timerService.quiesce();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();

            LOG.debug("Closed operators for task {}", getName());

            // make sure all buffered data is flushed
            operatorChain.flushOutputs();

            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            //……
        }
    }

    private void initializeState() throws Exception {

        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }

    //……
}

StreamTask.invoke calls initializeState, which iterates over all operators (StreamOperator instances) in the operatorChain and calls initializeState on each of them. In this walkthrough the operator is a StreamSource.

StreamOperator.initializeState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {

    /**
     * Provides a context to initialize all state in the operator.
/ void initializeState() throws Exception; //……}StreamOperator接口定义了initializeState方法用于初始化operator的stateStreamSource.initializeStateflink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java@Internalpublic class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> { //……}StreamSource继承了AbstractUdfStreamOperator,它没有覆盖initializeState,而AbstractUdfStreamOperator也没有覆盖initializeState方法,因而是执行的是AbstractUdfStreamOperator的父类AbstractStreamOperator的initializeStateAbstractStreamOperator.initializeStateflink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java@PublicEvolvingpublic abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, Serializable { @Override public final void initializeState() throws Exception { final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask()); final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables()); final StreamTaskStateInitializer streamTaskStateManager = Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer()); final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), this, keySerializer, streamTaskCloseableRegistry, metrics); this.operatorStateBackend = context.operatorStateBackend(); this.keyedStateBackend = context.keyedStateBackend(); if (keyedStateBackend != null) { this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig()); } timeServiceManager = context.internalTimerServiceManager(); CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = 
context.rawKeyedStateInputs(); CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs(); try { StateInitializationContext initializationContext = new StateInitializationContextImpl( context.isRestored(), // information whether we restore or start for the first time operatorStateBackend, // access to operator state backend keyedStateStore, // access to keyed state backend keyedStateInputs, // access to keyed state stream operatorStateInputs); // access to operator state stream initializeState(initializationContext); } finally { closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry); closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry); } } /* * Stream operators with state which can be restored need to override this hook method. * * @param context context that allows to register different states. */ public void initializeState(StateInitializationContext context) throws Exception { } //……}AbstractStreamOperator实现了StreamOperator接口定义的initializeState方法,该方法会调用initializeState(initializationContext)方法,其子类AbstractUdfStreamOperator对该方法进行了覆盖AbstractUdfStreamOperator.initializeState(initializationContext)flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java@PublicEvolvingpublic abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> { @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); StreamingFunctionUtils.restoreFunctionState(context, userFunction); } //……}initializeState(initializationContext)方法这里调用了StreamingFunctionUtils.restoreFunctionStateStreamingFunctionUtils.restoreFunctionStateflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java public static void restoreFunctionState( 
StateInitializationContext context, Function userFunction) throws Exception { Preconditions.checkNotNull(context); while (true) { if (tryRestoreFunction(context, userFunction)) { break; } // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function if (userFunction instanceof WrappingFunction) { userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); } else { break; } } } private static boolean tryRestoreFunction( StateInitializationContext context, Function userFunction) throws Exception { if (userFunction instanceof CheckpointedFunction) { ((CheckpointedFunction) userFunction).initializeState(context); return true; } if (context.isRestored() && userFunction instanceof ListCheckpointed) { @SuppressWarnings(“unchecked”) ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction; ListState<Serializable> listState = context.getOperatorStateStore(). getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); List<Serializable> list = new ArrayList<>(); for (Serializable serializable : listState.get()) { list.add(serializable); } try { listCheckpointedFun.restoreState(list); } catch (Exception e) { throw new Exception(“Failed to restore state to function: " + e.getMessage(), e); } return true; } return false; 
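}

restoreFunctionState mainly calls tryRestoreFunction, unwrapping a WrappingFunction and retrying when the first attempt does not apply. tryRestoreFunction inspects the userFunction: if it implements the CheckpointedFunction interface, its initializeState method is called; if it implements the ListCheckpointed interface and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called.

Summary

Task's run method triggers invokable.invoke(), where the invokable is a StreamTask. StreamTask's invoke method calls initializeState, which iterates over allOperators (StreamOperator) on the operatorChain and calls initializeState on each of them; here, for example, the operator is a StreamSource, which extends AbstractUdfStreamOperator.

The StreamOperator interface defines the initializeState method for initializing the operator's state. The abstract class AbstractStreamOperator implements initializeState, and internally calls initializeState(initializationContext); its subclass AbstractUdfStreamOperator overrides that method.

AbstractUdfStreamOperator's initializeState(initializationContext) calls StreamingFunctionUtils.restoreFunctionState. That method checks the userFunction: if it implements CheckpointedFunction, its initializeState method is called; if it implements ListCheckpointed and context.isRestored() is true, the ListState is fetched from the OperatorStateStore, its values are copied into a List, and ListCheckpointed.restoreState is called.

doc

Working with State ...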
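The restore dispatch order summarized in this article can be sketched without any Flink dependency. The following is only an illustration, not Flink's code: the interfaces Fn, CheckpointedFn, ListCheckpointedFn and WrappingFn are hypothetical stand-ins for Function, CheckpointedFunction, ListCheckpointed and WrappingFunction, and the boolean parameter restored stands in for context.isRestored().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the Flink interfaces; these are NOT the real Flink types.
interface Fn {}
interface CheckpointedFn extends Fn { void initializeState(boolean restored); }
interface ListCheckpointedFn extends Fn { void restoreState(List<String> state); }
interface WrappingFn extends Fn { Fn getWrappedFunction(); }

class RestoreDispatchSketch {

    // Mirrors the loop described above: try to restore, otherwise unwrap and retry.
    // Returns the name of the path that was taken, purely for illustration.
    static String restore(Fn fn, boolean restored, List<String> savedState) {
        while (true) {
            if (fn instanceof CheckpointedFn) {
                // CheckpointedFunction is always initialized, restored or not
                ((CheckpointedFn) fn).initializeState(restored);
                return "CheckpointedFunction.initializeState";
            }
            if (restored && fn instanceof ListCheckpointedFn) {
                // ListCheckpointed is only called when there is state to restore
                ((ListCheckpointedFn) fn).restoreState(new ArrayList<>(savedState));
                return "ListCheckpointed.restoreState";
            }
            if (fn instanceof WrappingFn) {
                fn = ((WrappingFn) fn).getWrappedFunction();
            } else {
                return "none";
            }
        }
    }

    public static void main(String[] args) {
        ListCheckpointedFn inner = state -> {};
        WrappingFn wrapper = () -> inner;
        System.out.println(restore(wrapper, true, Arrays.asList("a", "b")));
        System.out.println(restore(inner, false, Arrays.asList("a", "b")));
    }
}
```

Note the asymmetry this makes visible: a function on the CheckpointedFunction path is called on a fresh start as well as on a restore, while a function on the ListCheckpointed path is skipped entirely when nothing is being restored.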

December 8, 2018 · 6 min · jiezi

Flink SQL Core Internals: MicroBatch, a Powerful Tool for Improving Throughput

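We previously added MiniBatch support to Flink SQL, and it has played an important role in high-throughput scenarios. One of the important improvements in this year's Flink SQL performance work is an upgraded micro-batch model, which we call MicroBatch, also known as MiniBatch 2.0.

When designing and implementing Flink's stream-processing operators, we generally treat "state-oriented programming" as the first principle. In stream processing, to guarantee the consistency of state, state data must be stored in a state backend (StateBackend) so that the framework can take distributed snapshots. The state backends mainly in use today, RocksDB and Niagara, perform serialization and deserialization on every read and write, and sometimes disk I/O as well. State operations therefore usually become the performance bottleneck of the whole job, and both the design of the state data structures and every single state access deserve special attention.

The core idea of micro-batching is to buffer a small batch of data so that, when state is accessed, multiple records with the same key need only one state operation. When the key repetition rate within a batch is high, this significantly reduces the frequency of state access and greatly improves throughput. MicroBatch and MiniBatch share the same core mechanism: accumulate a batch, then trigger the computation. Only the batching strategy differs. We first explain how triggering the computation saves state accesses.

Micro-batch computation

A typical use case for MicroBatch is Group Aggregate. Take a simple sum as an example:

SELECT key, SUM(value) FROM T GROUP BY key

As shown in the figure above, when MicroBatch is not enabled, Aggregate handles each incoming record by reading state once, performing the aggregation, and writing state once. N records require 2*N state operations. When MicroBatch is enabled, the N buffered records are triggered together, and records with the same key read and write state only once. For example, the 4 buffered records for A in the figure above cause one state read and one state write. So the higher the key repetition rate and the larger the batch, the fewer the state accesses and the higher the throughput.

Batching strategy

A batching strategy generally has two dimensions: latency and memory. Latency controls how often a batch is accumulated, and it is the key parameter for trading throughput against latency. Memory guards against a sudden TPS spike producing more buffered data than memory can hold, which would cause Full GC and OOM. Below we describe how the old MiniBatch and the new MicroBatch differ in these two dimensions.

MiniBatch batching strategy

In the latency dimension, MiniBatch registers a separate timer on each aggregate node, and the time budget is simply split evenly. For example, with 4 aggregate nodes and a user-configured MiniBatch of 10s, each node is allotted 2.5s, as shown in the figure below:

This strategy has the following problems:

The user can tolerate 10s of latency, but only 2.5s is actually used for batching, so batching efficiency is low. The more complex the topology, the more pronounced the gap.

Because upstream and downstream timers fire purely asynchronously, the downstream may happen to trigger its micro-batch just as the upstream triggers its own. A node stops consuming network data for a while when it processes a micro-batch, so the upstream is easily backpressured.

Timers introduce extra threads, which adds thread scheduling and lock contention overhead.

In the memory dimension, MiniBatch counts input records and triggers the batch once the count exceeds the user-configured blink.miniBatch.size, to prevent OOM. The size parameter is hard to estimate, however: if it is too large it may no longer protect memory, and if it is too small batching efficiency drops.

MicroBatch batching strategy

MicroBatch was proposed to solve the MiniBatch problems described above. MicroBatch uses watermarks to control the timed triggering of aggregate nodes: watermarks are inserted into the data stream as special events that cut the stream into batches of equal time intervals. The implementation principle is shown below:

MicroBatch inserts a MicroBatchAssigner node after the data source to emit watermarks periodically. The interval is the user-configured latency parameter, for example 10s. Every 10s, whether or not the source has data, a watermark carrying the current system timestamp is sent downstream. A node's current watermark is the minimum watermark over all of its channels, so when the watermark of an aggregate node advances, a complete upstream batch has been collected and can be triggered. After processing the batch, the node broadcasts the current watermark to all downstream tasks. When a downstream task has received the watermark from all of its upstream tasks, it triggers its batch as well. Batch triggering thus cascades from upstream to downstream.

Using the watermark as the special event that delimits batches is an interesting point. Watermark is a very powerful tool that we usually use to measure the progress of business time and to handle out-of-order business time. Seen from another angle, it can also measure the progress of global system time, which solves the batching problem quite neatly.

Compared with the MiniBatch strategy, MicroBatch therefore has the following advantages:

With the same latency, MicroBatch batches more efficiently and accumulates more data.

Because MicroBatch batches are triggered by events, the downstream does not trigger at the same time as the upstream, so it is less prone to backpressure than MiniBatch.

It solves the data jitter problem (analyzed in the next subsection).

We ran a performance comparison using a DAU job. With the same allowLatency (6 seconds), MicroBatch achieves higher throughput while keeping the same end-to-end latency as MiniBatch.

In the same test, once the job is running steadily, the average queue usage of MicroBatch stays below 50%, while MiniBatch runs with its queues full almost all the time. This indicates that MicroBatch is more stable than MiniBatch and less likely to cause backpressure.

In the memory dimension, MicroBatch currently still uses the size parameter to limit the record count, as MiniBatch does. In the future it will be based on memory management, storing the buffered data in managed memory blocks (BytesHashMap) to reduce the space cost of Java objects, relieve GC pressure, and prevent OOM.

Preventing data jitter

Data jitter refers to the following: with two levels of AGG, an update emitted by the first-level AGG is split into two independent messages that are consumed downstream, a retract message and an accumulate message. When the second-level AGG consumes these two messages, it emits two messages as well. Seen from the front end, the data appears to jitter. Take the example below, which counts buyers with two levels of scattering: the first level computes UV and the second level computes SUM.

SELECT day, SUM(cnt) total
FROM (
  SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
  FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day

When the first-level count distinct result rises from 100 to 101, it emits two messages, -100 and +101. The second-level SUM receives and processes them in order. Suppose the SUM value is 900 at that moment: processing -100 first emits the result 800, and processing +101 then emits 901. From the user's point of view, the buyer count dropped from 900 to 800 and then rose to 901. We call this data jitter. In theory the buyer count should only ever increase, so we have long been thinking about how to solve this problem.

The root cause of data jitter is that the retract and accumulate messages are two operations within one transaction, yet the intermediate result between them is visible to the user. This corresponds to READ UNCOMMITTED, the weakest guarantee under isolation (the I in ACID) in traditional databases. Solving the problem at its root means finding a way to process the retract & accumulate messages atomically. With the MicroBatch strategy described above, batches are delimited by watermarks, and a watermark is never inserted between a retract & accumulate pair, so the watermark is a natural transaction boundary. Processing batches by watermark handles retract & accumulate atomically and thereby removes the jitter.

Applicable scenarios and usage

MicroBatch trades some latency for a large gain in throughput. If you need ultra-low latency, enabling micro-batching is not recommended. MicroBatch currently brings significant performance gains for aggregations and Joins on unbounded streams, so enabling it there is recommended. If you run into the data jitter problem described above, enabling it is also recommended.

MicroBatch is disabled by default. To enable it:

# Batching interval; required when using the microbatch strategy, and recommended to match blink.miniBatch.allowLatencyMs
blink.microBatch.allowLatencyMs=5000
# The following two minibatch settings must be kept when using microbatch
blink.miniBatch.allowLatencyMs=5000
# Prevents OOM: the maximum number of records buffered per batch
blink.miniBatch.size=20000

Future optimizations

MicroBatch currently supports only aggregations and Joins on unbounded streams; Window Aggregate is not yet supported. Future work will therefore focus on supporting the MicroBatch strategy for Window Aggregate to improve its throughput. In addition, MicroBatch memory will be managed with binary data structures to improve memory utilization and reduce the impact of GC.

Author: jark

This article is original content from the Yunqi Community (云栖社区) and may not be reproduced without permission. ...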
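To make the state-access arithmetic from the micro-batch computation section concrete (2*N accesses per record versus one read and one write per distinct key), here is a small sketch. It is not Blink's implementation: the class MicroBatchSumSketch, the in-memory map standing in for a state backend, and the stateAccesses counter are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an in-memory map stands in for a real state backend.
class MicroBatchSumSketch {
    final Map<String, Long> state = new HashMap<>();
    int stateAccesses = 0; // counts every simulated state read or write

    long readState(String key) {
        stateAccesses++;
        return state.getOrDefault(key, 0L);
    }

    void writeState(String key, long value) {
        stateAccesses++;
        state.put(key, value);
    }

    // Without batching: one read and one write per record, i.e. 2*N accesses.
    void processPerRecord(String[] keys, long[] values) {
        for (int i = 0; i < keys.length; i++) {
            long current = readState(keys[i]);
            writeState(keys[i], current + values[i]);
        }
    }

    // With batching: pre-aggregate in a local buffer, then touch state once per distinct key.
    void processBatched(String[] keys, long[] values) {
        Map<String, Long> buffer = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            buffer.merge(keys[i], values[i], Long::sum);
        }
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            long current = readState(entry.getKey());
            writeState(entry.getKey(), current + entry.getValue());
        }
    }

    public static void main(String[] args) {
        String[] keys = {"A", "B", "A", "A", "B", "A"};
        long[] values = {1, 2, 3, 4, 5, 6};

        MicroBatchSumSketch perRecord = new MicroBatchSumSketch();
        perRecord.processPerRecord(keys, values);

        MicroBatchSumSketch batched = new MicroBatchSumSketch();
        batched.processBatched(keys, values);

        // 6 records: 12 accesses without batching, 4 accesses (2 distinct keys) with batching
        System.out.println(perRecord.stateAccesses + " vs " + batched.stateAccesses);
        System.out.println(perRecord.state.equals(batched.state));
    }
}
```

Both paths end with the same sums per key; only the number of simulated state accesses differs, and the gap widens as the key repetition rate within a batch grows.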

December 7, 2018 · 2 min · jiezi

A Look at flink's CheckpointScheduler

Preface

This article takes a look at flink's CheckpointScheduler.

CheckpointCoordinatorDeActivator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}

CheckpointCoordinatorDeActivator implements the JobStatusListener interface. In jobStatusChanges it calls coordinator.startCheckpointScheduler when the new status is JobStatus.RUNNING, and coordinator.stopCheckpointScheduler for any other status.

CheckpointCoordinator.ScheduledTrigger

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
/public class CheckpointCoordinator { /* Map from checkpoint ID to the pending checkpoint / private final Map<Long, PendingCheckpoint> pendingCheckpoints; /* The number of consecutive failed trigger attempts / private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); //…… public void startCheckpointScheduler() { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException(“Checkpoint coordinator is shut down”); } // make sure all prior timers are cancelled stopCheckpointScheduler(); periodicScheduling = true; long initialDelay = ThreadLocalRandom.current().nextLong( minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L); currentPeriodicTrigger = timer.scheduleAtFixedRate( new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS); } } public void stopCheckpointScheduler() { synchronized (lock) { triggerRequestQueued = false; periodicScheduling = false; if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } for (PendingCheckpoint p : pendingCheckpoints.values()) { p.abortError(new Exception(“Checkpoint Coordinator is suspending.”)); } pendingCheckpoints.clear(); numUnsuccessfulCheckpointsTriggers.set(0); } } private final class ScheduledTrigger implements Runnable { @Override public void run() { try { triggerCheckpoint(System.currentTimeMillis(), true); } catch (Exception e) { LOG.error(“Exception while triggering checkpoint for job {}.”, job, e); } } } //……}CheckpointCoordinator的startCheckpointScheduler方法首先调用stopCheckpointScheduler取消PendingCheckpoint,之后使用timer.scheduleAtFixedRate重新调度ScheduledTriggerstopCheckpointScheduler会调用PendingCheckpoint.abortError来取消pendingCheckpoints,然后清空pendingCheckpoints(Map<Long, 
PendingCheckpoint>)以及numUnsuccessfulCheckpointsTriggers(AtomicInteger)ScheduledTrigger实现了Runnable接口,其run方法主要是调用triggerCheckpoint,传递的isPeriodic参数为trueCheckpointCoordinator.triggerCheckpointflink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java/* * The checkpoint coordinator coordinates the distributed snapshots of operators and state. * It triggers the checkpoint by sending the messages to the relevant tasks and collects the * checkpoint acknowledgements. It also collects and maintains the overview of the state handles * reported by the tasks that acknowledge the checkpoint. /public class CheckpointCoordinator { /* Tasks who need to be sent a message when a checkpoint is started / private final ExecutionVertex[] tasksToTrigger; /* Tasks who need to acknowledge a checkpoint before it succeeds / private final ExecutionVertex[] tasksToWaitFor; /* Map from checkpoint ID to the pending checkpoint / private final Map<Long, PendingCheckpoint> pendingCheckpoints; /* The maximum number of checkpoints that may be in progress at the same time / private final int maxConcurrentCheckpointAttempts; /* The min time(in ns) to delay after a checkpoint could be triggered. Allows to * enforce minimum processing time between checkpoint attempts / private final long minPauseBetweenCheckpointsNanos; /* * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint * timestamp. * * @param timestamp The timestamp for the checkpoint. * @param isPeriodic Flag indicating whether this triggered checkpoint is * periodic. If this flag is true, but the periodic scheduler is disabled, * the checkpoint will be declined. * @return <code>true</code> if triggering the checkpoint succeeded. 
/ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess(); } @VisibleForTesting public CheckpointTriggerResult triggerCheckpoint( long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic) { // make some eager pre-checks synchronized (lock) { // abort if the coordinator has been shutdown in the meantime if (shutdown) { return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } // Don’t allow periodic checkpoint if scheduling has been disabled if (isPeriodic && !periodicScheduling) { return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN); } // validate whether the checkpoint can be triggered, with respect to the limit of // concurrent checkpoints, and the minimum time between checkpoints. // these checks are not relevant for savepoints if (!props.forceCheckpoint()) { // sanity check: there should never be more than one trigger request queued if (triggerRequestQueued) { LOG.warn(“Trying to trigger another checkpoint for job {} while one was queued already.”, job); return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); } // if too many checkpoints are currently in progress, we need to mark that a request is queued if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } // make sure the minimum interval between checkpoints has passed final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { 
currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } // Reassign the new trigger to the currentPeriodicTrigger currentPeriodicTrigger = timer.scheduleAtFixedRate( new ScheduledTrigger(), durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } } // check if all tasks that we need to trigger are running. // if not, abort the checkpoint Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); if (ee == null) { LOG.info(“Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.”, tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } else if (ee.getState() == ExecutionState.RUNNING) { executions[i] = ee; } else { LOG.info(“Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.”, tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job, ExecutionState.RUNNING, ee.getState()); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } // next, check if all tasks that need to acknowledge the checkpoint are running. // if not, abort the checkpoint Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length); for (ExecutionVertex ev : tasksToWaitFor) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ackTasks.put(ee.getAttemptId(), ev); } else { LOG.info(“Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.”, ev.getTaskNameWithSubtaskIndex(), job); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } // we will actually trigger this checkpoint! 
// we lock with a special lock to make sure that trigger requests do not overtake each other. // this is not done with the coordinator-wide lock, because the ‘checkpointIdCounter’ // may issue blocking operations. Using a different lock than the coordinator-wide lock, // we avoid blocking the processing of ‘acknowledge/decline’ messages during that time. synchronized (triggerLock) { final CheckpointStorageLocation checkpointStorageLocation; final long checkpointID; try { // this must happen outside the coordinator-wide lock, because it communicates // with external services (in HA mode) and may block for a while. checkpointID = checkpointIdCounter.getAndIncrement(); checkpointStorageLocation = props.isSavepoint() ? checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) : checkpointStorage.initializeLocationForCheckpoint(checkpointID); } catch (Throwable t) { int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn(“Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).”, job, numUnsuccessful, t); return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } final PendingCheckpoint checkpoint = new PendingCheckpoint( job, checkpointID, timestamp, ackTasks, props, checkpointStorageLocation, executor); if (statsTracker != null) { PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint( checkpointID, timestamp, props); checkpoint.setStatsCallback(callback); } // schedule the timer that will clean up the expired checkpoints final Runnable canceller = () -> { synchronized (lock) { // only do the work if the checkpoint is not discarded anyways // note that checkpoint completion discards the pending checkpoint object if (!checkpoint.isDiscarded()) { LOG.info(“Checkpoint {} of job {} expired before completing.”, checkpointID, job); checkpoint.abortExpired(); pendingCheckpoints.remove(checkpointID); rememberRecentCheckpointId(checkpointID); 
triggerQueuedRequests(); } } }; try { // re-acquire the coordinator-wide lock synchronized (lock) { // since we released the lock in the meantime, we need to re-check // that the conditions still hold. if (shutdown) { return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } else if (!props.forceCheckpoint()) { if (triggerRequestQueued) { LOG.warn(“Trying to trigger another checkpoint for job {} while one was queued already.”, job); return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); } if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } // make sure the minimum interval between checkpoints has passed final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; if (durationTillNextMillis > 0) { if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } // Reassign the new trigger to the currentPeriodicTrigger currentPeriodicTrigger = timer.scheduleAtFixedRate( new ScheduledTrigger(), durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } LOG.info(“Triggering checkpoint {} @ {} for job {}.”, checkpointID, timestamp, job); pendingCheckpoints.put(checkpointID, checkpoint); ScheduledFuture<?> cancellerHandle = timer.schedule( canceller, checkpointTimeout, TimeUnit.MILLISECONDS); if (!checkpoint.setCancellerHandle(cancellerHandle)) { // checkpoint is already disposed! 
cancellerHandle.cancel(false); } // trigger the master hooks for the checkpoint final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(), checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout)); for (MasterState s : masterStates) { checkpoint.addMasterState(s); } } // end of lock scope final CheckpointOptions checkpointOptions = new CheckpointOptions( props.getCheckpointType(), checkpointStorageLocation.getLocationReference()); // send the messages to the tasks that trigger their checkpoint for (Execution execution: executions) { execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); } numUnsuccessfulCheckpointsTriggers.set(0); return new CheckpointTriggerResult(checkpoint); } catch (Throwable t) { // guard the map against concurrent modifications synchronized (lock) { pendingCheckpoints.remove(checkpointID); } int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); LOG.warn(“Failed to trigger checkpoint {} for job {}. 
({} consecutive failed attempts so far)”, checkpointID, job, numUnsuccessful, t);

                if (!checkpoint.isDiscarded()) {
                    checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
                }

                try {
                    checkpointStorageLocation.disposeOnFailure();
                } catch (Throwable t2) {
                    LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
                }

                return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
            }
        } // end trigger lock
    }

    //……
}

First, unless props.forceCheckpoint() is set, the method checks whether the number of pendingCheckpoints has reached maxConcurrentCheckpointAttempts; if so it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS). It then checks whether at least minPauseBetweenCheckpointsNanos has elapsed since lastCheckpointCompletionNanos; if not, it reschedules the periodic trigger, fails fast, and returns CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS), so that checkpoints are not triggered too frequently.

Next, it checks that every task in tasksToTrigger (the tasks that must be notified when a checkpoint is triggered) is in RUNNING state; if not, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).

Next, it checks that every task in tasksToWaitFor (the tasks that must acknowledge the checkpoint before it succeeds) has a current execution attempt. The code only requires getCurrentExecutionAttempt() to be non-null here; it does not check for RUNNING state. If one is missing, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING).

Only after these checks pass does the actual triggering begin. The method first allocates a checkpointID and then initializes the checkpointStorageLocation; on an exception it returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION). It then creates the PendingCheckpoint and prepares the canceller (which aborts the checkpoint when it expires). After that, unless forceCheckpoint is set, it runs the TOO_MANY_CONCURRENT_CHECKPOINTS and MINIMUM_TIME_BETWEEN_CHECKPOINTS checks once more.

Finally, it calls triggerCheckpoint on each Execution in turn. On success it returns CheckpointTriggerResult(checkpoint); on an exception it returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION).

Execution.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

    /**
     * Trigger a new checkpoint on the task of this execution.
     *
     * @param checkpointId of the checkpoint to trigger
     * @param timestamp of the checkpoint to trigger
     * @param checkpointOptions of the checkpoint to trigger
     */
    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

    //……
}

triggerCheckpoint mainly calls taskManagerGateway.triggerCheckpoint; the taskManagerGateway here is an RpcTaskManagerGateway.

RpcTaskManagerGateway

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java

/**
 * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
 */
public class RpcTaskManagerGateway implements TaskManagerGateway {

    private final TaskExecutorGateway taskExecutorGateway;

    public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        taskExecutorGateway.triggerCheckpoint(
            executionAttemptID,
            checkpointId,
            timestamp,
            checkpointOptions);
    }

    //……
}

RpcTaskManagerGateway's triggerCheckpoint method calls taskExecutorGateway.triggerCheckpoint. The taskExecutorGateway here is an AkkaInvocationHandler, which notifies the TaskExecutor over RPC.

TaskExecutor.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

/**
 * TaskExecutor implementation. The task executor is responsible for the execution of multiple
 * {@link Task}.
 */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message));
        }
    }

    //……
}

TaskExecutor's triggerCheckpoint method looks up the Task for the given executionAttemptID and calls task.triggerCheckpointBarrier; if no such task exists, it completes the returned future exceptionally with a CheckpointException.

Task.triggerCheckpointBarrier

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {

    /** The invokable of this task, if initialized. All accesses must copy the reference and
     * check for null, as this field is cleared as part of the disposal logic. */
    @Nullable
    private volatile AbstractInvokable invokable;

    /**
     * Calls the invokable to trigger a checkpoint.
     *
     * @param checkpointID The ID identifying the checkpoint.
     * @param checkpointTimestamp The timestamp associated with the checkpoint.
     * @param checkpointOptions Options for performing this checkpoint.
/ public void triggerCheckpointBarrier( final long checkpointID, long checkpointTimestamp, final CheckpointOptions checkpointOptions) { final AbstractInvokable invokable = this.invokable; final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp); if (executionState == ExecutionState.RUNNING && invokable != null) { // build a local closure final String taskName = taskNameWithSubtask; final SafetyNetCloseableRegistry safetyNetCloseableRegistry = FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread(); Runnable runnable = new Runnable() { @Override public void run() { // set safety net from the task’s context for checkpointing thread LOG.debug(“Creating FileSystem stream leak safety net for {}”, Thread.currentThread().getName()); FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); try { boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions); if (!success) { checkpointResponder.declineCheckpoint( getJobID(), getExecutionId(), checkpointID, new CheckpointDeclineTaskNotReadyException(taskName)); } } catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { failExternally(new Exception( “Error while triggering checkpoint " + checkpointID + " for " + taskNameWithSubtask, t)); } else { LOG.debug(“Encountered error while triggering checkpoint {} for " + “{} ({}) while being not in state running.”, checkpointID, taskNameWithSubtask, executionId, t); } } finally { FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null); } } }; executeAsyncCallRunnable(runnable, String.format(“Checkpoint Trigger for %s (%s).”, taskNameWithSubtask, executionId)); } else { LOG.debug(“Declining checkpoint request for non-running task {} ({}).”, taskNameWithSubtask, executionId); // send back a message that we did not do the checkpoint checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, new 
CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }

    //……
}

Task's triggerCheckpointBarrier method first checks whether executionState is RUNNING and invokable is not null. If either condition fails, it calls checkpointResponder.declineCheckpoint.

If both conditions hold, it calls executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)).

The runnable calls invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions); the invokable here is a SourceStreamTask.

SourceStreamTask.triggerCheckpoint

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
    extends StreamTask<OUT, OP> {

    private volatile boolean externallyInducedCheckpoints;

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        if (!externallyInducedCheckpoints) {
            return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
        } else {
            // we do not trigger checkpoints here, we simply state whether we can trigger them
            synchronized (getCheckpointLock()) {
                return isRunning();
            }
        }
    }

    //……
}

SourceStreamTask's triggerCheckpoint first checks externallyInducedCheckpoints: if it is false, the method calls triggerCheckpoint of the parent class StreamTask; otherwise it triggers nothing and only reports whether the task is running.

StreamTask.triggerCheckpoint

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler {

    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        try {
            // No alignment if we inject a checkpoint
            CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                .setBytesBufferedInAlignment(0L)
                .setAlignmentDurationNanos(0L);

            return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        } catch (Exception e) {
            // propagate exceptions only if the task is still in "running" state
            if (isRunning) {
                throw new Exception(“Could not perform
checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + ‘.’, e); } else { LOG.debug(“Could not perform checkpoint {} for operator {} while the " + “invokable was not in state running.”, checkpointMetaData.getCheckpointId(), getName(), e); return false; } } } private boolean performCheckpoint( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { LOG.debug(“Starting checkpoint ({}) {} on task {}”, checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName()); synchronized (lock) { if (isRunning) { // we can do a checkpoint // All of the following steps happen as an atomic step from the perspective of barriers and // records/watermarks/timers/callbacks. // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream // checkpoint alignments // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work. // The pre-barrier work should be nothing or minimal in the common case. operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId()); // Step (2): Send the checkpoint barrier downstream operatorChain.broadcastCheckpointBarrier( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions); // Step (3): Take the state snapshot. 
This should be largely asynchronous, to not // impact progress of the streaming topology checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); return true; } else { // we cannot perform our checkpoint - let the downstream operators know that they // should not wait for any input from this operator // we cannot broadcast the cancellation markers on the ‘operator chain’, because it may not // yet be created final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()); Exception exception = null; for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) { try { streamRecordWriter.broadcastEvent(message); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed( new Exception(“Could not send cancel checkpoint marker to downstream tasks.”, e), exception); } } if (exception != null) { throw exception; } return false; } } } private void checkpointState( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception { CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation( checkpointMetaData.getCheckpointId(), checkpointOptions.getTargetLocation()); CheckpointingOperation checkpointingOperation = new CheckpointingOperation( this, checkpointMetaData, checkpointOptions, storage, checkpointMetrics); checkpointingOperation.executeCheckpointing(); } 
//……}

The core of StreamTask.triggerCheckpoint is performCheckpoint, which branches on the task's isRunning flag.

When isRunning is true, it proceeds in three steps: (1) operatorChain.prepareSnapshotPreBarrier, (2) operatorChain.broadcastCheckpointBarrier, (3) checkpointState, which creates a CheckpointingOperation and calls checkpointingOperation.executeCheckpointing().

When isRunning is false, it calls streamRecordWriter.broadcastEvent(message) on every stream record writer, where message is a CancelCheckpointMarker, so that downstream operators do not wait for input from this task.

OperatorChain.prepareSnapshotPreBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

    @Internal
    public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            // go forward through the operator chain and tell each operator
            // to prepare the checkpoint
            final StreamOperator<?>[] operators = this.allOperators;
            for (int i = operators.length - 1; i >= 0; --i) {
                final StreamOperator<?> op = operators[i];
                if (op != null) {
                    op.prepareSnapshotPreBarrier(checkpointId);
                }
            }
        }

        //……
    }

OperatorChain.prepareSnapshotPreBarrier walks allOperators (from the last array index down to 0) and calls prepareSnapshotPreBarrier on each non-null StreamOperator.

OperatorChain.broadcastCheckpointBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

    @Internal
    public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

        public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }

        //……
    }

OperatorChain.broadcastCheckpointBarrier creates a CheckpointBarrier and calls broadcastEvent with it on every entry of streamOutputs.

CheckpointingOperation.executeCheckpointing

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

private static final class
CheckpointingOperation { private final StreamTask<?, ?> owner; private final CheckpointMetaData checkpointMetaData; private final CheckpointOptions checkpointOptions; private final CheckpointMetrics checkpointMetrics; private final CheckpointStreamFactory storageLocation; private final StreamOperator<?>[] allOperators; private long startSyncPartNano; private long startAsyncPartNano; // ———————— private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress; public CheckpointingOperation( StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStorageLocation, CheckpointMetrics checkpointMetrics) { this.owner = Preconditions.checkNotNull(owner); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions); this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation); this.allOperators = owner.operatorChain.getAllOperators(); this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length); } public void executeCheckpointing() throws Exception { startSyncPartNano = System.nanoTime(); try { for (StreamOperator<?> op : allOperators) { checkpointStreamOperator(op); } if (LOG.isDebugEnabled()) { LOG.debug(“Finished synchronous checkpoints for checkpoint {} on task {}”, checkpointMetaData.getCheckpointId(), owner.getName()); } startAsyncPartNano = System.nanoTime(); checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000); // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable( owner, operatorSnapshotsInProgress, checkpointMetaData, checkpointMetrics, startAsyncPartNano); owner.cancelables.registerCloseable(asyncCheckpointRunnable); 
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable); if (LOG.isDebugEnabled()) { LOG.debug("{} - finished synchronous part of checkpoint {}. " + "Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, checkpointMetrics.getSyncDurationMillis()); } } catch (Exception ex) { // Cleanup to release resources for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) { if (null != operatorSnapshotResult) { try { operatorSnapshotResult.cancel(); } catch (Exception e) { LOG.warn("Could not properly cancel an operator snapshot result.", e); } } } if (LOG.isDebugEnabled()) { LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " + "Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, checkpointMetrics.getSyncDurationMillis()); } owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex); } } @SuppressWarnings("deprecation") private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { if (null != op) { OperatorSnapshotFutures snapshotInProgress = op.snapshotState( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions, storageLocation); operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress); } } private enum AsyncCheckpointState { RUNNING, DISCARDED, COMPLETED } }

CheckpointingOperation is defined inside the StreamTask class. executeCheckpointing first calls checkpointStreamOperator for every StreamOperator, which in turn calls StreamOperator.snapshotState; it then creates an AsyncCheckpointRunnable and submits it to owner.asyncOperationsThreadPool for asynchronous execution.

AbstractStreamOperator.snapshotState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

@PublicEvolving public abstract class AbstractStreamOperator<OUT> implements
StreamOperator<OUT>, Serializable { @Override public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception { KeyGroupRange keyGroupRange = null != keyedStateBackend ? keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures(); try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl( checkpointId, timestamp, factory, keyGroupRange, getContainingTask().getCancelables())) { snapshotState(snapshotContext); snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture()); snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture()); if (null != operatorStateBackend) { snapshotInProgress.setOperatorStateManagedFuture( operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)); } if (null != keyedStateBackend) { snapshotInProgress.setKeyedStateManagedFuture( keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions)); } } catch (Exception snapshotException) { try { snapshotInProgress.cancel(); } catch (Exception e) { snapshotException.addSuppressed(e); } String snapshotFailMessage = “Could not complete snapshot " + checkpointId + " for operator " + getOperatorName() + “.”; if (!getContainingTask().isCanceled()) { LOG.info(snapshotFailMessage, snapshotException); } throw new Exception(snapshotFailMessage, snapshotException); } return snapshotInProgress; } /* * Stream operators with state, which want to participate in a snapshot need to override this hook method. 
* * @param context context that provides information and means required for taking a snapshot */ public void snapshotState(StateSnapshotContext context) throws Exception { final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend(); //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots if (keyedStateBackend instanceof AbstractKeyedStateBackend && ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) { KeyedStateCheckpointOutputStream out; try { out = context.getRawKeyedOperatorStateOutput(); } catch (Exception exception) { throw new Exception(“Could not open raw keyed operator state stream for " + getOperatorName() + ‘.’, exception); } try { KeyGroupsList allKeyGroups = out.getKeyGroupList(); for (int keyGroupIdx : allKeyGroups) { out.startNewKeyGroup(keyGroupIdx); timeServiceManager.snapshotStateForKeyGroup( new DataOutputViewStreamWrapper(out), keyGroupIdx); } } catch (Exception exception) { throw new Exception(“Could not write timer service of " + getOperatorName() + " to checkpoint state stream.”, exception); } finally { try { out.close(); } catch (Exception closeException) { LOG.warn(“Could not close raw keyed operator state stream for {}. 
This " + "might have prevented deleting some state data.", getOperatorName(), closeException); } } } } //……}

The snapshotState(StateSnapshotContext) hook of AbstractStreamOperator only does work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots() returns true; in that case it calls timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx) for each key group. Subclasses may override snapshotState, for example AbstractUdfStreamOperator.

AbstractUdfStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

    @PublicEvolving
    public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
            extends AbstractStreamOperator<OUT>
            implements OutputTypeConfigurable<OUT> {

        @Override
        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
        }

        //……
    }

AbstractUdfStreamOperator overrides snapshotState from its parent AbstractStreamOperator and additionally calls StreamingFunctionUtils.snapshotFunctionState.

StreamingFunctionUtils.snapshotFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

@Internal public final class StreamingFunctionUtils { public static void snapshotFunctionState( StateSnapshotContext context, OperatorStateBackend backend, Function userFunction) throws Exception { Preconditions.checkNotNull(context); Preconditions.checkNotNull(backend); while (true) { if (trySnapshotFunctionState(context, backend, userFunction)) { break; } // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function if (userFunction instanceof WrappingFunction) { userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); } else { break; } } } private static boolean trySnapshotFunctionState( StateSnapshotContext context, OperatorStateBackend backend, Function userFunction) throws Exception { if (userFunction instanceof
CheckpointedFunction) { ((CheckpointedFunction) userFunction).snapshotState(context); return true; } if (userFunction instanceof ListCheckpointed) { @SuppressWarnings("unchecked") List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction). snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp()); ListState<Serializable> listState = backend. getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); listState.clear(); if (null != partitionableState) { try { for (Serializable statePartition : partitionableState) { listState.add(statePartition); } } catch (Exception e) { listState.clear(); throw new Exception("Could not write partitionable state to operator " + "state backend.", e); } } return true; } return false; } //……}

snapshotFunctionState calls trySnapshotFunctionState, which dispatches on the type of userFunction. If it implements CheckpointedFunction, CheckpointedFunction.snapshotState is called. If it implements ListCheckpointed, ListCheckpointed.snapshotState is called; note that in this case the ListState is cleared first and the elements of the returned List are then added to it one by one with ListState.add. If neither interface matches and the function is a WrappingFunction, the wrapped function is unwrapped and tried again.

Summary

Flink's CheckpointCoordinatorDeActivator calls CheckpointCoordinator.startCheckpointScheduler when the job status becomes RUNNING, and CheckpointCoordinator.stopCheckpointScheduler for any other status.

CheckpointCoordinator.startCheckpointScheduler mainly registers a ScheduledTrigger task whose run method calls triggerCheckpoint. Before actually triggering a checkpoint, triggerCheckpoint runs a series of checks and fails fast if any of them is not met; possible reasons include CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS, CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS and NOT_ALL_REQUIRED_TASKS_RUNNING. If the checks pass, it iterates over the executions and calls Execution.triggerCheckpoint on each, which uses taskManagerGateway.triggerCheckpoint to invoke TaskExecutor.triggerCheckpoint over RPC.

TaskExecutor.triggerCheckpoint mainly calls Task.triggerCheckpointBarrier, which asynchronously executes a runnable whose run method calls invokable.triggerCheckpoint. The invokable here is a SourceStreamTask, which mainly delegates to its parent's StreamTask.triggerCheckpoint; the core logic of that method is in performCheckpoint. When isRunning is true, performCheckpoint proceeds in three steps: operatorChain.prepareSnapshotPreBarrier, then operatorChain.broadcastCheckpointBarrier, then checkpointState, which creates a CheckpointingOperation and calls checkpointingOperation.executeCheckpointing().

CheckpointingOperation.executeCheckpointing calls checkpointStreamOperator for every StreamOperator, and checkpointStreamOperator calls StreamOperator.snapshotState. The snapshotState(StateSnapshotContext) hook of AbstractStreamOperator only does work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots() returns true.

AbstractUdfStreamOperator overrides snapshotState from its parent AbstractStreamOperator and additionally calls StreamingFunctionUtils.snapshotFunctionState, which calls the method matching the type of userFunction: CheckpointedFunction.snapshotState if it implements CheckpointedFunction, or ListCheckpointed.snapshotState if it implements ListCheckpointed.

doc

Working with State ...
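
The dispatch that StreamingFunctionUtils.snapshotFunctionState performs, as walked through above, can be mimicked in plain Java without any Flink dependency. This is only a sketch under stated assumptions: UserFunction, SelfSnapshotting, ListSnapshotting and Wrapping are hypothetical stand-ins for Function, CheckpointedFunction, ListCheckpointed and WrappingFunction, and a plain List<String> stands in for the ListState.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified stand-ins for illustration; none of these are the real Flink types. */
class SnapshotDispatchSketch {

    /** Hypothetical stand-in for Flink's Function marker interface. */
    interface UserFunction {}

    /** Hypothetical stand-in for CheckpointedFunction: the function writes its own state. */
    interface SelfSnapshotting extends UserFunction {
        void snapshotState(long checkpointId, List<String> state);
    }

    /** Hypothetical stand-in for ListCheckpointed: the function returns its state as a list. */
    interface ListSnapshotting extends UserFunction {
        List<String> snapshotState(long checkpointId);
    }

    /** Hypothetical stand-in for WrappingFunction. */
    static final class Wrapping implements UserFunction {
        final UserFunction wrapped;

        Wrapping(UserFunction wrapped) {
            this.wrapped = wrapped;
        }
    }

    /** Returns true if some (possibly wrapped) function handled the snapshot. */
    static boolean snapshotFunctionState(UserFunction fn, long checkpointId, List<String> state) {
        while (true) {
            if (fn instanceof SelfSnapshotting) {
                ((SelfSnapshotting) fn).snapshotState(checkpointId, state);
                return true;
            }
            if (fn instanceof ListSnapshotting) {
                List<String> parts = ((ListSnapshotting) fn).snapshotState(checkpointId);
                state.clear();                 // old contents are dropped first
                if (parts != null) {
                    state.addAll(parts);       // then the returned list is copied in
                }
                return true;
            }
            if (fn instanceof Wrapping) {
                fn = ((Wrapping) fn).wrapped;  // unwrap and try the inner function
            } else {
                return false;                  // nothing to snapshot
            }
        }
    }

    public static void main(String[] args) {
        List<String> state = new ArrayList<>();
        state.add("stale");
        ListSnapshotting source = id -> Collections.singletonList("offset=42");
        boolean handled = snapshotFunctionState(new Wrapping(source), 1L, state);
        System.out.println(handled + " " + state);
    }
}
```

The loop keeps the same order of checks as the quoted source: try the two snapshot interfaces first, and only unwrap when neither applies.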

December 7, 2018 · 16 min · jiezi

A look at Flink's ListCheckpointed

Preface

This article takes a look at Flink's ListCheckpointed.

Example

    public static class CounterSource
            extends RichParallelSourceFunction<Long>
            implements ListCheckpointed<Long> {

        /** current offset for exactly once semantics */
        private Long offset = 0L;

        /** flag for job cancellation */
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Long> ctx) {
            final Object lock = ctx.getCheckpointLock();

            while (isRunning) {
                // output and state update are atomic
                synchronized (lock) {
                    ctx.collect(offset);
                    offset += 1;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
            return Collections.singletonList(offset);
        }

        @Override
        public void restoreState(List<Long> state) {
            for (Long s : state)
                offset = s;
        }
    }

CounterSource is a stateful RichParallelSourceFunction that implements ListCheckpointed. snapshotState returns the current offset, and restoreState restores the local offset from the state passed in. Note that to get exactly-once semantics on failure and recovery, emitting a record and updating offset must be synchronized on the lock obtained from SourceContext.getCheckpointLock.

ListCheckpointed

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java

@PublicEvolving public interface ListCheckpointed<T extends Serializable> { /** * Gets the current state of the function. The state must reflect the result of all prior * invocations to this function. * * <p>The returned list should contain one entry for redistributable unit of state. See * the {@link ListCheckpointed class docs} for an illustration how list-style state * redistribution works. * * <p>As special case, the returned list may be null or empty (if the operator has no state) * or it may contain a single element (if the operator state is indivisible). * * @param checkpointId The ID of the checkpoint - a unique and monotonously increasing value. * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master. * * @return The operator state in a list of redistributable, atomic sub-states.
* Should not return null, but empty list instead. * * @throws Exception Thrown if the creation of the state object failed. This causes the * checkpoint to fail. The system may decide to fail the operation (and trigger * recovery), or to discard this checkpoint attempt and to continue running * and to try again with the next checkpoint attempt. */ List<T> snapshotState(long checkpointId, long timestamp) throws Exception; /** * Restores the state of the function or operator to that of a previous checkpoint. * This method is invoked when the function is executed after a failure recovery. * The state list may be empty if no state is to be recovered by the particular parallel instance * of the function. * * <p>The given state list will contain all the <i>sub states</i> that this parallel * instance of the function needs to handle. Refer to the {@link ListCheckpointed class docs} * for an illustration how list-style state redistribution works. * * <p><b>Important:</b> When implementing this interface together with {@link RichFunction}, * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}. * * @param state The state to be restored as a list of atomic sub-states. * * @throws Exception Throwing an exception in this method causes the recovery to fail. * The exact consequence depends on the configured failure handling strategy, * but typically the system will re-attempt the recovery, or try recovering * from a different checkpoint.
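*/ void restoreState(List<T> state) throws Exception;}

ListCheckpointed declares two methods: snapshotState and restoreState.

snapshotState takes a checkpointId, which is a unique and monotonically increasing number, and a timestamp, which is the wall-clock time at which the master triggered the checkpoint. It must return the current state as a List.

restoreState is called during failure recovery. It receives the state as a List, and the implementation restores its local state from it.

Summary

A stateful function can use managed operator state through either the CheckpointedFunction interface or the ListCheckpointed interface. Managed operator state currently supports only the list-style form: the state must be a List of serializable objects, so that it can be redistributed on rescaling. There are currently two redistribution schemes: even-split redistribution (on restore or redistribution each operator instance receives only a sublist of the whole state) and union redistribution (each operator instance receives the complete list).

ListCheckpointed is a restricted variant of CheckpointedFunction; it supports only list-style state with even-split redistribution.

ListCheckpointed declares two methods, snapshotState and restoreState. snapshotState is called when the master triggers a checkpoint, and the function must return its current state. restoreState is called during failure recovery with the state as a List, from which the function restores its local state.

doc

Working with State
listcheckpointed
A look at Flink's CheckpointedFunction ...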
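
The two redistribution schemes named in the summary above can be illustrated with plain Java lists. This is a minimal sketch, not Flink's implementation: the class RedistributionSketch and its methods are hypothetical, and cutting the concatenated list into contiguous, nearly equal chunks is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of list-style state redistribution; not Flink code. */
class RedistributionSketch {

    /** Concatenates the per-subtask lists in subtask order. */
    static <T> List<T> concat(List<List<T>> perSubtask) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perSubtask) {
            all.addAll(part);
        }
        return all;
    }

    /** Even-split: each new subtask gets a sublist of the whole state (contiguous chunks assumed). */
    static <T> List<List<T>> evenSplit(List<List<T>> perSubtask, int newParallelism) {
        List<T> all = concat(perSubtask);
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);  // the first 'extra' subtasks get one more element
            result.add(new ArrayList<>(all.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }

    /** Union: every new subtask gets the complete list. */
    static <T> List<List<T>> union(List<List<T>> perSubtask, int newParallelism) {
        List<T> all = concat(perSubtask);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // State snapshotted by two subtasks, restored with parallelism 3.
        List<List<Integer>> old = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        System.out.println(evenSplit(old, 3));
        System.out.println(union(old, 3));
    }
}
```

With even-split, a function such as CounterSource may receive an empty list after scaling out, which is why restoreState has to cope with lists of any length.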

December 6, 2018 · 3 min · jiezi

A look at Flink's CheckpointedFunction

Preface

This article takes a look at Flink's CheckpointedFunction.

Example

    public class BufferingSink
            implements SinkFunction<Tuple2<String, Integer>>,
                       CheckpointedFunction {

        private final int threshold;

        private transient ListState<Tuple2<String, Integer>> checkpointedState;

        private List<Tuple2<String, Integer>> bufferedElements;

        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }

        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Tuple2<String, Integer> element: bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

            checkpointedState = context.getOperatorStateStore().getListState(descriptor);

            if (context.isRestored()) {
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }

BufferingSink implements CheckpointedFunction. It holds a ListState named checkpointedState and a plain List named bufferedElements.

invoke first buffers the value in bufferedElements; once the number of buffered elements reaches threshold, it writes them to the sink and clears bufferedElements.

snapshotState copies bufferedElements into checkpointedState. initializeState creates a ListStateDescriptor, obtains the ListState via FunctionInitializationContext.getOperatorStateStore().getListState(descriptor), and then checks context.isRestored(); if state was restored from the snapshot of a previous execution, it copies the contents of the ListState back into bufferedElements.

CheckpointedFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java

@PublicEvolving @SuppressWarnings("deprecation") public interface
CheckpointedFunction { /** * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself. * * @param context the context for drawing a snapshot of the operator * @throws Exception */ void snapshotState(FunctionSnapshotContext context) throws Exception; /** * This method is called when the parallel function instance is created during distributed * execution. Functions typically set up their state storing data structures in this method. * * @param context the context for initializing the operator * @throws Exception */ void initializeState(FunctionInitializationContext context) throws Exception;}

CheckpointedFunction is the core interface for stateful transformation functions, that is, functions that maintain state across individual stream records.

snapshotState is called whenever a checkpoint takes a snapshot of the function's state; it is typically also the place where a function flushes, commits, or synchronizes with external systems.

initializeState is called when the parallel function instance is created, and is typically used to set up the data structures that hold the state.

FunctionSnapshotContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionSnapshotContext.java

    /**
     * This interface provides a context in which user functions that use managed state (i.e. state that is managed by state
     * backends) can participate in a snapshot. As snapshots of the backends themselves are taken by the system, this
     * interface mainly provides meta information about the checkpoint.
     */
    @PublicEvolving
    public interface FunctionSnapshotContext extends ManagedSnapshotContext {
    }

FunctionSnapshotContext extends the ManagedSnapshotContext interface.

ManagedSnapshotContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedSnapshotContext.java

/** * This interface provides a context in which operators that use managed state (i.e. state that is managed by state * backends) can perform a snapshot.
As snapshots of the backends themselves are taken by the system, this interface * mainly provides meta information about the checkpoint. */ @PublicEvolving public interface ManagedSnapshotContext { /** * Returns the ID of the checkpoint for which the snapshot is taken. * * <p>The checkpoint ID is guaranteed to be strictly monotonously increasing across checkpoints. * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state * than checkpoint <i>A</i>. */ long getCheckpointId(); /** * Returns timestamp (wall clock time) when the master node triggered the checkpoint for which * the state snapshot is taken. */ long getCheckpointTimestamp();}

ManagedSnapshotContext declares the getCheckpointId and getCheckpointTimestamp methods.

FunctionInitializationContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/FunctionInitializationContext.java

    /**
     * This interface provides a context in which user functions can initialize by registering to managed state (i.e. state
     * that is managed by state backends).
     *
     * <p>
     * Operator state is available to all functions, while keyed state is only available for functions after keyBy.
     *
     * <p>
     * For the purpose of initialization, the context signals if the state is empty or was restored from a previous
     * execution.
     *
     */
    @PublicEvolving
    public interface FunctionInitializationContext extends ManagedInitializationContext {
    }

FunctionInitializationContext extends the ManagedInitializationContext interface.

ManagedInitializationContext

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ManagedInitializationContext.java

/** * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that * is managed by state backends). * * <p> * Operator state is available to all operators, while keyed state is only available for operators after keyBy.
* * <p> * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from * a previous execution of this operator. * */ public interface ManagedInitializationContext { /** * Returns true, if state was restored from the snapshot of a previous execution. This returns always false for * stateless tasks. */ boolean isRestored(); /** * Returns an interface that allows for registering operator state with the backend. */ OperatorStateStore getOperatorStateStore(); /** * Returns an interface that allows for registering keyed state with the backend. */ KeyedStateStore getKeyedStateStore();}

ManagedInitializationContext declares the isRestored, getOperatorStateStore and getKeyedStateStore methods.

Summary

Flink has two basic kinds of state: Keyed State and Operator State (non-keyed state). Keyed State can only be used in functions and operators on a KeyedStream. Each operator state is bound to one parallel operator instance, and Operator State supports redistribution when the parallelism changes.

Both Keyed State and Operator State exist in two forms, managed and raw. Managed state is managed by the Flink runtime, which encodes it and writes it into checkpoints. Raw state is managed by the operator itself; the runtime knows nothing about its data structure and treats it as raw bytes. All DataStream functions can use managed state, whereas raw state is generally only used when implementing your own operators.

A stateful function can use managed operator state through either the CheckpointedFunction interface or the ListCheckpointed interface. CheckpointedFunction declares two methods, snapshotState and initializeState. snapshotState is called every time a checkpoint is performed. initializeState is called every time the user-defined function is initialized, whether for the first time or when recovering from an earlier checkpoint, so it is the place both for initializing state and for state recovery logic.

Managed operator state currently supports only the list-style form: the state must be a List of serializable objects, so that it can be redistributed on rescaling. There are currently two redistribution schemes: even-split redistribution (on restore or redistribution each operator instance receives only a sublist of the whole state) and union redistribution (each operator instance receives the complete list).

FunctionSnapshotContext extends ManagedSnapshotContext, which declares getCheckpointId and getCheckpointTimestamp. FunctionInitializationContext extends ManagedInitializationContext, which declares isRestored, getOperatorStateStore and getKeyedStateStore; these tell the function whether state was restored from the snapshot of a previous execution and give access to the OperatorStateStore and KeyedStateStore.

doc

Using Managed Operator State ...
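
The snapshotState and initializeState lifecycle described above can be traced without a Flink cluster. The following is a rough sketch under stated assumptions: BufferingSinkSketch is a hypothetical plain-Java analogue of BufferingSink, a List<String> stands in for the ListState, and the boolean restored parameter stands in for context.isRestored().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java analogue of BufferingSink; a List<String> stands in for ListState. */
class BufferingSinkSketch {

    private final int threshold;
    final List<String> bufferedElements = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    BufferingSinkSketch(int threshold) {
        this.threshold = threshold;
    }

    /** Buffers the value and flushes the buffer once the threshold is reached. */
    void invoke(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            emitted.addAll(bufferedElements);  // stands in for "send it to the sink"
            bufferedElements.clear();
        }
    }

    /** Mirrors snapshotState: replace the checkpointed state with the current buffer. */
    void snapshotState(List<String> checkpointedState) {
        checkpointedState.clear();
        checkpointedState.addAll(bufferedElements);
    }

    /** Mirrors initializeState: refill the buffer only when state was restored. */
    void initializeState(List<String> checkpointedState, boolean restored) {
        if (restored) {
            bufferedElements.addAll(checkpointedState);
        }
    }

    public static void main(String[] args) {
        List<String> checkpointedState = new ArrayList<>();

        BufferingSinkSketch first = new BufferingSinkSketch(3);
        first.invoke("a");
        first.invoke("b");
        first.snapshotState(checkpointedState);  // checkpoint taken with two buffered elements

        // Simulated restart: a fresh instance is initialized from the checkpointed state.
        BufferingSinkSketch second = new BufferingSinkSketch(3);
        second.initializeState(checkpointedState, true);
        second.invoke("c");
        System.out.println(second.emitted);
    }
}
```

The point of the exercise is that elements buffered but not yet written at checkpoint time are carried over to the restarted instance instead of being lost.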

December 5, 2018 · 4 min · jiezi

A look at Flink's JDBCOutputFormat

Preface

This article takes a look at Flink's JDBCOutputFormat.

JDBCOutputFormat

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/** * OutputFormat to write Rows into a JDBC database. * The OutputFormat has to be configured using the supplied OutputFormatBuilder. * * @see Row * @see DriverManager */ public class JDBCOutputFormat extends RichOutputFormat<Row> { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); private String username; private String password; private String drivername; private String dbURL; private String query; private int batchInterval = DEFAULT_BATCH_INTERVAL; private Connection dbConn; private PreparedStatement upload; private int batchCount = 0; private int[] typesArray; public JDBCOutputFormat() { } @Override public void configure(Configuration parameters) { } /** * Connects to the target database and initializes the prepared statement. * * @param taskNumber The number of the parallel instance. * @throws IOException Thrown, if the output could not be opened due to an * I/O problem. */ @Override public void open(int taskNumber, int numTasks) throws IOException { try { establishConnection(); upload = dbConn.prepareStatement(query); } catch (SQLException sqe) { throw new IllegalArgumentException("open() failed.", sqe); } catch (ClassNotFoundException cnfe) { throw new IllegalArgumentException("JDBC driver class not found.", cnfe); } } private void establishConnection() throws SQLException, ClassNotFoundException { Class.forName(drivername); if (username == null) { dbConn = DriverManager.getConnection(dbURL); } else { dbConn = DriverManager.getConnection(dbURL, username, password); } } /** * Adds a record to the prepared statement. * * <p>When this method is called, the output format is guaranteed to be opened.
* * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to * insert a null value but it’s not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null)) * * @param row The records to add to the output. * @see PreparedStatement * @throws IOException Thrown, if the records could not be added due to an I/O problem. / @Override public void writeRecord(Row row) throws IOException { if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) { LOG.warn(“Column SQL types array doesn’t match arity of passed Row! Check the passed array…”); } try { if (typesArray == null) { // no types provided for (int index = 0; index < row.getArity(); index++) { LOG.warn(“Unknown column type for column {}. Best effort approach to set its value: {}.”, index + 1, row.getField(index)); upload.setObject(index + 1, row.getField(index)); } } else { // types provided for (int index = 0; index < row.getArity(); index++) { if (row.getField(index) == null) { upload.setNull(index + 1, typesArray[index]); } else { // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html switch (typesArray[index]) { case java.sql.Types.NULL: upload.setNull(index + 1, typesArray[index]); break; case java.sql.Types.BOOLEAN: case java.sql.Types.BIT: upload.setBoolean(index + 1, (boolean) row.getField(index)); break; case java.sql.Types.CHAR: case java.sql.Types.NCHAR: case java.sql.Types.VARCHAR: case java.sql.Types.LONGVARCHAR: case java.sql.Types.LONGNVARCHAR: upload.setString(index + 1, (String) row.getField(index)); break; case java.sql.Types.TINYINT: upload.setByte(index + 1, (byte) row.getField(index)); break; case java.sql.Types.SMALLINT: upload.setShort(index + 1, (short) row.getField(index)); break; case java.sql.Types.INTEGER: upload.setInt(index + 1, (int) row.getField(index)); break; case java.sql.Types.BIGINT: upload.setLong(index + 1, (long) 
row.getField(index)); break; case java.sql.Types.REAL: upload.setFloat(index + 1, (float) row.getField(index)); break; case java.sql.Types.FLOAT: case java.sql.Types.DOUBLE: upload.setDouble(index + 1, (double) row.getField(index)); break; case java.sql.Types.DECIMAL: case java.sql.Types.NUMERIC: upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index)); break; case java.sql.Types.DATE: upload.setDate(index + 1, (java.sql.Date) row.getField(index)); break; case java.sql.Types.TIME: upload.setTime(index + 1, (java.sql.Time) row.getField(index)); break; case java.sql.Types.TIMESTAMP: upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index)); break; case java.sql.Types.BINARY: case java.sql.Types.VARBINARY: case java.sql.Types.LONGVARBINARY: upload.setBytes(index + 1, (byte[]) row.getField(index)); break; default: upload.setObject(index + 1, row.getField(index)); LOG.warn(“Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.”, typesArray[index], index + 1, row.getField(index)); // case java.sql.Types.SQLXML // case java.sql.Types.ARRAY: // case java.sql.Types.JAVA_OBJECT: // case java.sql.Types.BLOB: // case java.sql.Types.CLOB: // case java.sql.Types.NCLOB: // case java.sql.Types.DATALINK: // case java.sql.Types.DISTINCT: // case java.sql.Types.OTHER: // case java.sql.Types.REF: // case java.sql.Types.ROWID: // case java.sql.Types.STRUC } } } } upload.addBatch(); batchCount++; } catch (SQLException e) { throw new RuntimeException(“Preparation of JDBC statement failed.”, e); } if (batchCount >= batchInterval) { // execute batch flush(); } } void flush() { try { upload.executeBatch(); batchCount = 0; } catch (SQLException e) { throw new RuntimeException(“Execution of JDBC statement failed.”, e); } } int[] getTypesArray() { return typesArray; } /* * Executes prepared statement and closes all resources of this instance. * * @throws IOException Thrown, if the input could not be closed properly. 
     */
    @Override
    public void close() throws IOException {
        if (upload != null) {
            flush();
            // close the connection
            try {
                upload.close();
            } catch (SQLException e) {
                LOG.info("JDBC statement could not be closed: " + e.getMessage());
            } finally {
                upload = null;
            }
        }

        if (dbConn != null) {
            try {
                dbConn.close();
            } catch (SQLException se) {
                LOG.info("JDBC connection could not be closed: " + se.getMessage());
            } finally {
                dbConn = null;
            }
        }
    }

    public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
        return new JDBCOutputFormatBuilder();
    }

    //……
}

JDBCOutputFormat extends RichOutputFormat; the type parameter here is org.apache.flink.types.Row.

open calls establishConnection to load the driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement).

writeRecord first checks whether a typesArray was provided. If not, it sets every value with setObject; if so, it casts each field according to its declared type. Many of the types in java.sql.Types are supported.

writeRecord relies on PreparedStatement.addBatch. Once batchCount is greater than or equal to batchInterval (5000 by default), it calls flush, which runs PreparedStatement.executeBatch and then resets batchCount. To make sure records that never reached batchInterval are still committed, close calls flush once more before closing the PreparedStatement and the Connection.

JDBCOutputFormat also provides a JDBCOutputFormatBuilder for conveniently constructing a JDBCOutputFormat.

Row

flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java

/**
 * A Row can have arbitrary number of fields and contain a set of fields, which may all be
 * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
 * type extraction mechanism can't extract correct field types. So that users should manually
 * tell Flink the type information via creating a {@link RowTypeInfo}.
 *
 * <p>
 * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
 * set fields by {@link #setField(int, Object)}.
 * <p>
 * Row is in principle serializable. However, it may contain non-serializable fields,
 * in which case serialization will fail.
 *
 */
@PublicEvolving
public class Row implements Serializable{

    private static final long serialVersionUID = 1L;

    /** The array to store actual values.
/ private final Object[] fields; /* * Create a new Row instance. * @param arity The number of fields in the Row / public Row(int arity) { this.fields = new Object[arity]; } /* * Get the number of fields in the Row. * @return The number of fields in the Row. / public int getArity() { return fields.length; } /* * Gets the field at the specified position. * @param pos The position of the field, 0-based. * @return The field at the specified position. * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. / public Object getField(int pos) { return fields[pos]; } /* * Sets the field at the specified position. * * @param pos The position of the field, 0-based. * @param value The value to be assigned to the field at the specified position. * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. / public void setField(int pos, Object value) { fields[pos] = value; } @Override public String toString() { StringBuilder sb = new StringBuilder(); for (int i = 0; i < fields.length; i++) { if (i > 0) { sb.append(”,”); } sb.append(StringUtils.arrayAwareToString(fields[i])); } return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } Row row = (Row) o; return Arrays.deepEquals(fields, row.fields); } @Override public int hashCode() { return Arrays.deepHashCode(fields); } /* * Creates a new Row and assigns the given values to the Row’s fields. * This is more convenient than using the constructor. 
* * <p>For example: * * <pre> * Row.of(“hello”, true, 1L);} * </pre> * instead of * <pre> * Row row = new Row(3); * row.setField(0, “hello”); * row.setField(1, true); * row.setField(2, 1L); * </pre> * / public static Row of(Object… values) { Row row = new Row(values.length); for (int i = 0; i < values.length; i++) { row.setField(i, values[i]); } return row; } /* * Creates a new Row which copied from another row. * This method does not perform a deep copy. * * @param row The row being copied. * @return The cloned new Row / public static Row copy(Row row) { final Row newRow = new Row(row.fields.length); System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length); return newRow; } /* * Creates a new Row with projected fields from another row. * This method does not perform a deep copy. * * @param fields fields to be projected * @return the new projected Row / public static Row project(Row row, int[] fields) { final Row newRow = new Row(fields.length); for (int i = 0; i < fields.length; i++) { newRow.fields[i] = row.fields[fields[i]]; } return newRow; }}Row是JDBCOutputFormat的writeRecord的类型,它里头使用Object数据来存取字段值,同时也提供了诸如of、copy、project等静态方法JDBCOutputFormatBuilderflink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java /* * Builder for a {@link JDBCOutputFormat}. 
/ public static class JDBCOutputFormatBuilder { private final JDBCOutputFormat format; protected JDBCOutputFormatBuilder() { this.format = new JDBCOutputFormat(); } public JDBCOutputFormatBuilder setUsername(String username) { format.username = username; return this; } public JDBCOutputFormatBuilder setPassword(String password) { format.password = password; return this; } public JDBCOutputFormatBuilder setDrivername(String drivername) { format.drivername = drivername; return this; } public JDBCOutputFormatBuilder setDBUrl(String dbURL) { format.dbURL = dbURL; return this; } public JDBCOutputFormatBuilder setQuery(String query) { format.query = query; return this; } public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) { format.batchInterval = batchInterval; return this; } public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) { format.typesArray = typesArray; return this; } /* * Finalizes the configuration and checks validity. * * @return Configured JDBCOutputFormat / public JDBCOutputFormat finish() { if (format.username == null) { LOG.info(“Username was not supplied.”); } if (format.password == null) { LOG.info(“Password was not supplied.”); } if (format.dbURL == null) { throw new IllegalArgumentException(“No database URL supplied.”); } if (format.query == null) { throw new IllegalArgumentException(“No query supplied.”); } if (format.drivername == null) { throw new IllegalArgumentException(“No driver supplied.”); } return format; } }JDBCOutputFormatBuilder提供了对username、password、dbURL、query、drivername、batchInterval、typesArray这几个属性的builder方法JDBCAppendTableSinkflink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java/* * An at-least-once Table sink for JDBC. * * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if * checkpointing is enabled). 
However, one common use case is to run idempotent queries * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and * achieve exactly-once semantic.</p> */public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> { private final JDBCOutputFormat outputFormat; private String[] fieldNames; private TypeInformation[] fieldTypes; JDBCAppendTableSink(JDBCOutputFormat outputFormat) { this.outputFormat = outputFormat; } public static JDBCAppendTableSinkBuilder builder() { return new JDBCAppendTableSinkBuilder(); } @Override public void emitDataStream(DataStream<Row> dataStream) { dataStream .addSink(new JDBCSinkFunction(outputFormat)) .name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames)); } @Override public void emitDataSet(DataSet<Row> dataSet) { dataSet.output(outputFormat); } @Override public TypeInformation<Row> getOutputType() { return new RowTypeInfo(fieldTypes, fieldNames); } @Override public String[] getFieldNames() { return fieldNames; } @Override public TypeInformation<?>[] getFieldTypes() { return fieldTypes; } @Override public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { int[] types = outputFormat.getTypesArray(); String sinkSchema = String.join(", “, IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList())); String tableSchema = String.join(”, “, Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList())); String msg = String.format(“Schema of output table is incompatible with JDBCAppendTableSink schema. 
" + “Table schema: [%s], sink schema: [%s]”, tableSchema, sinkSchema); Preconditions.checkArgument(fieldTypes.length == types.length, msg); for (int i = 0; i < types.length; ++i) { Preconditions.checkArgument( JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i], msg); } JDBCAppendTableSink copy; try { copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat)); } catch (IOException | ClassNotFoundException e) { throw new RuntimeException(e); } copy.fieldNames = fieldNames; copy.fieldTypes = fieldTypes; return copy; } @VisibleForTesting JDBCOutputFormat getOutputFormat() { return outputFormat; }}JDBCAppendTableSink里头用到了JDBCOutputFormat,它实现了AppendStreamTableSink以及BatchTableSink接口它的emitDataStream方法会给传入的dataStream设置JDBCSinkFunction的sink(JDBCSinkFunction);而emitDataSet方法则对dataSet设置output这里实现了TableSink(BatchTableSink声明实现TableSink)的getOutputType、getFieldNames、getFieldTypes、configure方法;configure方法这里主要是根据JDBCOutputFormat创建了JDBCAppendTableSinkJDBCSinkFunctionflink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.javaclass JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction { final JDBCOutputFormat outputFormat; JDBCSinkFunction(JDBCOutputFormat outputFormat) { this.outputFormat = outputFormat; } @Override public void invoke(Row value) throws Exception { outputFormat.writeRecord(value); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { outputFormat.flush(); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); RuntimeContext ctx = getRuntimeContext(); outputFormat.setRuntimeContext(ctx); outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); } @Override public void close() throws Exception { outputFormat.close(); super.close(); 
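    }
}

JDBCSinkFunction extends RichSinkFunction and also implements the CheckpointedFunction interface. Its invoke method delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so that buffered records are committed at every checkpoint.

Summary

JDBCOutputFormat extends RichOutputFormat. open calls establishConnection to load the driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement). writeRecord relies on PreparedStatement.addBatch; once batchCount is greater than or equal to batchInterval (5000 by default), it calls flush, which runs PreparedStatement.executeBatch and then resets batchCount. To make sure records that never reached batchInterval are still committed, close calls flush once more before closing the PreparedStatement and the Connection.

Row is the record type accepted by JDBCOutputFormat's writeRecord; internally it stores the field values in an Object array.

JDBCOutputFormatBuilder provides builder methods for the username, password, dbURL, query, drivername, batchInterval, and typesArray properties.

JDBCAppendTableSink is built on JDBCOutputFormat. Its emitDataStream method adds a JDBCSinkFunction as the sink of the given dataStream, while emitDataSet passes the output format to dataSet.output.

JDBCSinkFunction extends RichSinkFunction and also implements the CheckpointedFunction interface. Its invoke method delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so that buffered records are committed at every checkpoint.

doc

JDBCOutputFormat ...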
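The flush-on-threshold and flush-on-close behaviour summarized above can be tried out without a database. The following is only a minimal JDK-only sketch: `BatchingWriterSketch`, its `executed` list, and `demo()` are hypothetical names that stand in for the PreparedStatement batch and are not part of Flink or JDBC. One deliberate difference from the Flink code is that the sketch skips empty flushes, purely to keep the recorded batches easy to read.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the batching logic of JDBCOutputFormat; no JDBC involved. */
class BatchingWriterSketch implements AutoCloseable {
    private final int batchInterval;
    // Plays the role of the rows collected through PreparedStatement.addBatch().
    private final List<String> pending = new ArrayList<>();
    // Each entry records what one executeBatch() call would have sent.
    final List<List<String>> executed = new ArrayList<>();

    BatchingWriterSketch(int batchInterval) {
        this.batchInterval = batchInterval;
    }

    void writeRecord(String record) {
        pending.add(record);
        if (pending.size() >= batchInterval) {
            flush();
        }
    }

    void flush() {
        // Assumption of this sketch: empty batches are skipped.
        if (!pending.isEmpty()) {
            executed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    @Override
    public void close() {
        // Commit whatever never reached batchInterval.
        flush();
    }

    /** Writes three records with batchInterval = 2 and returns the executed batches. */
    static List<List<String>> demo() {
        BatchingWriterSketch writer = new BatchingWriterSketch(2);
        writer.writeRecord("a");
        writer.writeRecord("b"); // threshold reached, first batch is executed
        writer.writeRecord("c");
        writer.close();          // leftover record is flushed on close
        return writer.executed;
    }
}
```

In the sketch a checkpoint would simply call `flush()` as well, which is the role snapshotState plays in JDBCSinkFunction.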

December 4, 2018 · 9 min · jiezi

A look at Flink's TextOutputFormat

Preface

This article takes a look at Flink's TextOutputFormat.

DataStream.writeAsText

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes a DataStream to the file specified by path in text format.
     *
     * <p>For every element of the DataStream the result of {@link Object#toString()} is written.
     *
     * @param path
     *            The path pointing to the location the text file is written to
     * @param writeMode
     *            Controls the behavior for existing files. Options are
     *            NO_OVERWRITE and OVERWRITE.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
        TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
        tof.setWriteMode(writeMode);
        return writeUsingOutputFormat(tof);
    }

    /**
     * Writes the dataStream into an output, described by an OutputFormat.
     *
     * <p>The output is not participating in Flink's checkpointing!
     *
     * <p>For writing to a file system periodically, the use of the "flink-connector-filesystem"
     * is recommended.
     *
     * @param format The output format
     * @return The closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
        return addSink(new OutputFormatSinkFunction<>(format));
    }

DataStream's writeAsText method creates a TextOutputFormat and then wraps it in an OutputFormatSinkFunction so that it can be used as a sink function.

TextOutputFormat

flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/io/TextOutputFormat.java

/**
 * A {@link FileOutputFormat} that writes objects to a text file.
 *
 * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
 * @param <T> type of elements
 */
@PublicEvolving
public class TextOutputFormat<T> extends FileOutputFormat<T> {

    private static final long serialVersionUID = 1L;

    private static final int NEWLINE = '\n';

    private String charsetName;

    private transient Charset charset;

    // --------------------------------------------------------------------------------------------

    /**
     * Formatter that transforms values into their {@link String} representations.
* @param <IN> type of input elements / public interface TextFormatter<IN> extends Serializable { String format(IN value); } public TextOutputFormat(Path outputPath) { this(outputPath, “UTF-8”); } public TextOutputFormat(Path outputPath, String charset) { super(outputPath); this.charsetName = charset; } public String getCharsetName() { return charsetName; } public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { if (charsetName == null) { throw new NullPointerException(); } if (!Charset.isSupported(charsetName)) { throw new UnsupportedCharsetException(“The charset " + charsetName + " is not supported.”); } this.charsetName = charsetName; } // ——————————————————————————————– @Override public void open(int taskNumber, int numTasks) throws IOException { super.open(taskNumber, numTasks); try { this.charset = Charset.forName(charsetName); } catch (IllegalCharsetNameException e) { throw new IOException(“The charset " + charsetName + " is not valid.”, e); } catch (UnsupportedCharsetException e) { throw new IOException(“The charset " + charsetName + " is not supported.”, e); } } @Override public void writeRecord(T record) throws IOException { byte[] bytes = record.toString().getBytes(charset); this.stream.write(bytes); this.stream.write(NEWLINE); } // ——————————————————————————————– @Override public String toString() { return “TextOutputFormat (” + getOutputFilePath() + “) - " + this.charsetName; }}TextOutputFormat继承了FileOutputFormat,其open方法主要是调用FileOutputFormat的open方法,而writeRecord方法则直接往stream进行write,写完一条record之后再写一个换行(\n)FileOutputFormatflink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/FileOutputFormat.java/* * The abstract base class for all Rich output formats that are file based. Contains the logic to * open/close the target * file streams. 
/@Publicpublic abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implements InitializeOnMaster, CleanupWhenUnsuccessful { //…… /* * Initialization of the distributed file system if it is used. * * @param parallelism The task parallelism. / @Override public void initializeGlobal(int parallelism) throws IOException { final Path path = getOutputFilePath(); final FileSystem fs = path.getFileSystem(); // only distributed file systems can be initialized at start-up time. if (fs.isDistributedFS()) { final WriteMode writeMode = getWriteMode(); final OutputDirectoryMode outDirMode = getOutputDirectoryMode(); if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) { // output is not written in parallel and should be written to a single file. // prepare distributed output path if(!fs.initOutPathDistFS(path, writeMode, false)) { // output preparation failed! Cancel task. throw new IOException(“Output path could not be initialized.”); } } else { // output should be written to a directory // only distributed file systems can be initialized at start-up time. 
if(!fs.initOutPathDistFS(path, writeMode, true)) { throw new IOException(“Output directory could not be created.”); } } } } @Override public void tryCleanupOnError() { if (this.fileCreated) { this.fileCreated = false; try { close(); } catch (IOException e) { LOG.error(“Could not properly close FileOutputFormat.”, e); } try { FileSystem.get(this.actualFilePath.toUri()).delete(actualFilePath, false); } catch (FileNotFoundException e) { // ignore, may not be visible yet or may be already removed } catch (Throwable t) { LOG.error(“Could not remove the incomplete file " + actualFilePath + ‘.’, t); } } } @Override public void configure(Configuration parameters) { // get the output file path, if it was not yet set if (this.outputFilePath == null) { // get the file parameter String filePath = parameters.getString(FILE_PARAMETER_KEY, null); if (filePath == null) { throw new IllegalArgumentException(“The output path has been specified neither via constructor/setters” + “, nor via the Configuration.”); } try { this.outputFilePath = new Path(filePath); } catch (RuntimeException rex) { throw new RuntimeException(“Could not create a valid URI from the given file path name: " + rex.getMessage()); } } // check if have not been set and use the defaults in that case if (this.writeMode == null) { this.writeMode = DEFAULT_WRITE_MODE; } if (this.outputDirectoryMode == null) { this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE; } } @Override public void open(int taskNumber, int numTasks) throws IOException { if (taskNumber < 0 || numTasks < 1) { throw new IllegalArgumentException(“TaskNumber: " + taskNumber + “, numTasks: " + numTasks); } if (LOG.isDebugEnabled()) { LOG.debug(“Opening stream for output (” + (taskNumber+1) + “/” + numTasks + “). 
WriteMode=” + writeMode + “, OutputDirectoryMode=” + outputDirectoryMode); } Path p = this.outputFilePath; if (p == null) { throw new IOException(“The file path is null.”); } final FileSystem fs = p.getFileSystem(); // if this is a local file system, we need to initialize the local output directory here if (!fs.isDistributedFS()) { if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) { // output should go to a single file // prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode if(!fs.initOutPathLocalFS(p, writeMode, false)) { // output preparation failed! Cancel task. throw new IOException(“Output path ‘” + p.toString() + “’ could not be initialized. Canceling task…”); } } else { // numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS if(!fs.initOutPathLocalFS(p, writeMode, true)) { // output preparation failed! Cancel task. throw new IOException(“Output directory ‘” + p.toString() + “’ could not be created. Canceling task…”); } } } // Suffix the path with the parallel instance index, if needed this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? 
            p.suffix("/" + getDirectoryFileName(taskNumber)) : p;

        // create output file
        this.stream = fs.create(this.actualFilePath, writeMode);

        // at this point, the file creation must have succeeded, or an exception has been thrown
        this.fileCreated = true;
    }

    @Override
    public void close() throws IOException {
        final FSDataOutputStream s = this.stream;
        if (s != null) {
            this.stream = null;
            s.close();
        }
    }
}

FileOutputFormat extends RichOutputFormat and implements the InitializeOnMaster (initializeGlobal method) and CleanupWhenUnsuccessful (tryCleanupOnError method) interfaces.

initializeGlobal checks whether the target is a distributed file system and, if so, initializes the output path once globally at start-up. tryCleanupOnError first calls close and then deletes the incomplete file.

FileOutputFormat also implements the configure, open, and close methods of the OutputFormat interface and leaves writeRecord to subclasses. configure sets up the outputFilePath, writeMode, and outputDirectoryMode properties. open derives actualFilePath from taskNumber (when numTasks is greater than 1, or the output directory mode is ALWAYS, each task creates its own file under the configured outputFilePath directory, named after the task number plus 1) and then creates the stream. close mainly closes the stream.

RichOutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/RichOutputFormat.java

/**
 * An abstract stub implementation for Rich output formats.
 * Rich formats have access to their runtime execution context via {@link #getRuntimeContext()}.
 */
@Public
public abstract class RichOutputFormat<IT> implements OutputFormat<IT> {

    private static final long serialVersionUID = 1L;

    // --------------------------------------------------------------------------------------------
    //  Runtime context access
    // --------------------------------------------------------------------------------------------

    private transient RuntimeContext runtimeContext;

    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized yet. Try accessing " +
                    "it in one of the other life cycle methods.");
        }
    }
}

RichOutputFormat declares that it implements the OutputFormat interface; what it mainly adds is the RuntimeContext property.

OutputFormat

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/io/OutputFormat.java

/**
 * The base interface for outputs that consumes records.
The output format * describes how to store the final records, for example in a file. * <p> * The life cycle of an output format is the following: * <ol> * <li>configure() is invoked a single time. The method can be used to implement initialization from * the parameters (configuration) that may be attached upon instantiation.</li> * <li>Each parallel output task creates an instance, configures it and opens it.</li> * <li>All records of its parallel instance are handed to the output format.</li> * <li>The output format is closed</li> * </ol> * * @param <IT> The type of the consumed records. /@Publicpublic interface OutputFormat<IT> extends Serializable { /* * Configures this output format. Since output formats are instantiated generically and hence parameterless, * this method is the place where the output formats set their basic fields based on configuration values. * <p> * This method is always called first on a newly instantiated output format. * * @param parameters The configuration with all parameters. / void configure(Configuration parameters); /* * Opens a parallel instance of the output format to store the result of its parallel instance. * <p> * When this method is called, the output format it guaranteed to be configured. * * @param taskNumber The number of the parallel instance. * @param numTasks The number of parallel tasks. * @throws IOException Thrown, if the output could not be opened due to an I/O problem. / void open(int taskNumber, int numTasks) throws IOException; /* * Adds a record to the output. * <p> * When this method is called, the output format it guaranteed to be opened. * * @param record The records to add to the output. * @throws IOException Thrown, if the records could not be added to to an I/O problem. / void writeRecord(IT record) throws IOException; /* * Method that marks the end of the life-cycle of parallel output instance. Should be used to close * channels and streams and release resources. 
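     * After this method returns without an error, the output is assumed to be correct.
     * <p>
     * When this method is called, the output format it guaranteed to be opened.
     *
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    void close() throws IOException;
}

The OutputFormat interface defines the configure, open, writeRecord, and close methods.

Summary

DataStream's writeAsText method creates a TextOutputFormat and then wraps it in an OutputFormatSinkFunction so that it can be used as a sink function.

TextOutputFormat extends FileOutputFormat. Its open method mainly delegates to FileOutputFormat's open, and its writeRecord method writes directly to the stream, appending a newline (\n) after each record.

FileOutputFormat extends RichOutputFormat and implements the InitializeOnMaster (initializeGlobal method) and CleanupWhenUnsuccessful (tryCleanupOnError method) interfaces, as well as the configure, open, and close methods of the OutputFormat interface; writeRecord is left to subclasses. Its open method derives actualFilePath from taskNumber (when numTasks is greater than 1, or the output directory mode is ALWAYS, each task creates its own file under the configured outputFilePath directory, named after the task number plus 1) and then creates the stream.

RichOutputFormat declares that it implements the OutputFormat interface; what it mainly adds is the RuntimeContext property. The OutputFormat interface defines the configure, open, writeRecord, and close methods.

doc

TextOutputFormat ...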
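The per-task file naming and the newline-terminated writes described in the summary can be imitated with plain `java.nio.file`. This is a minimal sketch under stated assumptions, not Flink's implementation: `TextFileWriterSketch`, `actualFilePath`, `write`, and `demo` are hypothetical names, the charset is fixed to UTF-8, and write modes as well as OutputDirectoryMode.ALWAYS are left out.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

/** Hypothetical JDK-only imitation of the FileOutputFormat / TextOutputFormat flow. */
class TextFileWriterSketch {

    /** With several parallel tasks, each task writes to <outputPath>/<taskNumber + 1>. */
    static Path actualFilePath(Path outputPath, int taskNumber, int numTasks) {
        return numTasks > 1 ? outputPath.resolve(Integer.toString(taskNumber + 1)) : outputPath;
    }

    /** Writes record.toString() followed by '\n' for every record and returns the file written. */
    static <T> Path write(Path outputPath, int taskNumber, int numTasks, List<T> records) throws IOException {
        Path target = actualFilePath(outputPath, taskNumber, numTasks);
        if (numTasks > 1) {
            // The configured output path acts as a directory in the parallel case.
            Files.createDirectories(outputPath);
        }
        try (OutputStream stream = Files.newOutputStream(target)) {
            for (T record : records) {
                stream.write(record.toString().getBytes(StandardCharsets.UTF_8));
                stream.write('\n');
            }
        }
        return target;
    }

    /** Writes two records as task 1 of 3 into a temp directory; returns "<file name>:<content>". */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("text-sketch").resolve("out");
            Path file = write(dir, 1, 3, Arrays.<Object>asList(1, "x"));
            return file.getFileName() + ":" + new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the task index is zero-based and the file name is the index plus 1, task 1 of 3 ends up writing the file named `2`.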

December 3, 2018 · 8 min · jiezi

[case48] A look at Flink's SocketClientSink

Preface

This article takes a look at Flink's SocketClientSink.

DataStream.writeToSocket

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes the DataStream to a socket as a byte array. The format of the
     * output is specified by a {@link SerializationSchema}.
     *
     * @param hostName
     *            host of the socket
     * @param port
     *            port of the socket
     * @param schema
     *            schema for serialization
     * @return the closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
        DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
        returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
        return returnStream;
    }

DataStream's writeToSocket method creates a SocketClientSink internally, passing four constructor arguments: hostName, port, schema, and maxNumRetries (0 here).

SocketClientSink

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java

/**
 * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
 *
 * <p>The sink can be set to retry message sends after the sending failed.
 *
 * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
 * message. This significantly reduced throughput, but also decreases message latency.
 *
 * @param <IN> data to be written into the Socket.
/@PublicEvolvingpublic class SocketClientSink<IN> extends RichSinkFunction<IN> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class); private static final int CONNECTION_RETRY_DELAY = 500; private final SerializableObject lock = new SerializableObject(); private final SerializationSchema<IN> schema; private final String hostName; private final int port; private final int maxNumRetries; private final boolean autoFlush; private transient Socket client; private transient OutputStream outputStream; private int retries; private volatile boolean isRunning = true; /* * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure * and will not auto-flush the stream. * * @param hostName Hostname of the server to connect to. * @param port Port of the server. * @param schema Schema used to serialize the data into bytes. / public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) { this(hostName, port, schema, 0); } /* * Creates a new SocketClientSink that retries connections upon failure up to a given number of times. * A value of -1 for the number of retries will cause the system to retry an infinite number of times. * The sink will not auto-flush the stream. * * @param hostName Hostname of the server to connect to. * @param port Port of the server. * @param schema Schema used to serialize the data into bytes. * @param maxNumRetries The maximum number of retries after a message send failed. / public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) { this(hostName, port, schema, maxNumRetries, false); } /* * Creates a new SocketClientSink that retries connections upon failure up to a given number of times. * A value of -1 for the number of retries will cause the system to retry an infinite number of times. * * @param hostName Hostname of the server to connect to. * @param port Port of the server. 
* @param schema Schema used to serialize the data into bytes. * @param maxNumRetries The maximum number of retries after a message send failed. * @param autoflush Flag to indicate whether the socket stream should be flushed after each message. / public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries, boolean autoflush) { checkArgument(port > 0 && port < 65536, “port is out of range”); checkArgument(maxNumRetries >= -1, “maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)”); this.hostName = checkNotNull(hostName, “hostname must not be null”); this.port = port; this.schema = checkNotNull(schema); this.maxNumRetries = maxNumRetries; this.autoFlush = autoflush; } // ———————————————————————— // Life cycle // ———————————————————————— /* * Initialize the connection with the Socket in the server. * @param parameters Configuration. / @Override public void open(Configuration parameters) throws Exception { try { synchronized (lock) { createConnection(); } } catch (IOException e) { throw new IOException(“Cannot connect to socket server at " + hostName + “:” + port, e); } } /* * Called when new data arrives to the sink, and forwards it to Socket. * * @param value The value to write to the socket. / @Override public void invoke(IN value) throws Exception { byte[] msg = schema.serialize(value); try { outputStream.write(msg); if (autoFlush) { outputStream.flush(); } } catch (IOException e) { // if no re-tries are enable, fail immediately if (maxNumRetries == 0) { throw new IOException(“Failed to send message ‘” + value + “’ to socket server at " + hostName + “:” + port + “. Connection re-tries are not enabled.”, e); } LOG.error(“Failed to send message ‘” + value + “’ to socket server at " + hostName + “:” + port + “. Trying to reconnect…” , e); // do the retries in locked scope, to guard against concurrent close() calls // note that the first re-try comes immediately, without a wait! 
synchronized (lock) { IOException lastException = null; retries = 0; while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) { // first, clean up the old resources try { if (outputStream != null) { outputStream.close(); } } catch (IOException ee) { LOG.error(“Could not close output stream from failed write attempt”, ee); } try { if (client != null) { client.close(); } } catch (IOException ee) { LOG.error(“Could not close socket from failed write attempt”, ee); } // try again retries++; try { // initialize a new connection createConnection(); // re-try the write outputStream.write(msg); // success! return; } catch (IOException ee) { lastException = ee; LOG.error(“Re-connect to socket server and send message failed. Retry time(s): " + retries, ee); } // wait before re-attempting to connect lock.wait(CONNECTION_RETRY_DELAY); } // throw an exception if the task is still running, otherwise simply leave the method if (isRunning) { throw new IOException(“Failed to send message ‘” + value + “’ to socket server at " + hostName + “:” + port + “. Failed after " + retries + " retries.”, lastException); } } } } /* * Closes the connection with the Socket server. */ @Override public void close() throws Exception { // flag this as not running any more isRunning = false; // clean up in locked scope, so there is no concurrent change to the stream and client synchronized (lock) { // we notify first (this statement cannot fail). 
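            // The notified thread will not continue
            // anyways before it can re-acquire the lock
            lock.notifyAll();

            try {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private void createConnection() throws IOException {
        client = new Socket(hostName, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);

        outputStream = client.getOutputStream();
    }

    // ------------------------------------------------------------------------
    //  For testing
    // ------------------------------------------------------------------------

    int getCurrentNumberOfRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}

SocketClientSink extends RichSinkFunction; its autoFlush property defaults to false.

The open method calls createConnection to set up the socket connection. If an IOException occurs at this point, it fails fast. createConnection sets both keepAlive and tcpNoDelay to true on the socket.

The invoke method first calls schema.serialize to serialize the value and then calls outputStream.write on the socket; if autoFlush is true, it flushes outputStream right away. If an IOException occurs and maxNumRetries is 0, the exception is rethrown immediately. Otherwise it retries at once. The retry logic sits directly in the catch block and is bounded by maxNumRetries (-1 means retry indefinitely). Each retry first calls createConnection and then outputStream.write; after a failed retry it waits CONNECTION_RETRY_DELAY (500, passed to lock.wait, so milliseconds) before the next one.

Summary

DataStream.writeToSocket creates a SocketClientSink internally and passes maxNumRetries = 0 by default. It does not call the constructor that takes autoFlush, so autoFlush stays false.

The socket created in open has keepAlive and tcpNoDelay set to true. If an IOException occurs during open, an exception is thrown and execution stops.

The invoke method is fairly simple: it serializes the value with the SerializationSchema and writes it to outputStream. It includes a simple retry on failure with a default delay of CONNECTION_RETRY_DELAY (500); the retry in this version is straightforward and runs synchronously.

doc

SocketClientSink ...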
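The retry behavior of invoke described above can be sketched in plain Java without any Flink or socket dependency. This is only an illustration under stated assumptions: `RetrySketch`, `Attempt`, and `sendWithRetry` are hypothetical names, the `Attempt` callback stands in for `createConnection()` followed by `outputStream.write(msg)`, and `Thread.sleep` replaces the `lock.wait(CONNECTION_RETRY_DELAY)` call, so the concurrent `close()` handling of the real class is not modeled.

```java
import java.io.IOException;

public class RetrySketch {

    /** One write attempt that may fail (hypothetical stand-in for reconnect + write). */
    public interface Attempt {
        void run() throws IOException;
    }

    /**
     * Runs the attempt once and, on failure, retries up to maxNumRetries times
     * (0 = no retries, negative = retry until success).
     * Returns the number of retries that were needed.
     */
    public static int sendWithRetry(Attempt attempt, int maxNumRetries, long retryDelayMillis)
            throws IOException, InterruptedException {
        try {
            attempt.run();
            return 0;
        } catch (IOException first) {
            // With retries disabled, fail immediately.
            if (maxNumRetries == 0) {
                throw new IOException("Connection re-tries are not enabled.", first);
            }
            IOException last = first;
            int retries = 0;
            // The first retry comes immediately; the delay follows a failed retry.
            while (maxNumRetries < 0 || retries < maxNumRetries) {
                retries++;
                try {
                    attempt.run();
                    return retries;
                } catch (IOException e) {
                    last = e;
                }
                Thread.sleep(retryDelayMillis);
            }
            throw new IOException("Failed after " + retries + " retries.", last);
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails on the first two calls and succeeds on the third.
        Attempt flaky = () -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated failure " + calls[0]);
            }
        };
        System.out.println("retries used: " + sendWithRetry(flaky, 3, 0L));
    }
}
```

Keeping the retry loop synchronous, as in the original, means the calling thread is blocked for the whole retry sequence; that is simple to reason about but stalls the sink while the server is unreachable.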

December 2, 2018 · 6 min · jiezi

A look at Flink's PrintSinkFunction

序本文主要研究一下flink的PrintSinkFunctionDataStream.printflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java /** * Writes a DataStream to the standard output stream (stdout). * * <p>For each element of the DataStream the result of {@link Object#toString()} is written. * * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink * worker. * * @return The closed DataStream. / @PublicEvolving public DataStreamSink<T> print() { PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(); return addSink(printFunction).name(“Print to Std. Out”); } /* * Writes a DataStream to the standard output stream (stderr). * * <p>For each element of the DataStream the result of {@link Object#toString()} is written. * * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink * worker. * * @return The closed DataStream. / @PublicEvolving public DataStreamSink<T> printToErr() { PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true); return addSink(printFunction).name(“Print to Std. Err”); } /* * Writes a DataStream to the standard output stream (stdout). * * <p>For each element of the DataStream the result of {@link Object#toString()} is written. * * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink * worker. * * @param sinkIdentifier The string to prefix the output with. * @return The closed DataStream. / @PublicEvolving public DataStreamSink<T> print(String sinkIdentifier) { PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false); return addSink(printFunction).name(“Print to Std. Out”); } /* * Writes a DataStream to the standard output stream (stderr). * * <p>For each element of the DataStream the result of {@link Object#toString()} is written. * * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink * worker. 
* * @param sinkIdentifier The string to prefix the output with. * @return The closed DataStream. / @PublicEvolving public DataStreamSink<T> printToErr(String sinkIdentifier) { PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true); return addSink(printFunction).name(“Print to Std. Err”); } /* * Adds the given sink to this DataStream. Only streams with sinks added * will be executed once the {@link StreamExecutionEnvironment#execute()} * method is called. * * @param sinkFunction * The object containing the sink’s invoke function. * @return The closed DataStream. / public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); // configure the type if needed if (sinkFunction instanceof InputTypeConfigurable) { ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig()); } StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction)); DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator); getExecutionEnvironment().addOperator(sink.getTransformation()); return sink; }DataStream提供了几个print开头的方法,内部是创建了PrintSinkFunction,通过调用addSink操作把该PrintSinkFunction添加进去addSink方法的注释表明带有sinks的streams,会在StreamExecutionEnvironment.execute()调用的时候被执行SinkFunction先是被StreamSink包装,然后被DataStreamSink包装,最后通过DataStreamSink.getTransformation作为operator添加到ExecutionEnvironmentSinkFunctionflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java/* * Interface for implementing user defined sink functionality. * * @param <IN> Input type parameter. /@Publicpublic interface SinkFunction<IN> extends Function, Serializable { /* * @deprecated Use {@link #invoke(Object, Context)}. / @Deprecated default void invoke(IN value) throws Exception {} /* * Writes the given value to the sink. This function is called for every record. 
* * <p>You have to override this method when implementing a {@code SinkFunction}, this is a * {@code default} method for backward compatibility with the old-style method only. * * @param value The input record. * @param context Additional context about the input record. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. / default void invoke(IN value, Context context) throws Exception { invoke(value); } /* * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about * an input record. * * <p>The context is only valid for the duration of a * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use * afterwards! * * @param <T> The type of elements accepted by the sink. / @Public // Interface might be extended in the future with additional methods. interface Context<T> { /* Returns the current processing time. / long currentProcessingTime(); /* Returns the current event-time watermark. / long currentWatermark(); /* * Returns the timestamp of the current input record or {@code null} if the element does not * have an assigned timestamp. / Long timestamp(); }}SinkFunction接口定义了invoke方法,用来触发sink逻辑;invoke方法里头传递了一个Context,该接口定义了currentProcessingTime、currentWatermark、timestamp三个方法RichSinkFunctionflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java/* * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}. 
/@Publicpublic abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> { private static final long serialVersionUID = 1L;}RichSinkFunction抽象类继承了AbstractRichFunction类,同时也声明实现SinkFunction接口;大部分内置的sink function都继承了RichSinkFunction;AbstractRichFunction主要是提供了RuntimeContext属性,可以用来获取function运行时的上下文PrintSinkFunctionflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java/* * Implementation of the SinkFunction writing every tuple to the standard * output or standard error stream. * * <p> * Four possible format options: * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 * taskId> output <- no {@code sinkIdentifier} provided, parallelism > 1 * output <- no {@code sinkIdentifier} provided, parallelism == 1 * </p> * * @param <IN> Input record type /@PublicEvolvingpublic class PrintSinkFunction<IN> extends RichSinkFunction<IN> { private static final long serialVersionUID = 1L; private final PrintSinkOutputWriter<IN> writer; /* * Instantiates a print sink function that prints to standard out. / public PrintSinkFunction() { writer = new PrintSinkOutputWriter<>(false); } /* * Instantiates a print sink function that prints to standard out. * * @param stdErr True, if the format should print to standard error instead of standard out. / public PrintSinkFunction(final boolean stdErr) { writer = new PrintSinkOutputWriter<>(stdErr); } /* * Instantiates a print sink function that prints to standard out and gives a sink identifier. * * @param stdErr True, if the format should print to standard error instead of standard out. 
* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value / public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) { writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); } @Override public void invoke(IN record) { writer.write(record); } @Override public String toString() { return writer.toString(); }}PrintSinkFunction继承了RichSinkFunction,它主要是使用了PrintSinkOutputWriter,在invoke的时候调用PrintSinkOutputWriter的write方法来执行输出PrintSinkOutputWriterflink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java/* * Print sink output writer for DataStream and DataSet print API. */@Internalpublic class PrintSinkOutputWriter<IN> implements Serializable { private static final long serialVersionUID = 1L; private static final boolean STD_OUT = false; private static final boolean STD_ERR = true; private final boolean target; private transient PrintStream stream; private final String sinkIdentifier; private transient String completedPrefix; public PrintSinkOutputWriter() { this("", STD_OUT); } public PrintSinkOutputWriter(final boolean stdErr) { this("", stdErr); } public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) { this.target = stdErr; this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier); } public void open(int subtaskIndex, int numParallelSubtasks) { // get the target stream stream = target == STD_OUT ? 
                System.out : System.err;

        completedPrefix = sinkIdentifier;

        if (numParallelSubtasks > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1);
        }

        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
    }

    public void write(IN record) {
        stream.println(completedPrefix + record.toString());
    }

    @Override
    public String toString() {
        return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
    }
}

The PrintSinkOutputWriter constructors accept at most two parameters, sinkIdentifier and stdErr. sinkIdentifier is the prefix of the output, and stdErr indicates whether to write to System.err.

The open method does the preparation work. It is called from PrintSinkFunction.open, which obtains subtaskIndex and numParallelSubtasks from the RuntimeContext defined by AbstractRichFunction and passes them in. The open method then builds completedPrefix from sinkIdentifier, subtaskIndex, and numParallelSubtasks.

The write method calls println on System.out or System.err with completedPrefix followed by the record.

Summary

The print methods of DataStream create a PrintSinkFunction internally and add it to the ExecutionEnvironment through addSink (it is first wrapped by StreamSink, then by DataStreamSink, and finally added as an operator via DataStreamSink.getTransformation).

SinkFunction is the base interface for sink functions. It mainly defines the invoke method, which receives a Context. Most built-in sink functions extend RichSinkFunction, which in turn extends AbstractRichFunction and so gives access to the function's RuntimeContext.

PrintSinkFunction extends RichSinkFunction. It mainly relies on PrintSinkOutputWriter and calls its write method in invoke to produce the output.

doc

PrintSinkFunction ...
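The four output formats listed in the PrintSinkFunction Javadoc follow directly from how open builds completedPrefix. The sketch below reproduces just that string logic in plain Java so it can be run without Flink; `PrintPrefixSketch` and `buildPrefix` are hypothetical names introduced for illustration, not part of the Flink API.

```java
public class PrintPrefixSketch {

    /**
     * Builds the output prefix the same way the open method shown above does:
     * the identifier, then ":" plus the 1-based subtask number when the
     * parallelism is greater than one, then "> " if anything was added.
     */
    public static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = (sinkIdentifier == null) ? "" : sinkIdentifier;
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1);
        }
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("mySink", 0, 4) + "hello"); // mySink:1> hello
        System.out.println(buildPrefix("mySink", 0, 1) + "hello"); // mySink> hello
        System.out.println(buildPrefix(null, 2, 4) + "hello");     // 3> hello
        System.out.println(buildPrefix("", 0, 1) + "hello");       // hello
    }
}
```

Note that the subtask index is zero-based in the runtime context but printed one-based, which is why subtask 0 shows up as `1>`.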

December 1, 2018 · 6 min · jiezi

A look at Flink's ParallelIteratorInputFormat

Preface

This article takes a look at Flink's ParallelIteratorInputFormat.

Example

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> dataSet = env.generateSequence(15, 106)
                .setParallelism(3);
        dataSet.print();

Here the generateSequence method of ExecutionEnvironment creates a ParallelIteratorInputFormat backed by a NumberSequenceIterator.

ParallelIteratorInputFormat

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java

/**
 * An input format that generates data in parallel through a {@link SplittableIterator}.
 */
@PublicEvolving
public class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> {

    private static final long serialVersionUID = 1L;

    private final SplittableIterator<T> source;

    private transient Iterator<T> splitIterator;

    public ParallelIteratorInputFormat(SplittableIterator<T> iterator) {
        this.source = iterator;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);

        this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits());
    }

    @Override
    public boolean reachedEnd() {
        return !this.splitIterator.hasNext();
    }

    @Override
    public T nextRecord(T reuse) {
        return this.splitIterator.next();
    }
}

ParallelIteratorInputFormat extends GenericInputFormat. GenericInputFormat has four other subclasses: CRowValuesInputFormat, CollectionInputFormat, IteratorInputFormat, and ValuesInputFormat. What these four have in common is that they all implement the NonParallelInput interface.

NonParallelInput

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/NonParallelInput.java

/**
 * This interface acts as a marker for input formats for inputs which cannot be split.
 * Data sources with a non-parallel input formats are always executed with a parallelism
 * of one.
 *
 * @see InputFormat
 */
@Public
public interface NonParallelInput {
}

This interface declares no methods. It is purely a marker: an InputFormat that implements it cannot be split.

GenericInputFormat.createInputSplits

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java

    @Override
    public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }

        numSplits = (this instanceof NonParallelInput) ? 1 : numSplits;
        GenericInputSplit[] splits = new GenericInputSplit[numSplits];
        for (int i = 0; i < splits.length; i++) {
            splits[i] = new GenericInputSplit(i, numSplits);
        }
        return splits;
    }

The createInputSplits method of GenericInputFormat constrains the requested numSplits: if it is less than 1, an IllegalArgumentException is thrown, and if the current InputFormat implements NonParallelInput, numSplits is reset to 1.

ExecutionEnvironment.fromParallelCollection

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

    /**
     * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
     * framework to create a parallel data source that returns the elements in the iterator.
     *
     * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data
     * returned by the iterator must be given explicitly in the form of the type class (this is due to the
     * fact that the Java compiler erases the generic type information).
     *
     * @param iterator The iterator that produces the elements of the data set.
     * @param type The class of the data produced by the iterator. Must not be a generic class.
     * @return A DataSet representing the elements in the iterator.
     *
     * @see #fromParallelCollection(SplittableIterator, TypeInformation)
     */
    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, Class<X> type) {
        return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
    }

    /**
     * Creates a new data set that contains elements in the iterator.
The iterator is splittable, allowing the * framework to create a parallel data source that returns the elements in the iterator. * * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data * returned by the iterator must be given explicitly in the form of the type information. * This method is useful for cases where the type is generic. In that case, the type class * (as given in {@link #fromParallelCollection(SplittableIterator, Class)} does not supply all type information. * * @param iterator The iterator that produces the elements of the data set. * @param type The TypeInformation for the produced data set. * @return A DataSet representing the elements in the iterator. * * @see #fromParallelCollection(SplittableIterator, Class) / public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) { return fromParallelCollection(iterator, type, Utils.getCallLocationName()); } // private helper for passing different call location names private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) { return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName); } /* * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel, * so there is no guarantee about the order of the elements. * * @param from The number to start at (inclusive). * @param to The number to stop at (inclusive). * @return A DataSet, containing all number in the {@code [from, to]} interval. 
/ public DataSource<Long> generateSequence(long from, long to) { return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName()); }ExecutionEnvironment的fromParallelCollection方法,针对SplittableIterator类型的iterator,会创建ParallelIteratorInputFormat;generateSequence方法也调用了fromParallelCollection方法,它创建的是NumberSequenceIterator(是SplittableIterator的子类)SplittableIteratorflink-core-1.6.2-sources.jar!/org/apache/flink/util/SplittableIterator.java/* * Abstract base class for iterators that can split themselves into multiple disjoint * iterators. The union of these iterators returns the original iterator values. * * @param <T> The type of elements returned by the iterator. /@Publicpublic abstract class SplittableIterator<T> implements Iterator<T>, Serializable { private static final long serialVersionUID = 200377674313072307L; /* * Splits this iterator into a number disjoint iterators. * The union of these iterators returns the original iterator values. * * @param numPartitions The number of iterators to split into. * @return An array with the split iterators. / public abstract Iterator<T>[] split(int numPartitions); /* * Splits this iterator into <i>n</i> partitions and returns the <i>i-th</i> partition * out of those. * * @param num The partition to return (<i>i</i>). * @param numPartitions The number of partitions to split into (<i>n</i>). * @return The iterator for the partition. / public Iterator<T> getSplit(int num, int numPartitions) { if (numPartitions < 1 || num < 0 || num >= numPartitions) { throw new IllegalArgumentException(); } return split(numPartitions)[num]; } /* * The maximum number of splits into which this iterator can be split up. * * @return The maximum number of splits into which this iterator can be split up. 
/ public abstract int getMaximumNumberOfSplits();}SplittableIterator是个抽象类,它定义了抽象方法split以及getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator,这里我们看下NumberSequenceIteratorNumberSequenceIteratorflink-core-1.6.2-sources.jar!/org/apache/flink/util/NumberSequenceIterator.java/* * The {@code NumberSequenceIterator} is an iterator that returns a sequence of numbers (as {@code Long})s. * The iterator is splittable (as defined by {@link SplittableIterator}, i.e., it can be divided into multiple * iterators that each return a subsequence of the number sequence. /@Publicpublic class NumberSequenceIterator extends SplittableIterator<Long> { private static final long serialVersionUID = 1L; /* The last number returned by the iterator. / private final long to; /* The next number to be returned. / private long current; /* * Creates a new splittable iterator, returning the range [from, to]. * Both boundaries of the interval are inclusive. * * @param from The first number returned by the iterator. * @param to The last number returned by the iterator. */ public NumberSequenceIterator(long from, long to) { if (from > to) { throw new IllegalArgumentException(“The ’to’ value must not be smaller than the ‘from’ value.”); } this.current = from; this.to = to; } @Override public boolean hasNext() { return current <= to; } @Override public Long next() { if (current <= to) { return current++; } else { throw new NoSuchElementException(); } } @Override public NumberSequenceIterator[] split(int numPartitions) { if (numPartitions < 1) { throw new IllegalArgumentException(“The number of partitions must be at least 1.”); } if (numPartitions == 1) { return new NumberSequenceIterator[] { new NumberSequenceIterator(current, to) }; } // here, numPartitions >= 2 !!! long elementsPerSplit; if (to - current + 1 >= 0) { elementsPerSplit = (to - current + 1) / numPartitions; } else { // long overflow of the range. 
// we compute based on half the distance, to prevent the overflow. // in most cases it holds that: current < 0 and to > 0, except for: to == 0 and current == Long.MIN_VALUE // the later needs a special case final long halfDiff; // must be positive if (current == Long.MIN_VALUE) { // this means to >= 0 halfDiff = (Long.MAX_VALUE / 2 + 1) + to / 2; } else { long posFrom = -current; if (posFrom > to) { halfDiff = to + ((posFrom - to) / 2); } else { halfDiff = posFrom + ((to - posFrom) / 2); } } elementsPerSplit = halfDiff / numPartitions * 2; } if (elementsPerSplit < Long.MAX_VALUE) { // figure out how many get one in addition long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1; // based on rounding errors, we may have lost one) if (numWithExtra > numPartitions) { elementsPerSplit++; numWithExtra -= numPartitions; if (numWithExtra > numPartitions) { throw new RuntimeException(“Bug in splitting logic. To much rounding loss.”); } } NumberSequenceIterator[] iters = new NumberSequenceIterator[numPartitions]; long curr = current; int i = 0; for (; i < numWithExtra; i++) { long next = curr + elementsPerSplit + 1; iters[i] = new NumberSequenceIterator(curr, next - 1); curr = next; } for (; i < numPartitions; i++) { long next = curr + elementsPerSplit; iters[i] = new NumberSequenceIterator(curr, next - 1, true); curr = next; } return iters; } else { // this can only be the case when there are two partitions if (numPartitions != 2) { throw new RuntimeException(“Bug in splitting logic.”); } return new NumberSequenceIterator[] { new NumberSequenceIterator(current, current + elementsPerSplit), new NumberSequenceIterator(current + elementsPerSplit, to) }; } } @Override public int getMaximumNumberOfSplits() { if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE) { return Integer.MAX_VALUE; } else { return (int) (to - current + 1); } } 
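    //……
}

The NumberSequenceIterator constructor takes two parameters, from and to. Internally it keeps a current value, which initially equals from.

The split method first computes elementsPerSplit from numPartitions. When to - current + 1 >= 0 (no long overflow), the formula is (to - current + 1) / numPartitions.

It then computes numWithExtra from elementsPerSplit. Because elementsPerSplit comes from an integer division, giving every split exactly elementsPerSplit elements may leave some elements over, and numWithExtra is that remainder. If it is greater than numPartitions, elementsPerSplit is increased by 1 and numPartitions is subtracted from numWithExtra.

Finally, the first numWithExtra splits are created with elementsPerSplit + 1 elements each, so the surplus is spread one element at a time over those splits. The splits from numWithExtra up to numPartitions get elementsPerSplit elements each, with the upper bound of each split computed as its start + elementsPerSplit - 1.

getMaximumNumberOfSplits returns the maximum number of splits. It returns Integer.MAX_VALUE when (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE) holds, and (int) (to - current + 1) otherwise.

Summary

GenericInputFormat has five subclasses. Besides ParallelIteratorInputFormat, they are CRowValuesInputFormat, CollectionInputFormat, IteratorInputFormat, and ValuesInputFormat; these four all implement the NonParallelInput interface.

The createInputSplits method of GenericInputFormat constrains the requested numSplits; for a NonParallelInput format it is forced to 1.

NumberSequenceIterator is one implementation of SplittableIterator. The fromParallelCollection method of ExecutionEnvironment, and generateSequence (which creates a NumberSequenceIterator), build a ParallelIteratorInputFormat for an iterator of type SplittableIterator. The split method of NumberSequenceIterator first computes elementsPerSplit, then numWithExtra, gives one extra element to each of the first numWithExtra splits, and assigns elementsPerSplit elements to each of the remaining splits.

doc

ParallelIteratorInputFormat
SplittableIterator
NumberSequenceIterator ...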
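To make the elementsPerSplit and numWithExtra arithmetic concrete, here is a minimal plain-Java sketch of the split step for the common case where the range does not overflow a long. `SequenceSplitSketch` and its `split` method are hypothetical names for illustration; the overflow branch and the two-partition special case of the real NumberSequenceIterator are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SequenceSplitSketch {

    /**
     * Splits the inclusive range [from, to] into numPartitions contiguous ranges.
     * Each returned long[] is {first, last}; an empty range has last == first - 1.
     * Assumes to - from + 1 does not overflow a long.
     */
    public static List<long[]> split(long from, long to, int numPartitions) {
        if (numPartitions < 1 || from > to) {
            throw new IllegalArgumentException("need numPartitions >= 1 and from <= to");
        }
        long total = to - from + 1;
        long elementsPerSplit = total / numPartitions;                 // integer division
        long numWithExtra = total - elementsPerSplit * numPartitions;  // remainder
        List<long[]> ranges = new ArrayList<>();
        long curr = from;
        for (int i = 0; i < numPartitions; i++) {
            // The first numWithExtra splits take one extra element each.
            long size = elementsPerSplit + (i < numWithExtra ? 1 : 0);
            ranges.add(new long[] {curr, curr + size - 1});
            curr += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Same numbers as the generateSequence(15, 106) example with parallelism 3.
        for (long[] range : split(15, 106, 3)) {
            System.out.println(Arrays.toString(range));
        }
        // Prints [15, 45], [46, 76] and [77, 106], one per line.
    }
}
```

For the example in this article the range holds 92 elements, so elementsPerSplit is 30 and numWithExtra is 2: the first two splits carry 31 elements and the last one carries 30.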

November 30, 2018 · 7 min · jiezi

A look at Flink's InputFormatSourceFunction

序本文主要研究一下flink的InputFormatSourceFunction实例 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); IteratorInputFormat iteratorInputFormat = new IteratorInputFormat<String>(new WordIterator()); env //TypeInformation.of(new TypeHint<String>() {} .createInput(iteratorInputFormat,TypeExtractor.createTypeInfo(String.class)) .setParallelism(1) .print();这里使用IteratorInputFormat调用env的createInput方法来创建SourceFunctionStreamExecutionEnvironment.createInputflink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @PublicEvolving public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) { DataStreamSource<OUT> source; if (inputFormat instanceof FileInputFormat) { @SuppressWarnings(“unchecked”) FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat; source = createFileInput(format, typeInfo, “Custom File source”, FileProcessingMode.PROCESS_ONCE, -1); } else { source = createInput(inputFormat, typeInfo, “Custom Source”); } return source; } private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo, String sourceName) { InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, typeInfo); return addSource(function, sourceName, typeInfo); }StreamExecutionEnvironment.createInput在inputFormat不是FileInputFormat类型的时候创建的是InputFormatSourceFunctionInputFormatSourceFunction/** * A {@link SourceFunction} that reads data using an {@link InputFormat}. 
/@Internalpublic class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> { private static final long serialVersionUID = 1L; private TypeInformation<OUT> typeInfo; private transient TypeSerializer<OUT> serializer; private InputFormat<OUT, InputSplit> format; private transient InputSplitProvider provider; private transient Iterator<InputSplit> splitIterator; private volatile boolean isRunning = true; @SuppressWarnings(“unchecked”) public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) { this.format = (InputFormat<OUT, InputSplit>) format; this.typeInfo = typeInfo; } @Override @SuppressWarnings(“unchecked”) public void open(Configuration parameters) throws Exception { StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); if (format instanceof RichInputFormat) { ((RichInputFormat) format).setRuntimeContext(context); } format.configure(parameters); provider = context.getInputSplitProvider(); serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); splitIterator = getInputSplits(); isRunning = splitIterator.hasNext(); } @Override public void run(SourceContext<OUT> ctx) throws Exception { try { Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter(“numSplitsProcessed”); if (isRunning && format instanceof RichInputFormat) { ((RichInputFormat) format).openInputFormat(); } OUT nextElement = serializer.createInstance(); while (isRunning) { format.open(splitIterator.next()); // for each element we also check if cancel // was called by checking the isRunning flag while (isRunning && !format.reachedEnd()) { nextElement = format.nextRecord(nextElement); if (nextElement != null) { ctx.collect(nextElement); } else { break; } } format.close(); completedSplitsCounter.inc(); if (isRunning) { isRunning = splitIterator.hasNext(); } } } finally { format.close(); if (format instanceof RichInputFormat) { ((RichInputFormat) format).closeInputFormat(); } 
            isRunning = false;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }

    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }

    private Iterator<InputSplit> getInputSplits() {

        return new Iterator<InputSplit>() {

            private InputSplit nextSplit;

            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }

                if (nextSplit != null) {
                    return true;
                }

                final InputSplit split;

                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }

                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }

                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}

InputFormatSourceFunction is a SourceFunction that reads data through an InputFormat. It extends RichParallelSourceFunction and adds a two-argument constructor taking an InputFormat and a TypeInformation.

Its getInputSplits method returns an Iterator over InputSplit (splitIterator); each nextSplit is fetched by calling InputSplitProvider.getNextInputSplit.

The run method calls splitIterator.next() for one split after another, opens each InputSplit with the InputFormat, reads the records of that split one by one with format.nextRecord, and emits each record through the collect method of SourceContext.

InputSplitProvider

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java

/**
 * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
 * task is supposed to consume in the course of its execution.
/@Publicpublic interface InputSplitProvider { /* * Requests the next input split to be consumed by the calling task. * * @param userCodeClassLoader used to deserialize input splits * @return the next input split to be consumed by the calling task or <code>null</code> if the * task shall not consume any further input splits. * @throws InputSplitProviderException if fetching the next input split fails / InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;}InputSplitProvider接口定义了getNextInputSplit方法,用于查询nextInputSplit,它有两个实现类,分别是RpcInputSplitProvider、TaskInputSplitProviderRpcInputSplitProviderflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.javapublic class RpcInputSplitProvider implements InputSplitProvider { private final JobMasterGateway jobMasterGateway; private final JobVertexID jobVertexID; private final ExecutionAttemptID executionAttemptID; private final Time timeout; public RpcInputSplitProvider( JobMasterGateway jobMasterGateway, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID, Time timeout) { this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.jobVertexID = Preconditions.checkNotNull(jobVertexID); this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID); this.timeout = Preconditions.checkNotNull(timeout); } @Override public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException { Preconditions.checkNotNull(userCodeClassLoader); CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit( jobVertexID, executionAttemptID); try { SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit()); if (serializedInputSplit.isEmpty()) { return null; } else { return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader); } } catch (Exception e) { throw new 
InputSplitProviderException(“Requesting the next input split failed.”, e); } }}RpcInputSplitProvider请求jobMasterGateway.requestNextInputSplit来获取SerializedInputSplit(本实例的splitProvider为RpcInputSplitProvider)TaskInputSplitProviderflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java/* * Implementation using {@link ActorGateway} to forward the messages. /public class TaskInputSplitProvider implements InputSplitProvider { private final ActorGateway jobManager; private final JobID jobID; private final JobVertexID vertexID; private final ExecutionAttemptID executionID; private final FiniteDuration timeout; public TaskInputSplitProvider( ActorGateway jobManager, JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionID, FiniteDuration timeout) { this.jobManager = Preconditions.checkNotNull(jobManager); this.jobID = Preconditions.checkNotNull(jobID); this.vertexID = Preconditions.checkNotNull(vertexID); this.executionID = Preconditions.checkNotNull(executionID); this.timeout = Preconditions.checkNotNull(timeout); } @Override public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException { Preconditions.checkNotNull(userCodeClassLoader); final Future<Object> response = jobManager.ask( new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID), timeout); final Object result; try { result = Await.result(response, timeout); } catch (Exception e) { throw new InputSplitProviderException(“Did not receive next input split from JobManager.”, e); } if(result instanceof JobManagerMessages.NextInputSplit){ final JobManagerMessages.NextInputSplit nextInputSplit = (JobManagerMessages.NextInputSplit) result; byte[] serializedData = nextInputSplit.splitData(); if(serializedData == null) { return null; } else { final Object deserialized; try { deserialized = InstantiationUtil.deserializeObject(serializedData, userCodeClassLoader); } catch (Exception e) { throw new 
InputSplitProviderException(“Could not deserialize the serialized input split.”, e); } return (InputSplit) deserialized; } } else { throw new InputSplitProviderException(“RequestNextInputSplit requires a response of type " + “NextInputSplit. Instead response is of type " + result.getClass() + ‘.’); } }}TaskInputSplitProvider请求jobManager.ask(new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),timeout)来获取SerializedInputSplitInputSplitflink-core-1.6.2-sources.jar!/org/apache/flink/core/io/InputSplit.java/* * This interface must be implemented by all kind of input splits that can be assigned to input formats. * * <p>Input splits are transferred in serialized form via the messages, so they need to be serializable * as defined by {@link java.io.Serializable}.</p> /@Publicpublic interface InputSplit extends Serializable { /* * Returns the number of this input split. * * @return the number of this input split / int getSplitNumber();}InputSplit是所有类型的input splits必须实现的接口,它InputSplit继承了Serializable,方便进行序列化传输;getSplitNumber返回的是当前split的编号它有四个实现类,其中两个实现类是直接实现该接口,分别是GenericInputSplit、LocatableInputSplit另外两个分别是继承了LocatableInputSplit的FileInputSplit,以及继承了FileInputSplit的TimestampedFileInputSplitGenericInputSplitflink-core-1.6.2-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java/* * A generic input split that has only a partition number. /@Publicpublic class GenericInputSplit implements InputSplit, java.io.Serializable { private static final long serialVersionUID = 1L; /* The number of this split. / private final int partitionNumber; /* The total number of partitions / private final int totalNumberOfPartitions; // ——————————————————————————————– /* * Creates a generic input split with the given split number. * * @param partitionNumber The number of the split’s partition. * @param totalNumberOfPartitions The total number of the splits (partitions). 
/ public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) { this.partitionNumber = partitionNumber; this.totalNumberOfPartitions = totalNumberOfPartitions; } //…… public String toString() { return “GenericSplit (” + this.partitionNumber + ‘/’ + this.totalNumberOfPartitions + ‘)’; }}GenericInputSplit比较简单,只有两个属性,分别是partitionNumber、totalNumberOfPartitions(本实例的InputSplit为GenericInputSplit类型)LocatableInputSplitflink-core-1.6.2-sources.jar!/org/apache/flink/core/io/LocatableInputSplit.java/* * A locatable input split is an input split referring to input data which is located on one or more hosts. /@Publicpublic class LocatableInputSplit implements InputSplit, java.io.Serializable { private static final long serialVersionUID = 1L; private static final String[] EMPTY_ARR = new String[0]; /* The number of the split. / private final int splitNumber; /* The names of the hosts storing the data this input split refers to. / private final String[] hostnames; // ——————————————————————————————– /* * Creates a new locatable input split that refers to a multiple host as its data location. * * @param splitNumber The number of the split * @param hostnames The names of the hosts storing the data this input split refers to. / public LocatableInputSplit(int splitNumber, String[] hostnames) { this.splitNumber = splitNumber; this.hostnames = hostnames == null ? EMPTY_ARR : hostnames; } /* * Creates a new locatable input split that refers to a single host as its data location. * * @param splitNumber The number of the split. * @param hostname The names of the host storing the data this input split refers to. / public LocatableInputSplit(int splitNumber, String hostname) { this.splitNumber = splitNumber; this.hostnames = hostname == null ? 
EMPTY_ARR : new String[] { hostname }; } //…… @Override public String toString() { return “Locatable Split (” + splitNumber + “) at " + Arrays.toString(this.hostnames); }}LocatableInputSplit是可定位的input split,它有两个属性,分别是splitNumber以及该split对应的数据所在的hostnamesIteratorInputFormatflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/IteratorInputFormat.java/* * An input format that returns objects from an iterator. /@PublicEvolvingpublic class IteratorInputFormat<T> extends GenericInputFormat<T> implements NonParallelInput { private static final long serialVersionUID = 1L; private Iterator<T> iterator; // input data as serializable iterator public IteratorInputFormat(Iterator<T> iterator) { if (!(iterator instanceof Serializable)) { throw new IllegalArgumentException(“The data source iterator must be serializable.”); } this.iterator = iterator; } @Override public boolean reachedEnd() { return !this.iterator.hasNext(); } @Override public T nextRecord(T record) { return this.iterator.next(); }}IteratorInputFormat主要是对Iterator进行了包装,实现了reachedEnd、nextRecord接口;它继承了GenericInputFormatGenericInputFormatflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java/* * Generic base class for all Rich inputs that are not based on files. /@Publicpublic abstract class GenericInputFormat<OT> extends RichInputFormat<OT, GenericInputSplit> { private static final long serialVersionUID = 1L; /* * The partition of this split. / protected int partitionNumber; // ——————————————————————————————– @Override public void configure(Configuration parameters) { // nothing by default } @Override public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { // no statistics available, by default. 
return cachedStatistics; } @Override public GenericInputSplit[] createInputSplits(int numSplits) throws IOException { if (numSplits < 1) { throw new IllegalArgumentException(“Number of input splits has to be at least 1.”); } numSplits = (this instanceof NonParallelInput) ? 1 : numSplits; GenericInputSplit[] splits = new GenericInputSplit[numSplits]; for (int i = 0; i < splits.length; i++) { splits[i] = new GenericInputSplit(i, numSplits); } return splits; } @Override public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) { return new DefaultInputSplitAssigner(splits); } // ——————————————————————————————– @Override public void open(GenericInputSplit split) throws IOException { this.partitionNumber = split.getSplitNumber(); } @Override public void close() throws IOException {}}RpcInputSplitProvider是调用JobMaster.requestNextInputSplit来获取SerializedInputSplit,而JobMaster是调用splitAssigner.getNextInputSplit(host, taskId),这里的splitAssigner,即为DefaultInputSplitAssigner(从vertex.getSplitAssigner()获取)而vertex.getSplitAssigner()返回的splitAssigner,是ExecutionJobVertex在构造器里头根据splitSource.getInputSplitAssigner(splitSource.createInputSplits(numTaskVertices))得来的而splitSource即为这里的IteratorInputFormat,而IteratorInputFormat的createInputSplits(根据numTaskVertices来分割)及getInputSplitAssigner方法均为父类GenericInputFormat提供DefaultInputSplitAssignerflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java/* * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner * simply returns all input splits of an input vertex in the order they were originally computed. /@Internalpublic class DefaultInputSplitAssigner implements InputSplitAssigner { /* The logging object used to report information and errors. 
 */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);

    /** The list of all splits */
    private final List<InputSplit> splits = new ArrayList<InputSplit>();

    public DefaultInputSplitAssigner(InputSplit[] splits) {
        Collections.addAll(this.splits, splits);
    }

    public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
        this.splits.addAll(splits);
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        InputSplit next = null;

        // keep the synchronized part short
        synchronized (this.splits) {
            if (this.splits.size() > 0) {
                next = this.splits.remove(this.splits.size() - 1);
            }
        }

        if (LOG.isDebugEnabled()) {
            if (next == null) {
                LOG.debug("No more input splits available");
            } else {
                LOG.debug("Assigning split " + next + " to " + host);
            }
        }
        return next;
    }
}

DefaultInputSplitAssigner simply hands out the InputSplits one at a time, ignoring host and taskId. Note that getNextInputSplit removes from the tail of the list, so the split that was computed last is handed out first.

Summary

InputFormatSourceFunction is a SourceFunction that reads its data through an InputFormat. It extends RichParallelSourceFunction and adds a two-argument constructor that takes an InputFormat and a TypeInformation.

The IteratorInputFormat used in this example extends GenericInputFormat, which provides two important methods. createInputSplits creates numSplits splits (numTaskVertices here), but collapses to a single split when the format implements NonParallelInput, as IteratorInputFormat does. getInputSplitAssigner creates a DefaultInputSplitAssigner, which hands out the prepared InputSplits one by one.

The run method of InputFormatSourceFunction calls splitIterator.next() repeatedly, opens each InputSplit with the InputFormat, reads every record of that split through format.nextRecord, and emits each record through the SourceContext (SourceContext.collect).

Overall, GenericInputFormat provides both the method that divides the input into InputSplits and the InputSplitAssigner. InputFormatSourceFunction then iterates over the InputSplits assigned to its own task (obtained through the InputSplitAssigner), reads each element of every split through the InputFormat, and emits it through the SourceContext.

doc

InputFormatSourceFunction ...
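The hand-out order of DefaultInputSplitAssigner can be checked with a small stand-alone sketch. It does not use Flink at all: `TailAssignerSketch` and `drainOrder` are hypothetical names made up for illustration, and splits are modeled as plain integers. Only the tail-removal inside a short synchronized block is taken from the listing above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DefaultInputSplitAssigner; splits are modeled as ints. */
final class TailAssignerSketch {

    private final List<Integer> splits = new ArrayList<>();

    TailAssignerSketch(int numSplits) {
        // Same numbering as GenericInputFormat.createInputSplits: 0 .. numSplits-1.
        for (int i = 0; i < numSplits; i++) {
            splits.add(i);
        }
    }

    /** Returns the next split number, or null once every split has been handed out. */
    Integer next() {
        synchronized (splits) {
            // remove(int index) takes the element at the tail of the list.
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }

    /** Drains a fresh assigner and records the order in which splits come out. */
    static List<Integer> drainOrder(int numSplits) {
        TailAssignerSketch assigner = new TailAssignerSketch(numSplits);
        List<Integer> order = new ArrayList<>();
        Integer split;
        while ((split = assigner.next()) != null) {
            order.add(split);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder(4)); // prints [3, 2, 1, 0]
    }
}
```

Removing from the tail of an ArrayList avoids shifting the remaining elements, which is probably why the order is reversed. For callers the order should not matter, since any task may request any remaining split.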

November 30, 2018 · 8 min · jiezi

A Look at Flink's RichParallelSourceFunction

序本文主要研究一下flink的RichParallelSourceFunctionRichParallelSourceFunction/** * Base class for implementing a parallel data source. Upon execution, the runtime will * execute as many parallel instances of this function function as configured parallelism * of the source. * * <p>The data source has access to context information (such as the number of parallel * instances of the source, and which parallel instance the current instance is) * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p> * * @param <OUT> The type of the records produced by this source. /@Publicpublic abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements ParallelSourceFunction<OUT> { private static final long serialVersionUID = 1L;}RichParallelSourceFunction实现了ParallelSourceFunction接口,同时继承了AbstractRichFunctionParallelSourceFunctionflink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java/* * A stream data source that is executed in parallel. Upon execution, the runtime will * execute as many parallel instances of this function function as configured parallelism * of the source. * * <p>This interface acts only as a marker to tell the system that this source may * be executed in parallel. When different parallel instances are required to perform * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime * context, which reveals information like the number of parallel tasks, and which parallel * task the current instance is. * * @param <OUT> The type of the records produced by this source. 
/@Publicpublic interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}ParallelSourceFunction继承了SourceFunction接口,它并没有定义其他额外的方法,仅仅是用接口名来表达意图,即可以被并行执行的stream data sourceAbstractRichFunctionflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/AbstractRichFunction.java/* * An abstract stub implementation for rich user-defined functions. * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and * teardown ({@link #close()}), as well as access to their runtime execution context via * {@link #getRuntimeContext()}. /@Publicpublic abstract class AbstractRichFunction implements RichFunction, Serializable { private static final long serialVersionUID = 1L; // ——————————————————————————————– // Runtime context access // ——————————————————————————————– private transient RuntimeContext runtimeContext; @Override public void setRuntimeContext(RuntimeContext t) { this.runtimeContext = t; } @Override public RuntimeContext getRuntimeContext() { if (this.runtimeContext != null) { return this.runtimeContext; } else { throw new IllegalStateException(“The runtime context has not been initialized.”); } } @Override public IterationRuntimeContext getIterationRuntimeContext() { if (this.runtimeContext == null) { throw new IllegalStateException(“The runtime context has not been initialized.”); } else if (this.runtimeContext instanceof IterationRuntimeContext) { return (IterationRuntimeContext) this.runtimeContext; } else { throw new IllegalStateException(“This stub is not part of an iteration step function.”); } } // ——————————————————————————————– // Default life cycle methods // ——————————————————————————————– @Override public void open(Configuration parameters) throws Exception {} @Override public void close() throws Exception 
{}}AbstractRichFunction主要实现了RichFunction接口的setRuntimeContext、getRuntimeContext、getIterationRuntimeContext方法;open及close方法都是空操作RuntimeContextflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java/* * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance * of the function will have a context through which it can access static contextual information (such as * the current parallelism) and other constructs like accumulators and broadcast variables. * * <p>A function can, during runtime, obtain the RuntimeContext via a call to * {@link AbstractRichFunction#getRuntimeContext()}. /@Publicpublic interface RuntimeContext { /* * Returns the name of the task in which the UDF runs, as assigned during plan construction. * * @return The name of the task in which the UDF runs. / String getTaskName(); /* * Returns the metric group for this parallel subtask. * * @return The metric group for this parallel subtask. / @PublicEvolving MetricGroup getMetricGroup(); /* * Gets the parallelism with which the parallel task runs. * * @return The parallelism with which the parallel task runs. / int getNumberOfParallelSubtasks(); /* * Gets the number of max-parallelism with which the parallel task runs. * * @return The max-parallelism with which the parallel task runs. / @PublicEvolving int getMaxNumberOfParallelSubtasks(); /* * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}). * * @return The index of the parallel subtask. / int getIndexOfThisSubtask(); /* * Gets the attempt number of this parallel subtask. First attempt is numbered 0. * * @return Attempt number of the subtask. 
/ int getAttemptNumber(); /* * Returns the name of the task, appended with the subtask indicator, such as “MyTask (3/6)”, * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be * {@link #getNumberOfParallelSubtasks()}. * * @return The name of the task, with subtask indicator. / String getTaskNameWithSubtasks(); /* * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing * job. / ExecutionConfig getExecutionConfig(); //…….}RuntimeContext定义了很多方法,这里我们看下getNumberOfParallelSubtasks方法,它可以返回当前的task的parallelism;而getIndexOfThisSubtask则可以获取当前parallel subtask的下标;可以根据这些信息,开发既能并行执行但各自发射的数据又不重复的ParallelSourceFunctionJobMaster.startJobExecutionflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception { validateRunsInMainThread(); checkNotNull(newJobMasterId, “The new JobMasterId must not be null.”); if (Objects.equals(getFencingToken(), newJobMasterId)) { log.info(“Already started the job execution with JobMasterId {}.”, newJobMasterId); return Acknowledge.get(); } setNewFencingToken(newJobMasterId); startJobMasterServices(); log.info(“Starting execution of job {} ({})”, jobGraph.getName(), jobGraph.getJobID()); resetAndScheduleExecutionGraph(); return Acknowledge.get(); } private void resetAndScheduleExecutionGraph() throws Exception { validateRunsInMainThread(); final CompletableFuture<Void> executionGraphAssignedFuture; if (executionGraph.getState() == JobStatus.CREATED) { executionGraphAssignedFuture = CompletableFuture.completedFuture(null); } else { suspendAndClearExecutionGraphFields(new FlinkException(“ExecutionGraph is being reset in order to be rescheduled.”)); final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup); executionGraphAssignedFuture = 
executionGraph.getTerminationFuture().handleAsync( (JobStatus ignored, Throwable throwable) -> { assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup); return null; }, getMainThreadExecutor()); } executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph); } private void scheduleExecutionGraph() { checkState(jobStatusListener == null); // register self as job status change listener jobStatusListener = new JobManagerJobStatusListener(); executionGraph.registerJobStatusListener(jobStatusListener); try { executionGraph.scheduleForExecution(); } catch (Throwable t) { executionGraph.failGlobal(t); } }这里调用了resetAndScheduleExecutionGraph方法,而resetAndScheduleExecutionGraph则组合了scheduleExecutionGraph方法;scheduleExecutionGraph这里调用executionGraph.scheduleForExecution()来调度执行ExecutionGraph.scheduleForExecutionflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java public void scheduleForExecution() throws JobException { final long currentGlobalModVersion = globalModVersion; if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { final CompletableFuture<Void> newSchedulingFuture; switch (scheduleMode) { case LAZY_FROM_SOURCES: newSchedulingFuture = scheduleLazy(slotProvider); break; case EAGER: newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout); break; default: throw new JobException(“Schedule mode is invalid.”); } if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) { schedulingFuture = newSchedulingFuture; newSchedulingFuture.whenCompleteAsync( (Void ignored, Throwable throwable) -> { if (throwable != null && !(throwable instanceof CancellationException)) { // only fail if the scheduling future was not canceled failGlobal(ExceptionUtils.stripCompletionException(throwable)); } }, futureExecutor); } else { newSchedulingFuture.cancel(false); } } else { throw new IllegalStateException(“Job may only be scheduled from state " + JobStatus.CREATED); } 
}

This job runs in EAGER mode, so scheduleEager is called.

ExecutionGraph.scheduleEager
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

    /**
     *
     *
     * @param slotProvider  The resource provider from which the slots are allocated
     * @param timeout       The maximum time that the deployment may take, before a
     *                      TimeoutException is thrown.
     * @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
     * The future can also be completed exceptionally if an error happened.
     */
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
        checkState(state == JobStatus.RUNNING, "job is not running currently");

        // Important: reserve all the space we need up front.
        // that way we do not have any operation that can fail between allocating the slots
        // and adding them to the list. If we had a failure in between there, that would
        // cause the slots to get lost
        final boolean queued = allowQueuedScheduling;

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

        // allocate the slots (obtain all their futures
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // these calls are not blocking, they only return futures
            Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
                slotProvider,
                queued,
                LocationPreferenceConstraint.ALL,
                allocationTimeout);

            allAllocationFutures.addAll(allocationFutures);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures); final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture .thenAccept( (Collection<Execution> executionsToDeploy) -> { for (Execution execution : executionsToDeploy) { try { execution.deploy(); } catch (Throwable t) { throw new CompletionException( new FlinkException( String.format(“Could not deploy execution %s.”, execution), t)); } } }) // Generate a more specific failure message for the eager scheduling .exceptionally( (Throwable throwable) -> { final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); final Throwable resultThrowable; if (strippedThrowable instanceof TimeoutException) { int numTotal = allAllocationsFuture.getNumFuturesTotal(); int numComplete = allAllocationsFuture.getNumFuturesCompleted(); String message = “Could not allocate all requires slots within timeout of " + timeout + “. Slots required: " + numTotal + “, slots allocated: " + numComplete; resultThrowable = new NoResourceAvailableException(message); } else { resultThrowable = strippedThrowable; } throw new CompletionException(resultThrowable); }); return currentSchedulingFuture; }scheduleEager方法这里先调用getVerticesTopologically来获取ExecutionJobVertex之后调用ExecutionJobVertex.allocateResourcesForAll来分配资源得到Collection<CompletableFuture<Execution>>最后通过FutureUtils.combineAll(allAllocationFutures)等待这批Future,之后挨个调用execution.deploy()进行部署ExecutionJobVertex.allocateResourcesForAllflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java /* * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns * pairs of the slots and execution attempts, to ease correlation between vertices and execution * attempts. * * <p>If this method throws an exception, it makes sure to release all so far requested slots. 
* * @param resourceProvider The resource provider from whom the slots are requested. * @param queued if the allocation can be queued * @param locationPreferenceConstraint constraint for the location preferences * @param allocationTimeout timeout for allocating the individual slots / public Collection<CompletableFuture<Execution>> allocateResourcesForAll( SlotProvider resourceProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, Time allocationTimeout) { final ExecutionVertex[] vertices = this.taskVertices; final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length]; // try to acquire a slot future for each execution. // we store the execution with the future just to be on the safe side for (int i = 0; i < vertices.length; i++) { // allocate the next slot (future) final Execution exec = vertices[i].getCurrentExecutionAttempt(); final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution( resourceProvider, queued, locationPreferenceConstraint, allocationTimeout); slots[i] = allocationFuture; } // all good, we acquired all slots return Arrays.asList(slots); }这里根据ExecutionJobVertex的taskVertices来挨个调用exec.allocateAndAssignSlotForExecution进行分配;可以发现整个并行度由taskVertices来决定Execution.deployflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java /* * Deploys the execution to the previously assigned resource. 
* * @throws JobException if the execution cannot be deployed to the assigned resource */ public void deploy() throws JobException { final LogicalSlot slot = assignedResource; checkNotNull(slot, “In order to deploy the execution we first have to assign a resource via tryAssignResource.”); //…… try { //…… final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( attemptId, slot, taskRestore, attemptNumber); // null taskRestore to let it be GC’ed taskRestore = null; final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout); submitResultFuture.whenCompleteAsync( (ack, failure) -> { // only respond to the failure case if (failure != null) { if (failure instanceof TimeoutException) { String taskname = vertex.getTaskNameWithSubtaskIndex() + " (” + attemptId + ‘)’; markFailed(new Exception( “Cannot deploy task " + taskname + " - TaskManager (” + getAssignedResourceLocation() + “) not responding after a rpcTimeout of " + rpcTimeout, failure)); } else { markFailed(failure); } } }, executor); } catch (Throwable t) { markFailed(t); ExceptionUtils.rethrow(t); } }Execution.deploy会创建TaskDeploymentDescriptor,之后通过taskManagerGateway.submitTask提交这个deployment;之后就是触发TaskExecutor去触发Task的run方法ExecutionJobVertexflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java private final ExecutionVertex[] taskVertices; public ExecutionJobVertex( ExecutionGraph graph, JobVertex jobVertex, int defaultParallelism, Time timeout, long initialGlobalModVersion, long createTimestamp) throws JobException { if (graph == null || jobVertex == null) { throw new NullPointerException(); } this.graph = graph; this.jobVertex = jobVertex; int vertexParallelism = jobVertex.getParallelism(); int numTaskVertices = vertexParallelism > 0 ? 
vertexParallelism : defaultParallelism; final int configuredMaxParallelism = jobVertex.getMaxParallelism(); this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism); // if no max parallelism was configured by the user, we calculate and set a default setMaxParallelismInternal(maxParallelismConfigured ? configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices)); // verify that our parallelism is not higher than the maximum parallelism if (numTaskVertices > maxParallelism) { throw new JobException( String.format(“Vertex %s’s parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.”, jobVertex.getName(), numTaskVertices, maxParallelism)); } this.parallelism = numTaskVertices; this.serializedTaskInformation = null; this.taskVertices = new ExecutionVertex[numTaskVertices]; //…… // create all task vertices for (int i = 0; i < numTaskVertices; i++) { ExecutionVertex vertex = new ExecutionVertex( this, i, producedDataSets, timeout, initialGlobalModVersion, createTimestamp, maxPriorAttemptsHistoryLength); this.taskVertices[i] = vertex; } //…… 
}

taskVertices is an ExecutionVertex[] whose size is determined by numTaskVertices.

The ExecutionJobVertex constructor first checks whether jobVertex.getParallelism() is greater than 0 (it usually is). If so, that value becomes numTaskVertices; otherwise defaultParallelism is used (ExecutionGraph.attachJobGraph passes a defaultParallelism of 1 when it creates the ExecutionJobVertex). The constructor then creates numTaskVertices ExecutionVertex instances one by one and stores them in the taskVertices array.

The parallelism of jobVertex is set by StreamingJobGraphGenerator in createJobVertex from streamNode.getParallelism(), but only when that value is greater than 0.

If a streamNode has no parallelism of its own, it defaults to the parallelism of the StreamExecutionEnvironment (see the DataStreamSource constructor, DataStream.transform, and the DataStreamSink constructor; DataStreamSource resets the parallelism of a non-parallel source to 1). For a LocalEnvironment the default is Runtime.getRuntime().availableProcessors().

Summary

RichParallelSourceFunction implements the ParallelSourceFunction interface and extends AbstractRichFunction. AbstractRichFunction mainly implements the setRuntimeContext, getRuntimeContext, and getIterationRuntimeContext methods of the RichFunction interface. RuntimeContext defines getNumberOfParallelSubtasks (the parallelism of the current task) and getIndexOfThisSubtask (the index of the current parallel subtask), which make it easy to write a ParallelSourceFunction whose instances run in parallel without emitting duplicate data.

When JobMaster runs startJobExecution, it calls executionGraph.scheduleForExecution() to schedule the job. Along the way, ExecutionJobVertex.allocateResourcesForAll allocates resources and returns a Collection<CompletableFuture<Execution>>, after which execution.deploy() is called on each Execution. Execution.deploy creates a TaskDeploymentDescriptor and submits it through taskManagerGateway.submitTask, which in turn leads the TaskExecutor to start the Task and invoke its run method.

ExecutionJobVertex.allocateResourcesForAll walks the taskVertices of the ExecutionJobVertex and calls exec.allocateAndAssignSlotForExecution for each one, so the effective parallelism is the length of taskVertices. taskVertices is initialized in the ExecutionJobVertex constructor: if jobVertex.getParallelism() is greater than 0 that value is used, otherwise defaultParallelism (1). The parallelism of jobVertex is set by StreamingJobGraphGenerator in createJobVertex from streamNode.getParallelism() (when that value is greater than 0); if the user has not set it, it defaults to the parallelism of the StreamExecutionEnvironment, which for a LocalEnvironment defaults to Runtime.getRuntime().availableProcessors().

doc

RichParallelSourceFunction ...
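To make the last two points concrete, here is a minimal Flink-free sketch of how a parallel source might use its subtask index and parallelism to avoid duplicate output. `SubtaskPartitionSketch`, `resolveParallelism`, and `valuesFor` are hypothetical names. The round-robin split is only one possible scheme and is an assumption, not something the article prescribes. In a real RichParallelSourceFunction the two numbers would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; none of these names exist in Flink. */
final class SubtaskPartitionSketch {

    /** Mirrors the fallback in the ExecutionJobVertex constructor shown above. */
    static int resolveParallelism(int vertexParallelism, int defaultParallelism) {
        return vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
    }

    /** Values in [0, total) owned by one subtask under a round-robin split. */
    static List<Long> valuesFor(int subtaskIndex, int parallelism, long total) {
        List<Long> owned = new ArrayList<>();
        // Start at the subtask's own index and step by the parallelism,
        // so no two subtasks ever produce the same value.
        for (long v = subtaskIndex; v < total; v += parallelism) {
            owned.add(v);
        }
        return owned;
    }

    public static void main(String[] args) {
        // No parallelism configured on the vertex (-1), so the default of 3 applies.
        int parallelism = resolveParallelism(-1, 3);
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> " + valuesFor(i, parallelism, 10));
        }
    }
}
```

With a parallelism of 3 and ten values, subtask 0 owns 0, 3, 6, 9, subtask 1 owns 1, 4, 7, and subtask 2 owns 2, 5, 8. Together the subtasks cover the range exactly once.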

November 28, 2018 · 9 min · jiezi

[case47] A Look at Flink's BoltWrapper

序本文主要研究一下flink的BoltWrapperBoltWrapperflink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltWrapper.java/** * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program. * It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can * process. Furthermore, it takes the bolt’s output tuples and transforms them into Flink tuples of type {@code OUT} * (see {@link AbstractStormCollector} for supported types).<br/> * <br/> * <strong>Works for single input streams only! See {@link MergedInputsBoltWrapper} for multi-input stream * Bolts.</strong> /public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> { @Override public void open() throws Exception { super.open(); this.flinkCollector = new TimestampedCollector<>(this.output); GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters(); StormConfig stormConfig = new StormConfig(); if (config != null) { if (config instanceof StormConfig) { stormConfig = (StormConfig) config; } else { stormConfig.putAll(config.toMap()); } } this.topologyContext = WrapperSetupHelper.createTopologyContext( getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig); final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>( this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector)); if (this.stormTopology != null) { Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources(); for (GlobalStreamId inputStream : inputs.keySet()) { for (Integer tid : this.topologyContext.getComponentTasks(inputStream .get_componentId())) { this.inputComponentIds.put(tid, inputStream.get_componentId()); this.inputStreamIds.put(tid, inputStream.get_streamId()); this.inputSchemas.put(tid, this.topologyContext.getComponentOutputFields(inputStream)); } } } this.bolt.prepare(stormConfig, 
this.topologyContext, stormCollector); } @Override public void dispose() throws Exception { super.dispose(); this.bolt.cleanup(); } @Override public void processElement(final StreamRecord<IN> element) throws Exception { this.flinkCollector.setTimestamp(element); IN value = element.getValue(); if (this.stormTopology != null) { Tuple tuple = (Tuple) value; Integer producerTaskId = tuple.getField(tuple.getArity() - 1); this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId), producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds .get(producerTaskId), MessageId.makeUnanchored())); } else { this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null, MessageId.makeUnanchored())); } }}flink用BoltWrapper来包装storm的IRichBolt,它实现了OneInputStreamOperator接口,继承AbstractStreamOperator类OneInputStreamOperator接口继承了StreamOperator接口,额外定义了processElement、processWatermark、processLatencyMarker三个接口AbstractStreamOperator类实现的是StreamOperator接口,但是里头帮忙实现了processWatermark、processLatencyMarker这两个接口BoltWrapper里头主要是实现OneInputStreamOperator接口的processElement方法,然后是覆盖StreamOperator接口定义的open及dispose方法open方法有个要点就是调用bolt的prepare方法,传入包装BoltCollector的OutputCollector,通过BoltCollector来收集bolt发射的数据到flink,它使用的是flink的TimestampedCollectorBoltCollectorflink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltCollector.java/* * A {@link BoltCollector} is used by {@link BoltWrapper} to provided an Storm compatible * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples * and emits them via the provide {@link Output} object. /class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector { /* The Flink output Collector. / private final Collector<OUT> flinkOutput; /* * Instantiates a new {@link BoltCollector} that emits Flink tuples to the given Flink output object. If the * number of attributes is negative, any output type is supported (ie, raw type). 
         * If the number of attributes is
         * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
         *
         * @param numberOfAttributes
         *            The number of attributes of the emitted tuples per output stream.
         * @param taskId
         *            The ID of the producer task (negative value for unknown).
         * @param flinkOutput
         *            The Flink output object to be used.
         * @throws UnsupportedOperationException
         *             if the specified number of attributes is greater than 25
         */
        BoltCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
                final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
            super(numberOfAttributes, taskId);
            assert (flinkOutput != null);
            this.flinkOutput = flinkOutput;
        }

        @Override
        protected List<Integer> doEmit(final OUT flinkTuple) {
            this.flinkOutput.collect(flinkTuple);
            // TODO
            return null;
        }

        @Override
        public void reportError(final Throwable error) {
            // not sure, if Flink can support this
        }

        @Override
        public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
            return this.tansformAndEmit(streamId, tuple);
        }

        @Override
        public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
            throw new UnsupportedOperationException("Direct emit is not supported by Flink");
        }

        @Override
        public void ack(final Tuple input) {}

        @Override
        public void fail(final Tuple input) {}

        @Override
        public void resetTimeout(Tuple var1) {}
    }

- BoltCollector implements Storm's IOutputCollector interface, but ack, fail, resetTimeout, and reportError are all no-ops, and emitDirect is not supported.
- doEmit calls flinkOutput.collect(flinkTuple).
- emit calls tansformAndEmit(streamId, tuple), which is implemented by the parent class AbstractStormCollector.

TimestampedCollector.collect

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/TimestampedCollector.java

    /**
     * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
     * Before giving the {@link TimestampedCollector} to a user function you must set
     * the timestamp that should be attached to emitted elements. Most operators
     * would set the timestamp of the incoming
     * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
     *
     * @param <T> The type of the elements that can be emitted.
     */
    @Internal
    public class TimestampedCollector<T> implements Collector<T> {

        private final Output<StreamRecord<T>> output;

        private final StreamRecord<T> reuse;

        /**
         * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
         */
        public TimestampedCollector(Output<StreamRecord<T>> output) {
            this.output = output;
            this.reuse = new StreamRecord<T>(null);
        }

        @Override
        public void collect(T record) {
            output.collect(reuse.replace(record));
        }

        public void setTimestamp(StreamRecord<?> timestampBase) {
            if (timestampBase.hasTimestamp()) {
                reuse.setTimestamp(timestampBase.getTimestamp());
            } else {
                reuse.eraseTimestamp();
            }
        }

        public void setAbsoluteTimestamp(long timestamp) {
            reuse.setTimestamp(timestamp);
        }

        public void eraseTimestamp() {
            reuse.eraseTimestamp();
        }

        @Override
        public void close() {
            output.close();
        }
    }

- TimestampedCollector implements Flink's Collector interface and adds the setTimestamp, setAbsoluteTimestamp, and eraseTimestamp methods.
- It holds a StreamRecord object, which has three fields (value, timestamp, hasTimestamp) and so associates a value with a timestamp.
- collect emits the object returned by StreamRecord.replace; replace only swaps the value reference and leaves the stored timestamp unchanged.

AbstractStormCollector.tansformAndEmit

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java

    /**
     * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
     * to the specified output stream.
     *
     * @param The
     *            The output stream id.
     * @param tuple
     *            The Storm tuple to be emitted.
     * @return the return value of {@link #doEmit(Object)}
     */
    @SuppressWarnings("unchecked")
    protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
        List<Integer> taskIds;
        int numAtt = this.numberOfAttributes.get(streamId);
        int taskIdIdx = numAtt;
        if (this.taskId >= 0 && numAtt < 0) {
            numAtt = 1;
            taskIdIdx = 0;
        }
        if (numAtt >= 0) {
            assert (tuple.size() == numAtt);
            Tuple out = this.outputTuple.get(streamId);
            for (int i = 0; i < numAtt; ++i) {
                out.setField(tuple.get(i), i);
            }
            if (this.taskId >= 0) {
                out.setField(this.taskId, taskIdIdx);
            }
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = out;
                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) out);
            }
        } else {
            assert (tuple.size() == 1);
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = tuple.get(0);
                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) tuple.get(0));
            }
        }
        this.tupleEmitted = true;
        return taskIds;
    }

- AbstractStormCollector.tansformAndEmit mainly handles the split case, where one bolt declares several streams. Either way the data is finally emitted through the subclass method BoltCollector.doEmit.
- If split is true, doEmit receives splitTuple, a SplitStreamType that records the streamId together with its value.
- If split is false, doEmit receives a plain Tuple, which corresponds to the value inside SplitStreamType without the streamId information.

Task.run

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

    /**
     * The Task represents one execution of a parallel subtask on a TaskManager.
     * A Task wraps a Flink operator (which may be a user function) and
     * runs it, providing all services necessary for example to consume input data,
     * produce its results (intermediate result partitions) and communicate
     * with the JobManager.
     *
     * <p>The Flink operators (implemented as subclasses of
     * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
     * The task connects those to the network stack and actor messages, and tracks the state
     * of the execution and handles exceptions.
     *
     * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
     * are the first attempt to execute the task, or a repeated attempt. All of that
     * is only known to the JobManager. All the task knows are its own runnable code,
     * the task's configuration, and the IDs of the intermediate results to consume and
     * produce (if any).
     *
     * <p>Each Task is run by one dedicated thread.
     */
    public class Task implements Runnable, TaskActions, CheckpointListener {
        //......

        /**
         * The core work method that bootstraps the task and executes its code.
         */
        @Override
        public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();
            //......
        }
    }

- Task.run calls invokable.invoke(); the invokable here is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

    /**
     * Base class for all streaming tasks. A task is the unit of local processing that is deployed
     * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
     * the Task's operator chain.
     * Operators that are chained together execute synchronously in the
     * same thread and hence on the same stream partition. A common case for these chains
     * are successive map/flatmap/filter tasks.
     *
     * <p>The task chain contains one "head" operator and multiple chained operators.
     * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
     * as well as for sources, iteration heads and iteration tails.
     *
     * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
     * produced by the operators at the ends of the operator chain. Note that the chain may fork and
     * thus have multiple ends.
     *
     * <p>The life cycle of the task is set up as follows:
     * <pre>{@code
     *  -- setInitialState -> provides state of all operators in the chain
     *
     *  -- invoke()
     *        |
     *        +----> Create basic utils (config, etc) and load the chain of operators
     *        +----> operators.setup()
     *        +----> task specific init()
     *        +----> initialize-operator-states()
     *        +----> open-operators()
     *        +----> run()
     *        +----> close-operators()
     *        +----> dispose-operators()
     *        +----> common cleanup
     *        +----> task specific cleanup()
     * }</pre>
     *
     * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
     * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
     * are called concurrently.
     *
     * @param <OUT>
     * @param <OP>
     */
    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {
        //......

        @Override
        public final void invoke() throws Exception {
            boolean disposed = false;
            try {
                //......
                // let the task do its work
                isRunning = true;
                run();

                // if this left the run() method cleanly despite the fact that this was canceled,
                // make sure the "clean shutdown" is not attempted
                if (canceled) {
                    throw new CancelTaskException();
                }
                LOG.debug("Finished task {}", getName());
                //......
            } finally {
                // clean up everything we initialized
                isRunning = false;
                //......
            }
        }
    }

- StreamTask.invoke calls the run method of its subclass, which here is OneInputStreamTask.

OneInputStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }

- This run method mainly calls inputProcessor.processInput(); the inputProcessor here is a StreamInputProcessor.

StreamInputProcessor.processInput

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }
            //......
        }
    }

- processInput first fetches the next record through currentRecordDeserializer.getNextRecord(deserializationDelegate) and then, for ordinary records, calls streamOperator.processElement(record); the streamOperator here is the BoltWrapper.

Summary

- Flink uses BoltWrapper to wrap Storm's IRichBolt. Its processElement method (from OneInputStreamOperator) calls bolt.execute, and its open method (from StreamOperator) calls the bolt's prepare method with an OutputCollector wrapping a BoltCollector. The BoltCollector collects what the bolt emits during execute and passes it to Flink through Flink's TimestampedCollector.
- BoltCollector.emit calls AbstractStormCollector.tansformAndEmit, which ends by calling BoltCollector.doEmit. When several streams are declared, doEmit receives a tuple wrapped in SplitStreamType; with a single stream it receives the plain tuple.
- Task.run calls StreamTask.invoke, which calls run on the subclass (OneInputStreamTask here). OneInputStreamTask.run loops on inputProcessor.processInput(), where inputProcessor is a StreamInputProcessor. processInput() fetches the next record through currentRecordDeserializer.getNextRecord(deserializationDelegate) and, depending on the element type, calls streamOperator.processElement(record). The streamOperator here is the BoltWrapper, whose processElement calls the Storm bolt's execute method to run the bolt logic and emits through Flink's BoltCollector.

doc

Storm Compatibility Beta ...
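The call order summarized above (open calls prepare, processElement sets the timestamp and then calls execute, and everything the bolt emits carries the input record's timestamp) can be sketched without any Flink or Storm dependency. This is only an illustrative model: MiniBolt, MiniTimestampedCollector, and MiniBoltWrapper are hypothetical stand-ins invented for this sketch, not the real IRichBolt, TimestampedCollector, or BoltWrapper classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for a Storm bolt; NOT the real IRichBolt interface. */
interface MiniBolt {
    void prepare(Consumer<String> collector);
    void execute(String input);
}

/** Hypothetical stand-in for TimestampedCollector: each emitted value gets the current timestamp. */
final class MiniTimestampedCollector implements Consumer<String> {
    private final List<String> downstream;
    private long timestamp;

    MiniTimestampedCollector(List<String> downstream) {
        this.downstream = downstream;
    }

    void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public void accept(String value) {
        // Only the value changes per emit; the timestamp stays as last set.
        downstream.add(value + "@" + timestamp);
    }
}

/** Mirrors the call order described above: open() -> prepare(), processElement() -> execute(). */
final class MiniBoltWrapper {
    private final MiniBolt bolt;
    private final MiniTimestampedCollector collector;

    MiniBoltWrapper(MiniBolt bolt, List<String> downstream) {
        this.bolt = bolt;
        this.collector = new MiniTimestampedCollector(downstream);
    }

    void open() {
        bolt.prepare(collector);
    }

    void processElement(String value, long timestamp) {
        collector.setTimestamp(timestamp);
        bolt.execute(value);
    }

    /** Runs a word-splitting bolt over two timestamped inputs and returns what reached the downstream list. */
    static List<String> demo() {
        List<String> downstream = new ArrayList<>();
        MiniBolt splitter = new MiniBolt() {
            private Consumer<String> out;

            @Override
            public void prepare(Consumer<String> collector) {
                this.out = collector;
            }

            @Override
            public void execute(String input) {
                for (String word : input.split(" ")) {
                    out.accept(word);
                }
            }
        };
        MiniBoltWrapper wrapper = new MiniBoltWrapper(splitter, downstream);
        wrapper.open();
        wrapper.processElement("a b", 10L);
        wrapper.processElement("c", 20L);
        return downstream;
    }
}
```

In this model both words produced from the first input share timestamp 10, which is the same effect the article attributes to StreamRecord.replace keeping the stored timestamp.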

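The split handling described for tansformAndEmit in the article above can also be modeled in a few lines. The sketch assumes that split mode is switched on when more than one output stream is declared; Tagged is a hypothetical stand-in for SplitStreamType, and SplitEmitSketch is not part of flink-storm.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitEmitSketch {
    /** Hypothetical stand-in for SplitStreamType: a value tagged with its stream id. */
    static final class Tagged {
        final String streamId;
        final Object value;

        Tagged(String streamId, Object value) {
            this.streamId = streamId;
            this.value = value;
        }

        @Override
        public String toString() {
            return streamId + ":" + value;
        }
    }

    private final boolean split;
    final List<Object> emitted = new ArrayList<>();

    SplitEmitSketch(int declaredStreams) {
        // Assumption for this sketch: more than one declared stream enables split mode.
        this.split = declaredStreams > 1;
    }

    /** In split mode the value is wrapped with its stream id; otherwise it is emitted as-is. */
    void transformAndEmit(String streamId, Object value) {
        emitted.add(split ? new Tagged(streamId, value) : value);
    }
}
```

A downstream consumer of the split variant can route on streamId, while the single-stream variant sees the bare value.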
November 25, 2018 · 9 min · jiezi

A Look at Flink's SpoutWrapper

Preface

This article takes a look at Flink's SpoutWrapper.

SpoutWrapper

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java

    /**
     * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
     * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
     * {@link SpoutCollector} for supported types).<br>
     * <br>
     * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method in
     * an infinite loop.<br>
     * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of
     * times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can
     * be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper}
     * terminates if no record was emitted to the output collector for the first time during a call to
     * {@link IRichSpout#nextTuple() nextTuple()}.<br>
     * If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or
     * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
     * {@link FiniteSpout#reachedEnd()} returns true.
     */
    public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
        //......

        /** The number of {@link IRichSpout#nextTuple()} calls. */
        private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop

        /**
         * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
         * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to
         * {@link Tuple25} depending on the spout's declared number of attributes.
         *
         * @param spout
         *            The {@link IRichSpout spout} to be used.
         * @param numberOfInvocations
         *            The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
         *            terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
         *            disabled.
         * @throws IllegalArgumentException
         *             If the number of declared output attributes is not with range [0;25].
         */
        public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations)
                throws IllegalArgumentException {
            this(spout, (Collection<String>) null, numberOfInvocations);
        }

        /**
         * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
         * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to
         * {@link Tuple25} depending on the spout's declared number of attributes.
         *
         * @param spout
         *            The {@link IRichSpout spout} to be used.
         * @throws IllegalArgumentException
         *             If the number of declared output attributes is not with range [0;25].
         */
        public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
            this(spout, (Collection<String>) null, null);
        }

        @Override
        public final void run(final SourceContext<OUT> ctx) throws Exception {
            final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
                    .getGlobalJobParameters();
            StormConfig stormConfig = new StormConfig();
            if (config != null) {
                if (config instanceof StormConfig) {
                    stormConfig = (StormConfig) config;
                } else {
                    stormConfig.putAll(config.toMap());
                }
            }

            final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
                    (StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
                    this.stormTopology, stormConfig);

            SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
                    stormTopologyContext.getThisTaskId(), ctx);

            this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
            this.spout.activate();

            if (numberOfInvocations == null) {
                if (this.spout instanceof FiniteSpout) {
                    final FiniteSpout finiteSpout = (FiniteSpout) this.spout;

                    while (this.isRunning && !finiteSpout.reachedEnd()) {
                        finiteSpout.nextTuple();
                    }
                } else {
                    while (this.isRunning) {
                        this.spout.nextTuple();
                    }
                }
            } else {
                int counter = this.numberOfInvocations;
                if (counter >= 0) {
                    while ((--counter >= 0) && this.isRunning) {
                        this.spout.nextTuple();
                    }
                } else {
                    do {
                        collector.tupleEmitted = false;
                        this.spout.nextTuple();
                    } while (collector.tupleEmitted && this.isRunning);
                }
            }
        }

        /**
         * {@inheritDoc}
         *
         * <p>Sets the {@link #isRunning} flag to {@code false}.
         */
        @Override
        public void cancel() {
            this.isRunning = false;
        }

        /**
         * {@inheritDoc}
         *
         * <p>Sets the {@link #isRunning} flag to {@code false}.
         */
        @Override
        public void stop() {
            this.isRunning = false;
        }

        @Override
        public void close() throws Exception {
            this.spout.close();
        }
    }

- SpoutWrapper extends RichParallelSourceFunction and implements the stop method of the StoppableFunction interface.
- SpoutWrapper.run creates Flink's SpoutCollector and passes it to the constructor of Storm's SpoutOutputCollector. It then calls the spout's open method with that SpoutOutputCollector, so that everything the spout emits is collected by the wrapped SpoutCollector.
- After that it calls spout.nextTuple() according to numberOfInvocations, which controls how many times nextTuple is invoked. The value can be set through the SpoutWrapper constructor; with the constructor that has no numberOfInvocations parameter it is null, which means an infinite loop.
- Flink also provides the FiniteSpout interface, whose reachedEnd method reports whether all data has been sent, so a Storm spout can be turned into a finite one. With a plain Storm spout and a null numberOfInvocations, nextTuple is simply called in a loop until the wrapper is cancelled or stopped.
- If numberOfInvocations is set and is greater than or equal to 0, nextTuple is called that many times; if it is negative, the loop ends the first time a nextTuple call leaves collector.tupleEmitted false.

SpoutCollector

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutCollector.java

    /**
     * A {@link SpoutCollector} is used by {@link SpoutWrapper} to provided an Storm
     * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
     * Flink tuples and emits them via the provide {@link SourceContext} object.
     */
    class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {

        /** The Flink source context object.
         */
        private final SourceContext<OUT> flinkContext;

        /**
         * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the
         * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
         * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
         *
         * @param numberOfAttributes
         *            The number of attributes of the emitted tuples.
         * @param taskId
         *            The ID of the producer task (negative value for unknown).
         * @param flinkContext
         *            The Flink source context to be used.
         * @throws UnsupportedOperationException
         *             if the specified number of attributes is greater than 25
         */
        SpoutCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
                final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
            super(numberOfAttributes, taskId);
            assert (flinkContext != null);
            this.flinkContext = flinkContext;
        }

        @Override
        protected List<Integer> doEmit(final OUT flinkTuple) {
            this.flinkContext.collect(flinkTuple);
            // TODO
            return null;
        }

        @Override
        public void reportError(final Throwable error) {
            // not sure, if Flink can support this
        }

        @Override
        public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
            return this.tansformAndEmit(streamId, tuple);
        }

        @Override
        public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
            throw new UnsupportedOperationException("Direct emit is not supported by Flink");
        }

        public long getPendingCount() {
            return 0;
        }
    }

- SpoutCollector implements Storm's ISpoutOutputCollector interface with its emit, emitDirect, getPendingCount, and reportError methods. Flink currently does not support emitDirect, getPendingCount always returns 0, and reportError is a no-op.
- doEmit calls flinkContext.collect(flinkTuple) to emit the data; it is protected and exists mainly to be called by tansformAndEmit.
- tansformAndEmit is provided by the parent class AbstractStormCollector.

AbstractStormCollector.tansformAndEmit

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java

    /**
     * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
     * to the specified output stream.
     *
     * @param The
     *            The output stream id.
     * @param tuple
     *            The Storm tuple to be emitted.
     * @return the return value of {@link #doEmit(Object)}
     */
    @SuppressWarnings("unchecked")
    protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
        List<Integer> taskIds;
        int numAtt = this.numberOfAttributes.get(streamId);
        int taskIdIdx = numAtt;
        if (this.taskId >= 0 && numAtt < 0) {
            numAtt = 1;
            taskIdIdx = 0;
        }
        if (numAtt >= 0) {
            assert (tuple.size() == numAtt);
            Tuple out = this.outputTuple.get(streamId);
            for (int i = 0; i < numAtt; ++i) {
                out.setField(tuple.get(i), i);
            }
            if (this.taskId >= 0) {
                out.setField(this.taskId, taskIdIdx);
            }
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = out;
                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) out);
            }
        } else {
            assert (tuple.size() == 1);
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = tuple.get(0);
                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) tuple.get(0));
            }
        }
        this.tupleEmitted = true;
        return taskIds;
    }

- AbstractStormCollector.tansformAndEmit mainly handles the split case, where one spout declares several streams. Either way the data is finally emitted through the subclass method SpoutCollector.doEmit.
- If split is true, doEmit receives splitTuple, a SplitStreamType that records the streamId together with its value.
- If split is false, doEmit receives a plain Tuple, which corresponds to the value inside SplitStreamType without the streamId information.

Task.run

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

    /**
     * The Task represents one execution of a parallel subtask on a TaskManager.
     * A Task wraps a Flink operator (which may be a user function) and
     * runs it, providing all services necessary for example to consume input data,
     * produce its results (intermediate result partitions) and communicate
     * with the JobManager.
     *
     * <p>The Flink operators (implemented as subclasses of
     * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
     * The task connects those to the network stack and actor messages, and tracks the state
     * of the execution and handles exceptions.
     *
     * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
     * are the first attempt to execute the task, or a repeated attempt. All of that
     * is only known to the JobManager. All the task knows are its own runnable code,
     * the task's configuration, and the IDs of the intermediate results to consume and
     * produce (if any).
     *
     * <p>Each Task is run by one dedicated thread.
     */
    public class Task implements Runnable, TaskActions, CheckpointListener {
        //......

        /**
         * The core work method that bootstraps the task and executes its code.
         */
        @Override
        public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();
            //......
        }
    }

- Task.run calls invokable.invoke(); the invokable here is a StreamTask.

StreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

    /**
     * Base class for all streaming tasks. A task is the unit of local processing that is deployed
     * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
     * the Task's operator chain. Operators that are chained together execute synchronously in the
     * same thread and hence on the same stream partition. A common case for these chains
     * are successive map/flatmap/filter tasks.
     *
     * <p>The task chain contains one "head" operator and multiple chained operators.
     * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
     * as well as for sources, iteration heads and iteration tails.
     *
     * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
     * produced by the operators at the ends of the operator chain. Note that the chain may fork and
     * thus have multiple ends.
     *
     * <p>The life cycle of the task is set up as follows:
     * <pre>{@code
     *  -- setInitialState -> provides state of all operators in the chain
     *
     *  -- invoke()
     *        |
     *        +----> Create basic utils (config, etc) and load the chain of operators
     *        +----> operators.setup()
     *        +----> task specific init()
     *        +----> initialize-operator-states()
     *        +----> open-operators()
     *        +----> run()
     *        +----> close-operators()
     *        +----> dispose-operators()
     *        +----> common cleanup
     *        +----> task specific cleanup()
     * }</pre>
     *
     * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
     * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
     * are called concurrently.
     *
     * @param <OUT>
     * @param <OP>
     */
    @Internal
    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {
        //......

        @Override
        public final void invoke() throws Exception {
            boolean disposed = false;
            try {
                //......
                // let the task do its work
                isRunning = true;
                run();

                // if this left the run() method cleanly despite the fact that this was canceled,
                // make sure the "clean shutdown" is not attempted
                if (canceled) {
                    throw new CancelTaskException();
                }
                LOG.debug("Finished task {}", getName());
                //......
            } finally {
                // clean up everything we initialized
                isRunning = false;
                //......
            }
        }
    }

- StreamTask.invoke calls the run method of its subclass, which here is StoppableSourceStreamTask.

StoppableSourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java

    /**
     * Stoppable task for executing stoppable streaming sources.
     *
     * @param <OUT> Type of the produced elements
     * @param <SRC> Stoppable source function
     */
    public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
            extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {

        private volatile boolean stopped;

        public StoppableSourceStreamTask(Environment environment) {
            super(environment);
        }

        @Override
        protected void run() throws Exception {
            if (!stopped) {
                super.run();
            }
        }

        @Override
        public void stop() {
            stopped = true;
            if (this.headOperator != null) {
                this.headOperator.stop();
            }
        }
    }

- StoppableSourceStreamTask extends SourceStreamTask and mainly implements the stop method of StoppableTask. Its own run method only checks the stopped flag and then delegates to the run method of its direct parent, SourceStreamTask.

SourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

    /**
     * {@link StreamTask} for executing a {@link StreamSource}.
     *
     * <p>One important aspect of this is that the checkpointing and the emission of elements must never
     * occur at the same time. The execution must be serial. This is achieved by having the contract
     * with the StreamFunction that it must only modify its state or emit elements in
     * a synchronized block that locks on the lock Object. Also, the modification of the state
     * and the emission of elements must happen in the same block of code that is protected by the
     * synchronized block.
     *
     * @param <OUT> Type of the output elements of this source.
     * @param <SRC> Type of the source function for the stream source operator
     * @param <OP> Type of the stream source operator
     */
    @Internal
    public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
            extends StreamTask<OUT, OP> {
        //......

        @Override
        protected void run() throws Exception {
            headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
        }
    }

- SourceStreamTask mainly calls the run method of StreamSource.

StreamSource

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

    /**
     * {@link StreamOperator} for streaming sources.
     *
     * @param <OUT> Type of the output elements
     * @param <SRC> Type of the source function of this stream source operator
     */
    @Internal
    public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
            extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
        //......

        public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
            run(lockingObject, streamStatusMaintainer, output);
        }

        public void run(final Object lockingObject,
                final StreamStatusMaintainer streamStatusMaintainer,
                final Output<StreamRecord<OUT>> collector) throws Exception {

            final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

            final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
            final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured() ?
                    getExecutionConfig().getLatencyTrackingInterval() :
                    configuration.getLong(MetricOptions.LATENCY_INTERVAL);

            LatencyMarksEmitter<OUT> latencyEmitter = null;
            if (latencyTrackingInterval > 0) {
                latencyEmitter = new LatencyMarksEmitter<>(
                        getProcessingTimeService(),
                        collector,
                        latencyTrackingInterval,
                        this.getOperatorID(),
                        getRuntimeContext().getIndexOfThisSubtask());
            }

            final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

            this.ctx = StreamSourceContexts.getSourceContext(
                    timeCharacteristic,
                    getProcessingTimeService(),
                    lockingObject,
                    streamStatusMaintainer,
                    collector,
                    watermarkInterval,
                    -1);

            try {
                userFunction.run(ctx);

                // if we get here, then the user function either exited after being done (finite source)
                // or the function was canceled or stopped. For the finite source case, we should emit
                // a final watermark that indicates that we reached the end of event-time
                if (!isCanceledOrStopped()) {
                    ctx.emitWatermark(Watermark.MAX_WATERMARK);
                }
            } finally {
                // make sure that the context is closed in any case
                ctx.close();
                if (latencyEmitter != null) {
                    latencyEmitter.close();
                }
            }
        }

- It calls userFunction.run(ctx); the userFunction here is the SpoutWrapper, and this is what triggers the spout's nextTuple calls.

Summary

- Flink uses SpoutWrapper to wrap an ordinary Storm spout. In its run method it creates Flink's SpoutCollector, passes it to the constructor of Storm's SpoutOutputCollector, and then calls the spout's open method with that SpoutOutputCollector so that the wrapped SpoutCollector collects everything the spout emits. After that it calls spout.nextTuple() according to numberOfInvocations, which controls how many times nextTuple is invoked. The value can be set through the SpoutWrapper constructor; with the constructor that has no numberOfInvocations parameter it is null, which means an infinite loop.
- SpoutCollector.emit calls AbstractStormCollector.tansformAndEmit, which ends by calling SpoutCollector.doEmit. When several streams are declared, doEmit receives a tuple wrapped in SplitStreamType; with a single stream it receives the plain tuple.
- Task.run calls StreamTask.invoke, which calls run on the subclass (StoppableSourceStreamTask here). That run method delegates to its direct parent SourceStreamTask, which mainly calls StreamSource.run, and StreamSource.run calls userFunction.run(ctx). The userFunction here is the SpoutWrapper, so this is where the spout's nextTuple logic runs and the results are emitted through Flink's SpoutCollector.

doc

Storm Compatibility Beta ...
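The three numberOfInvocations modes summarized above (null, non-negative, negative) can be modeled in isolation. The following is a minimal sketch under stated assumptions: QueueSpout is a hypothetical spout that emits one queued item per call, the isRunning flag of the real wrapper is left out, and a safetyCap parameter is added so that the otherwise infinite null mode terminates in the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class InvocationLoopSketch {
    /** Hypothetical stand-in for a spout: emits one queued item per nextTuple() call. */
    static final class QueueSpout {
        private final Deque<String> pending = new ArrayDeque<>();
        boolean tupleEmitted;
        int calls;

        QueueSpout(String... items) {
            for (String item : items) {
                pending.add(item);
            }
        }

        void nextTuple() {
            calls++;
            if (!pending.isEmpty()) {
                pending.poll();
                tupleEmitted = true;
            }
        }
    }

    /** Returns how many times nextTuple() was called for the given numberOfInvocations. */
    static int run(QueueSpout spout, Integer numberOfInvocations, int safetyCap) {
        if (numberOfInvocations == null) {
            // Infinite loop in the wrapper described above; capped here (assumption) so the sketch ends.
            while (spout.calls < safetyCap) {
                spout.nextTuple();
            }
        } else {
            int counter = numberOfInvocations;
            if (counter >= 0) {
                // Fixed number of calls.
                while (--counter >= 0) {
                    spout.nextTuple();
                }
            } else {
                // Negative value: stop after the first call that emits nothing.
                do {
                    spout.tupleEmitted = false;
                    spout.nextTuple();
                } while (spout.tupleEmitted);
            }
        }
        return spout.calls;
    }
}
```

With three queued items, a negative numberOfInvocations results in four calls: three that emit and one final call that emits nothing and ends the loop.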

November 24, 2018 · 11 min · jiezi

Notes on how Flink stays compatible with StormTopology

Preface

This article looks at how Flink stays compatible with StormTopology.

Example

    @Test
    public void testStormWordCount() throws Exception {
        //NOTE 1 build Topology the Storm way
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomWordSpout(), 1);
        builder.setBolt("count", new WordCountBolt(), 5)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");

        //NOTE 2 convert StormTopology to FlinkTopology
        FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

        //NOTE 3 execute program locally using FlinkLocalCluster
        Config conf = new Config();
        // only required to stabilize integration test
        conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);

        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("stormWordCount", conf, flinkTopology);
        cluster.shutdown();
    }

FlinkLocalCluster.getLocalCluster() is used to create or obtain a FlinkLocalCluster, FlinkLocalCluster.submitTopology then submits the topology, and FlinkLocalCluster.shutdown closes the cluster at the end.

The RandomWordSpout built here extends Storm's BaseRichSpout, WordCountBolt extends Storm's BaseBasicBolt, and PrintBolt extends Storm's BaseRichBolt. (Flink relies on its checkpoint mechanism and does not translate Storm's ack operations, so it makes no particular difference whether a bolt uses BaseBasicBolt or BaseRichBolt.)

The topology passed to FlinkLocalCluster.submitTopology is the FlinkTopology produced by converting the StormTopology.

LocalClusterFactory

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java

    // ------------------------------------------------------------------------
    //  Access to default local cluster
    // ------------------------------------------------------------------------

    // A different {@link FlinkLocalCluster} to be used for execution of ITCases
    private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

    /**
     * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
     * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
     *
     * @return a {@link FlinkLocalCluster} to be used for execution
     */
    public static FlinkLocalCluster getLocalCluster() {
        return currentFactory.createLocalCluster();
    }

    /**
     * Sets a different factory for FlinkLocalClusters to be used for execution.
     *
     * @param clusterFactory
     *      The LocalClusterFactory to create the local clusters for execution.
     */
    public static void initialize(LocalClusterFactory clusterFactory) {
        currentFactory = Objects.requireNonNull(clusterFactory);
    }

    // ------------------------------------------------------------------------
    //  Cluster factory
    // ------------------------------------------------------------------------

    /**
     * A factory that creates local clusters.
     */
    public interface LocalClusterFactory {

        /**
         * Creates a local Flink cluster.
         * @return A local Flink cluster.
         */
        FlinkLocalCluster createLocalCluster();
    }

    /**
     * A factory that instantiates a FlinkLocalCluster.
     */
    public static class DefaultLocalClusterFactory implements LocalClusterFactory {

        @Override
        public FlinkLocalCluster createLocalCluster() {
            return new FlinkLocalCluster();
        }
    }

FlinkLocalCluster provides the static method getLocalCluster for obtaining a FlinkLocalCluster; the instance is created through a LocalClusterFactory.

The LocalClusterFactory used here is the DefaultLocalClusterFactory implementation, whose createLocalCluster method simply does new FlinkLocalCluster().

With the current implementation, every call to FlinkLocalCluster.getLocalCluster creates a new FlinkLocalCluster. Callers should keep this in mind: keep a reference to the returned cluster instead of calling getLocalCluster again.

FlinkTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

    /**
     * Creates a Flink program that uses the specified spouts and bolts.
     * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
     * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
     */
    public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
        return new FlinkTopology(stormBuilder);
    }

    private FlinkTopology(TopologyBuilder builder) {
        this.builder = builder;
        this.stormTopology = builder.createTopology();
        // extract the spouts and bolts
        this.spouts = getPrivateField("_spouts");
        this.bolts = getPrivateField("_bolts");

        this.env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kick off the translation immediately
        translateTopology();
    }

FlinkTopology provides the static factory method createTopology for creating a FlinkTopology.

The constructor first stores the TopologyBuilder. It then uses getPrivateField, which goes through reflection (getDeclaredField), to read the private fields _spouts and _bolts and keeps them for the later translation of the topology.

After that it obtains the StreamExecutionEnvironment and finally calls translateTopology to translate the whole StormTopology.

translateTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

    /**
     * Creates a Flink program that uses the specified spouts and bolts.
     */
    private void translateTopology() {

        unprocessdInputsPerBolt.clear();
        outputStreams.clear();
        declarers.clear();
        availableInputs.clear();

        // Storm defaults to parallelism 1
        env.setParallelism(1);

        /* Translation of topology */

        for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
            final String spoutId = spout.getKey();
            final IRichSpout userSpout = spout.getValue();

            final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
            userSpout.declareOutputFields(declarer);
            final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
            this.outputStreams.put(spoutId, sourceStreams);
            declarers.put(spoutId, declarer);

            final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
            final DataStreamSource<?> source;

            if (sourceStreams.size() == 1) {
                final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
                spoutWrapperSingleOutput.setStormTopology(stormTopology);

                final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];

                DataStreamSource<Tuple> src =
env.addSource(spoutWrapperSingleOutput, spoutId, declarer.getOutputType(outputStreamId)); outputStreams.put(outputStreamId, src); source = src; } else { final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>( userSpout, spoutId, null, null); spoutWrapperMultipleOutputs.setStormTopology(stormTopology); @SuppressWarnings({ “unchecked”, “rawtypes” }) DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource( spoutWrapperMultipleOutputs, spoutId, (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class)); SplitStream<SplitStreamType<Tuple>> splitSource = multiSource .split(new StormStreamSelector<Tuple>()); for (String streamId : sourceStreams.keySet()) { SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId) .map(new SplitStreamMapper<Tuple>()); outStream.getTransformation().setOutputType(declarer.getOutputType(streamId)); outputStreams.put(streamId, outStream); } source = multiSource; } availableInputs.put(spoutId, outputStreams); final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common(); if (common.is_set_parallelism_hint()) { int dop = common.get_parallelism_hint(); source.setParallelism(dop); } else { common.set_parallelism_hint(1); } } /* * 1. Connect all spout streams with bolts streams * 2. Then proceed with the bolts stream already connected * * <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before * its producer * ->thus, we might need to repeat multiple times / boolean makeProgress = true; while (bolts.size() > 0) { if (!makeProgress) { StringBuilder strBld = new StringBuilder(); strBld.append(“Unable to build Topology. 
Could not connect the following bolts:”); for (String boltId : bolts.keySet()) { strBld.append("\n “); strBld.append(boltId); strBld.append(”: missing input streams ["); for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt .get(boltId)) { strBld.append("’"); strBld.append(streams.getKey().get_streamId()); strBld.append("’ from ‘"); strBld.append(streams.getKey().get_componentId()); strBld.append("’; “); } strBld.append(”]"); } throw new RuntimeException(strBld.toString()); } makeProgress = false; final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator(); while (boltsIterator.hasNext()) { final Entry<String, IRichBolt> bolt = boltsIterator.next(); final String boltId = bolt.getKey(); final IRichBolt userBolt = copyObject(bolt.getValue()); final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common(); Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId); if (unprocessedBoltInputs == null) { unprocessedBoltInputs = new HashSet<>(); unprocessedBoltInputs.addAll(common.get_inputs().entrySet()); unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs); } // check if all inputs are available final int numberOfInputs = unprocessedBoltInputs.size(); int inputsAvailable = 0; for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) { final String producerId = entry.getKey().get_componentId(); final String streamId = entry.getKey().get_streamId(); final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId); if (streams != null && streams.get(streamId) != null) { inputsAvailable++; } } if (inputsAvailable != numberOfInputs) { // traverse other bolts first until inputs are available continue; } else { makeProgress = true; boltsIterator.remove(); } final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs); for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) { final GlobalStreamId streamId = 
                            input.getKey();
                    final Grouping grouping = input.getValue();
                    final String producerId = streamId.get_componentId();
                    final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);

                    inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
                }

                final SingleOutputStreamOperator<?> outputStream = createOutput(boltId, userBolt, inputStreams);

                if (common.is_set_parallelism_hint()) {
                    int dop = common.get_parallelism_hint();
                    outputStream.setParallelism(dop);
                } else {
                    common.set_parallelism_hint(1);
                }
            }
        }
    }

The translation handles spouts first and bolts second. The spouts and bolts it works from were read in the constructor, via reflection, from Storm's TopologyBuilder object.

Flink uses FlinkOutputFieldsDeclarer (which implements Storm's OutputFieldsDeclarer interface) to hold the declareOutputFields information configured in Storm's IRichSpout and IRichBolt. Note that Flink does not support direct emit. Here userSpout.declareOutputFields copies the original spout's declarations into the FlinkOutputFieldsDeclarer.

Flink wraps the spout in a SpoutWrapper, turning it into a RichParallelSourceFunction, and treats the spout differently depending on whether it declares more than one output stream. The RichParallelSourceFunction is then passed to StreamExecutionEnvironment.addSource to create a Flink DataStreamSource, which is added to availableInputs, and the DataStreamSource's parallelism is set from the spout's parallelism hint.

For bolts, the method maintains unprocessdInputsPerBolt, whose key is the boltId and whose value is the set of GlobalStreamIds the bolt connects to together with their Grouping. Because the bolts are iterated from a map, they may be visited out of order. If every GlobalStreamId a bolt connects to is already in availableInputs, the bolt is translated and removed from bolts. Otherwise it is skipped, stays in bolts, and the next bolt is tried. Since the outer loop runs while bolts.size() > 0, skipped bolts are retried on the next pass, which is how the out-of-order case is handled; if a whole pass makes no progress, a RuntimeException listing the missing input streams is thrown.

An important method in the bolt translation is processInput. It turns the bolt's grouping into the corresponding operation on the upstream producer's DataStream: shuffleGrouping becomes rebalance, fieldsGrouping becomes keyBy, globalGrouping becomes global, and allGrouping becomes broadcast. createOutput is then called to translate the bolt's execution logic: it uses BoltWrapper or MergedInputsBoltWrapper to turn the bolt into a Flink OneInputStreamOperator and passes it to the stream's transform operation, which returns a Flink SingleOutputStreamOperator. That SingleOutputStreamOperator is added to availableInputs, and its parallelism is set from the bolt's parallelism hint.

FlinkLocalCluster

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/api/FlinkLocalCluster.java

/**
 * {@link
FlinkLocalCluster} mimics a Storm {@link LocalCluster}. /public class FlinkLocalCluster { /* The log used by this mini cluster. / private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class); /* The Flink mini cluster on which to execute the programs. / private FlinkMiniCluster flink; /* Configuration key to submit topology in blocking mode if flag is set to {@code true}. */ public static final String SUBMIT_BLOCKING = “SUBMIT_STORM_TOPOLOGY_BLOCKING”; public FlinkLocalCluster() { } public FlinkLocalCluster(FlinkMiniCluster flink) { this.flink = Objects.requireNonNull(flink); } @SuppressWarnings(“rawtypes”) public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology) throws Exception { this.submitTopologyWithOpts(topologyName, conf, topology, null); } @SuppressWarnings(“rawtypes”) public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception { LOG.info(“Running Storm topology on FlinkLocalCluster”); boolean submitBlocking = false; if (conf != null) { Object blockingFlag = conf.get(SUBMIT_BLOCKING); if (blockingFlag instanceof Boolean) { submitBlocking = ((Boolean) blockingFlag).booleanValue(); } } FlinkClient.addStormConfigToTopology(topology, conf); StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph(); streamGraph.setJobName(topologyName); JobGraph jobGraph = streamGraph.getJobGraph(); if (this.flink == null) { Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, “0”); configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); this.flink = new LocalFlinkMiniCluster(configuration, true); this.flink.start(); } if (submitBlocking) { this.flink.submitJobAndWait(jobGraph, false); } else { this.flink.submitJobDetached(jobGraph); } } public 
    void killTopology(final String topologyName) {
        this.killTopologyWithOpts(topologyName, null);
    }

    public void killTopologyWithOpts(final String name, final KillOptions options) {
    }

    public void activate(final String topologyName) {
    }

    public void deactivate(final String topologyName) {
    }

    public void rebalance(final String name, final RebalanceOptions options) {
    }

    public void shutdown() {
        if (this.flink != null) {
            this.flink.stop();
            this.flink = null;
        }
    }

    //……
}

FlinkLocalCluster's submitTopology method calls submitTopologyWithOpts. The latter mainly sets a few parameters, calls topology.getExecutionEnvironment().getStreamGraph() to build the StreamGraph from the transformations, gets the JobGraph from it, creates and starts a LocalFlinkMiniCluster, and finally submits the JobGraph with LocalFlinkMiniCluster's submitJobAndWait or submitJobDetached.

Summary

Through FlinkTopology, Flink offers a degree of compatibility with Storm, which is very helpful when migrating from Storm to Flink.

Running a Storm topology on Flink takes a few steps: build the TopologyBuilder the native Storm way, convert the StormTopology into a FlinkTopology with FlinkTopology.createTopology(builder), and submit the FlinkTopology through the submitTopology method of FlinkLocalCluster (local mode) or FlinkSubmitter (remote submission).

FlinkTopology is the core of Flink's Storm compatibility. It translates the StormTopology into the corresponding Flink structures: it uses SpoutWrapper to turn a spout into a RichParallelSourceFunction and adds it to the StreamExecutionEnvironment to create a DataStream; it turns each bolt's grouping into the corresponding operation on the upstream DataStream (shuffleGrouping becomes rebalance, fieldsGrouping becomes keyBy, globalGrouping becomes global, allGrouping becomes broadcast); and it uses BoltWrapper or MergedInputsBoltWrapper to turn a bolt into a Flink OneInputStreamOperator, which is passed to the stream's transform operation.

Once the FlinkTopology is built, it is submitted for local execution with FlinkLocalCluster or for remote execution with FlinkSubmitter.

FlinkLocalCluster's submitTopology method mainly generates the StreamGraph from the StreamExecutionEnvironment that FlinkTopology worked on, gets the JobGraph from it, creates and starts a LocalFlinkMiniCluster, and finally submits the JobGraph through that LocalFlinkMiniCluster.

doc

Storm Compatibility Beta ...
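The way translateTopology copes with bolts being visited out of order (repeat passes over the remaining bolts, fail when a pass makes no progress) can be sketched on its own with the standard library. This is a simplified illustration and not the FlinkTopology code: BoltOrderSketch and translationOrder are hypothetical names, and inputs are tracked per producer component ID, whereas the original tracks each GlobalStreamId (component plus stream).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the makeProgress loop in translateTopology.
final class BoltOrderSketch {

    /**
     * Returns the order in which bolts can be translated, given the already
     * available producers (the spouts) and the producers each bolt reads from.
     */
    static List<String> translationOrder(Set<String> spoutIds, Map<String, Set<String>> inputsPerBolt) {
        Set<String> available = new HashSet<>(spoutIds);
        Map<String, Set<String>> pending = new LinkedHashMap<>(inputsPerBolt);
        List<String> order = new ArrayList<>();

        boolean makeProgress = true;
        while (!pending.isEmpty()) {
            if (!makeProgress) {
                // a full pass translated nothing: some producer does not exist
                throw new IllegalStateException("Could not connect the following bolts: " + pending.keySet());
            }
            makeProgress = false;

            Iterator<Map.Entry<String, Set<String>>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Set<String>> bolt = it.next();
                if (!available.containsAll(bolt.getValue())) {
                    continue; // try other bolts first, revisit this one on the next pass
                }
                makeProgress = true;
                it.remove();
                available.add(bolt.getKey()); // its output can now feed downstream bolts
                order.add(bolt.getKey());
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> bolts = new LinkedHashMap<>();
        // "print" is deliberately listed before its producer "count"
        bolts.put("print", new HashSet<>(Arrays.asList("count")));
        bolts.put("count", new HashSet<>(Arrays.asList("spout")));
        System.out.println(translationOrder(new HashSet<>(Arrays.asList("spout")), bolts));
    }
}
```

With the word-count layout from the example at the top of the article, "print" is skipped on the first pass because "count" has not been translated yet, and is picked up on the second pass.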

November 23, 2018 · 6 min · jiezi

Notes on Flink's log.file configuration

Preface

This article looks at Flink's log.file configuration.

log4j.properties

flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties

    # This affects logging for both user code and Flink
    log4j.rootLogger=INFO, file

    # Uncomment this if you want to only change Flink's logging
    #log4j.logger.org.apache.flink=INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO

    # Log all infos in the given file
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.append=false
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

The log.file system property is used here to configure log4j.appender.file.file.

MiniCluster

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java

    /**
     * Starts the mini cluster, based on the configured properties.
     *
     * @throws Exception This method passes on any exception that occurs during the startup of
     *                   the mini cluster.
/ public void start() throws Exception { synchronized (lock) { checkState(!running, “FlinkMiniCluster is already running”); LOG.info(“Starting Flink Mini Cluster”); LOG.debug(“Using configuration {}”, miniClusterConfiguration); final Configuration configuration = miniClusterConfiguration.getConfiguration(); final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout(); final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED; try { initializeIOFormatClasses(configuration); LOG.info(“Starting Metrics Registry”); metricRegistry = createMetricRegistry(configuration); this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( metricRegistry, “localhost”); final RpcService jobManagerRpcService; final RpcService resourceManagerRpcService; final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers]; // bring up all the RPC services LOG.info(“Starting RPC Service(s)”); // we always need the ‘commonRpcService’ for auxiliary calls commonRpcService = createRpcService(configuration, rpcTimeout, false, null); // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); metricRegistry.startQueryService(actorSystem, null); if (useSingleRpcService) { for (int i = 0; i < numTaskManagers; i++) { taskManagerRpcServices[i] = commonRpcService; } jobManagerRpcService = commonRpcService; resourceManagerRpcService = commonRpcService; this.resourceManagerRpcService = null; this.jobManagerRpcService = null; this.taskManagerRpcServices = null; } else { // start a new service per component, possibly with custom bind addresses final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress(); final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress(); final String 
resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress(); jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress); resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress); for (int i = 0; i < numTaskManagers; i++) { taskManagerRpcServices[i] = createRpcService( configuration, rpcTimeout, true, taskManagerBindAddress); } this.jobManagerRpcService = jobManagerRpcService; this.taskManagerRpcServices = taskManagerRpcServices; this.resourceManagerRpcService = resourceManagerRpcService; } // create the high-availability services LOG.info(“Starting high-availability services”); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( configuration, commonRpcService.getExecutor()); blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); heartbeatServices = HeartbeatServices.fromConfiguration(configuration); // bring up the ResourceManager(s) LOG.info(“Starting ResourceManger”); resourceManagerRunner = startResourceManager( configuration, haServices, heartbeatServices, metricRegistry, resourceManagerRpcService, new ClusterInformation(“localhost”, blobServer.getPort()), jobManagerMetricGroup); blobCacheService = new BlobCacheService( configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()) ); // bring up the TaskManager(s) for the mini cluster LOG.info(“Starting {} TaskManger(s)”, numTaskManagers); taskManagers = startTaskManagers( configuration, haServices, heartbeatServices, metricRegistry, blobCacheService, numTaskManagers, taskManagerRpcServices); // starting the dispatcher rest endpoint LOG.info(“Starting dispatcher rest endpoint.”); dispatcherGatewayRetriever = new RpcGatewayRetriever<>( jobManagerRpcService, DispatcherGateway.class, DispatcherId::fromUuid, 20, Time.milliseconds(20L)); final RpcGatewayRetriever<ResourceManagerId, 
ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>( jobManagerRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 20, Time.milliseconds(20L)); this.dispatcherRestEndpoint = new DispatcherRestEndpoint( RestServerEndpointConfiguration.fromConfiguration(configuration), dispatcherGatewayRetriever, configuration, RestHandlerConfiguration.fromConfiguration(configuration), resourceManagerGatewayRetriever, blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), “DispatcherRestEndpoint”), new AkkaQueryServiceRetriever( actorSystem, Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), haServices.getWebMonitorLeaderElectionService(), new ShutDownFatalErrorHandler()); dispatcherRestEndpoint.start(); restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl()); // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info(“Starting job dispatcher(s) for JobManger”); this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, “localhost”); final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint); dispatcher = new StandaloneDispatcher( jobManagerRpcService, Dispatcher.DISPATCHER_NAME + UUID.randomUUID(), configuration, haServices, resourceManagerRunner.getResourceManageGateway(), blobServer, heartbeatServices, jobManagerMetricGroup, metricRegistry.getMetricQueryServicePath(), new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), dispatcherRestEndpoint.getRestBaseUrl(), historyServerArchivist); dispatcher.start(); resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever(); dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever(); 
resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever); dispatcherLeaderRetriever.start(dispatcherGatewayRetriever); } catch (Exception e) { // cleanup everything try { close(); } catch (Exception ee) { e.addSuppressed(ee); } throw e; } // create a new termination future terminationFuture = new CompletableFuture<>(); // now officially mark this as running running = true; LOG.info(“Flink Mini Cluster started successfully”); } }这里先创建了metricRegistry、commonRpcService、jobManagerRpcService、resourceManagerRpcService、haServices、blobServer、heartbeatServices、resourceManagerRunner、blobCacheService、taskManagers、dispatcherGatewayRetriever、dispatcherRestEndpoint、dispatcher、dispatcherLeaderRetrieverRestServerEndpointflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/RestServerEndpoint.java /* * Starts this REST server endpoint. * * @throws Exception if we cannot start the RestServerEndpoint / public final void start() throws Exception { synchronized (lock) { Preconditions.checkState(state == State.CREATED, “The RestServerEndpoint cannot be restarted.”); log.info(“Starting rest endpoint.”); final Router router = new Router(); final CompletableFuture<String> restAddressFuture = new CompletableFuture<>(); List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture); / sort the handlers such that they are ordered the following: * /jobs * /jobs/overview * /jobs/:jobid * /jobs/:jobid/config * /:* / Collections.sort( handlers, RestHandlerUrlComparator.INSTANCE); handlers.forEach(handler -> { log.debug(“Register handler {} under {}@{}.”, handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL()); registerHandler(router, handler); }); ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { RouterHandler handler = new RouterHandler(router, responseHeaders); // SSL should be the first handler in the 
pipeline if (sslEngineFactory != null) { ch.pipeline().addLast(“ssl”, new RedirectingSslHandler(restAddress, restAddressFuture, sslEngineFactory)); } ch.pipeline() .addLast(new HttpServerCodec()) .addLast(new FileUploadHandler(uploadDir)) .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders)) .addLast(new ChunkedWriteHandler()) .addLast(handler.getName(), handler) .addLast(new PipelineErrorHandler(log, responseHeaders)); } }; NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(“flink-rest-server-netty-boss”)); NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory(“flink-rest-server-netty-worker”)); bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(initializer); log.debug(“Binding rest endpoint to {}:{}.”, restBindAddress, restBindPort); final ChannelFuture channel; if (restBindAddress == null) { channel = bootstrap.bind(restBindPort); } else { channel = bootstrap.bind(restBindAddress, restBindPort); } serverChannel = channel.syncUninterruptibly().channel(); final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); final String advertisedAddress; if (bindAddress.getAddress().isAnyLocalAddress()) { advertisedAddress = this.restAddress; } else { advertisedAddress = bindAddress.getAddress().getHostAddress(); } final int port = bindAddress.getPort(); log.info(“Rest endpoint listening at {}:{}”, advertisedAddress, port); final String protocol; if (sslEngineFactory != null) { protocol = “https://”; } else { protocol = “http://”; } restBaseUrl = protocol + advertisedAddress + ‘:’ + port; restAddressFuture.complete(restBaseUrl); state = State.RUNNING; startInternal(); } }这里调用了initializeHandlers来获取ChannelInboundHandler,initializeHandlers在子类DispatcherRestEndpoint中有实现DispatcherRestEndpointflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java 
@Override protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture); // Add the Dispatcher specific handlers final Time timeout = restConfiguration.getTimeout(); JobSubmitHandler jobSubmitHandler = new JobSubmitHandler( restAddressFuture, leaderRetriever, timeout, responseHeaders, executor, clusterConfiguration); if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) { try { webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension( leaderRetriever, restAddressFuture, timeout, responseHeaders, uploadDir, executor, clusterConfiguration); // register extension handlers handlers.addAll(webSubmissionExtension.getHandlers()); } catch (FlinkException e) { if (log.isDebugEnabled()) { log.debug(“Failed to load web based job submission extension.”, e); } else { log.info(“Failed to load web based job submission extension. 
" + “Probable reason: flink-runtime-web is not in the classpath.”); } } } else { log.info(“Web-based job submission is not enabled.”); } handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler)); return handlers; }这里首先调用了父类的initializeHandlers,这里的父类为WebMonitorEndpoint(它是RestServerEndpoint的直接子类,而DispatcherRestEndpoint又继承了WebMonitorEndpoint)WebMonitorEndpointflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @Override protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30); final Time timeout = restConfiguration.getTimeout(); //…… // TODO: Remove once the Yarn proxy can forward all REST verbs handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler)); handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler)); //…… // load the log and stdout file handler for the main cluster component final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration); final ChannelInboundHandler logFileHandler = createStaticFileHandler( restAddressFuture, timeout, logFileLocation.logFile); final ChannelInboundHandler stdoutFileHandler = createStaticFileHandler( restAddressFuture, timeout, logFileLocation.stdOutFile); handlers.add(Tuple2.of(LogFileHandlerSpecification.getInstance(), logFileHandler)); handlers.add(Tuple2.of(StdoutFileHandlerSpecification.getInstance(), stdoutFileHandler)); // TaskManager log and stdout file handler final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval()); final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler( restAddressFuture, 
                leaderRetriever,
                timeout,
                responseHeaders,
                TaskManagerLogFileHeaders.getInstance(),
                resourceManagerRetriever,
                transientBlobService,
                cacheEntryDuration);

        final TaskManagerStdoutFileHandler taskManagerStdoutFileHandler = new TaskManagerStdoutFileHandler(
                restAddressFuture,
                leaderRetriever,
                timeout,
                responseHeaders,
                TaskManagerStdoutFileHeaders.getInstance(),
                resourceManagerRetriever,
                transientBlobService,
                cacheEntryDuration);

        handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
        handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));

        //……
    }

    @Nonnull
    private ChannelInboundHandler createStaticFileHandler(
            CompletableFuture<String> restAddressFuture,
            Time timeout,
            File fileToServe) {

        if (fileToServe == null) {
            return new ConstantTextHandler("(file unavailable)");
        } else {
            try {
                return new StaticFileServerHandler<>(
                    leaderRetriever,
                    restAddressFuture,
                    timeout,
                    fileToServe);
            } catch (IOException e) {
                log.info("Cannot load log file handler.", e);
                return new ConstantTextHandler("(log file unavailable)");
            }
        }
    }

It initializes a series of ChannelInboundHandlers and registers them in handlers.

For the JobManager file handlers, it first calls WebMonitorUtils.LogFileLocation.find(clusterConfiguration) to build logFileLocation, then uses logFileLocation.logFile and logFileLocation.stdOutFile to construct logFileHandler and stdoutFileHandler, which serve downloads of the log file and the stdout file respectively. If a file is null, createStaticFileHandler falls back to a ConstantTextHandler that returns "(file unavailable)".

For the TaskManager file handlers, it constructs a TaskManagerLogFileHandler and a TaskManagerStdoutFileHandler to serve downloads of the log and stdout files.

JobManager FileHandler

WebMonitorUtils.LogFileLocation.find

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java

    /**
     * Singleton to hold the log and stdout file.
     */
    public static class LogFileLocation {

        public final File logFile;
        public final File stdOutFile;

        private LogFileLocation(File logFile, File stdOutFile) {
            this.logFile = logFile;
            this.stdOutFile = stdOutFile;
        }

        /**
         * Finds the Flink log directory using log.file Java property that is set during startup.
         */
        public static LogFileLocation find(Configuration config) {
            final String logEnv = "log.file";
            String logFilePath = System.getProperty(logEnv);

            if (logFilePath == null) {
                LOG.warn("Log file environment variable '{}' is not set.", logEnv);
                logFilePath = config.getString(WebOptions.LOG_PATH);
            }

            // not configured, cannot serve log files
            if (logFilePath == null || logFilePath.length() < 4) {
                LOG.warn("JobManager log files are unavailable in the web dashboard. " +
                    "Log file location not found in environment variable '{}' or configuration key '{}'.",
                    logEnv, WebOptions.LOG_PATH);
                return new LogFileLocation(null, null);
            }

            String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");

            LOG.info("Determined location of main cluster component log file: {}", logFilePath);
            LOG.info("Determined location of main cluster component stdout file: {}", outFilePath);

            return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath));
        }

        /**
         * Verify log file location.
         *
         * @param logFilePath Path to log file
         * @return File or null if not a valid log file
         */
        private static File resolveFileLocation(String logFilePath) {
            File logFile = new File(logFilePath);
            return (logFile.exists() && logFile.canRead()) ? logFile : null;
        }
    }

The method first reads log.file from the system properties. If it is not found, it logs a warning (Log file environment variable 'log.file' is not set.).

When log.file is not set, it falls back to the WebOptions.LOG_PATH (web.log.path) entry of Flink's Configuration. If that is missing too, or logFilePath.length() is less than 4, it logs a warning (JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'.) and returns a LogFileLocation whose two files are both null.

logFilePath.length() has to be at least 4 because outFilePath is built afterwards with logFilePath.substring(0, logFilePath.length() - 3).concat("out"), that is, the last three characters (normally the "log" extension) are replaced with "out". For example, /tmp/flink.log yields /tmp/flink.out. resolveFileLocation then validates both logFilePath and outFilePath (the file must exist and be readable, otherwise null is used), and the resulting LogFileLocation is returned.

StaticFileServerHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java

/**
 * Simple file server handler that serves requests to web frontend's static files, such as
 * HTML, CSS, or JS files.
 *
 * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
 * example.</p>
 */
@ChannelHandler.Sharable
public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {

    /** Timezone in which this server answers its "if-modified" requests. */
    private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");

    /** Date format for HTTP. */
    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";

    /** Be default, we allow files to be cached for 5 minutes. */
    private static final int HTTP_CACHE_SECONDS = 300;

    // ------------------------------------------------------------------------

    /** The path in which the static documents are. */
    private final File rootPath;

    public StaticFileServerHandler(
            GatewayRetriever<?
extends T> retriever, CompletableFuture<String> localJobManagerAddressFuture, Time timeout, File rootPath) throws IOException { super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap()); this.rootPath = checkNotNull(rootPath).getCanonicalFile(); } // ———————————————————————— // Responses to requests // ———————————————————————— @Override protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception { final HttpRequest request = routedRequest.getRequest(); final String requestPath; // make sure we request the “index.html” in case there is a directory request if (routedRequest.getPath().endsWith(”/")) { requestPath = routedRequest.getPath() + “index.html”; } // in case the files being accessed are logs or stdout files, find appropriate paths. else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) { requestPath = “”; } else { requestPath = routedRequest.getPath(); } respondToRequest(channelHandlerContext, request, requestPath); } //…… @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if (ctx.channel().isActive()) { logger.error(“Caught exception”, cause); sendError(ctx, INTERNAL_SERVER_ERROR); } }}对于/jobmanager/log以及/jobmanager/stdout它会重置一下requestPath,之后调用respondToRequest处理,它根据rootPath来传输文件TaskManager FileHandlerTaskManagerLogFileHandlerflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogFileHandler.java/* * Rest handler which serves the log files from {@link TaskExecutor}. /public class TaskManagerLogFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> { public TaskManagerLogFileHandler( @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, @Nonnull Time timeout, @Nonnull Map<String, String> responseHeaders, @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders, @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, @Nonnull TransientBlobService transientBlobService, @Nonnull Time cacheEntryDuration) { super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration); } @Override protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) { return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.LOG, timeout); }}它的requestFileUpload是调用了ResourceManager.requestTaskManagerFileUpload,传递的FileType是FileType.LOG类型TaskManagerStdoutFileHandler.requestFileUploadflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerStdoutFileHandler.java/* * Rest handler which serves the stdout file of the {@link TaskExecutor}. */public class TaskManagerStdoutFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> { public TaskManagerStdoutFileHandler( @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, @Nonnull Time timeout, @Nonnull Map<String, String> responseHeaders, @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders, @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, @Nonnull TransientBlobService transientBlobService, @Nonnull Time cacheEntryDuration) { super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration); } @Override protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) { return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.STDOUT, timeout); }}它的requestFileUpload是调用了ResourceManager.requestTaskManagerFileUpload,传递的FileType是FileType.STDOUT类型ResourceManager.requestTaskManagerFileUploadflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/resourcemanager/ResourceManager.java @Override public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time timeout) { log.debug(“Request file {} upload from TaskExecutor {}.”, fileType, taskManagerId); final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId); if (taskExecutor == null) { log.debug(“Requested file {} upload from unregistered TaskExecutor {}.”, fileType, taskManagerId); return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId)); } else { return taskExecutor.getTaskExecutorGateway().requestFileUpload(fileType, timeout); } }ResourceManager的requestTaskManagerFileUpload是通过TaskExecutor.requestFileUpload来实现的TaskExecutor.requestFileUploadflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @Override public CompletableFuture<TransientBlobKey> requestFileUpload(FileType 
fileType, Time timeout) { log.debug(“Request file {} upload.”, fileType); final String filePath; switch (fileType) { case LOG: filePath = taskManagerConfiguration.getTaskManagerLogPath(); break; case STDOUT: filePath = taskManagerConfiguration.getTaskManagerStdoutPath(); break; default: filePath = null; } if (filePath != null && !filePath.isEmpty()) { final File file = new File(filePath); if (file.exists()) { final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService(); final TransientBlobKey transientBlobKey; try (FileInputStream fileInputStream = new FileInputStream(file)) { transientBlobKey = transientBlobService.putTransient(fileInputStream); } catch (IOException e) { log.debug(“Could not upload file {}.”, fileType, e); return FutureUtils.completedExceptionally(new FlinkException(“Could not upload file " + fileType + ‘.’, e)); } return CompletableFuture.completedFuture(transientBlobKey); } else { log.debug(“The file {} does not exist on the TaskExecutor {}.”, fileType, getResourceID()); return FutureUtils.completedExceptionally(new FlinkException(“The file " + fileType + " does not exist on the TaskExecutor.”)); } } else { log.debug(“The file {} is unavailable on the TaskExecutor {}.”, fileType, getResourceID()); return FutureUtils.completedExceptionally(new FlinkException(“The file " + fileType + " is not available on the TaskExecutor.”)); } }TaskExecutor的requestFileUpload会根据fileType来获取filePath,如果是LOG类型取的是taskManagerConfiguration.getTaskManagerLogPath();如果是STDOUT类型,取的是taskManagerConfiguration.getTaskManagerStdoutPath(),之后将文件传输过去TaskManagerRunner.startTaskManagerflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java public static TaskExecutor startTaskManager( Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, BlobCacheService blobCacheService, 
boolean localCommunicationOnly, FatalErrorHandler fatalErrorHandler) throws Exception { checkNotNull(configuration); checkNotNull(resourceID); checkNotNull(rpcService); checkNotNull(highAvailabilityServices); LOG.info(“Starting TaskManager with ResourceID: {}”, resourceID); InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( configuration, remoteAddress, localCommunicationOnly); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, resourceID, rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io. EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), EnvironmentInformation.getMaxJvmHeapMemory()); TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistry, taskManagerServices.getTaskManagerLocation(), taskManagerServices.getNetworkEnvironment()); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); return new TaskExecutor( rpcService, taskManagerConfiguration, highAvailabilityServices, taskManagerServices, heartbeatServices, taskManagerMetricGroup, blobCacheService, fatalErrorHandler); }TaskManagerRunner.startTaskManager通过TaskManagerConfiguration.fromConfiguration(configuration)构造了taskManagerConfigurationTaskManagerConfiguration.fromConfigurationflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java public static TaskManagerConfiguration fromConfiguration(Configuration configuration) { int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); if (numberSlots == -1) { numberSlots = 1; } //…… final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty(“log.file”)); final String 
taskManagerStdoutPath; if (taskManagerLogPath != null) { final int extension = taskManagerLogPath.lastIndexOf(’.’); if (extension > 0) { taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + “.out”; } else { taskManagerStdoutPath = null; } } else { taskManagerStdoutPath = null; } return new TaskManagerConfiguration( numberSlots, tmpDirPaths, timeout, finiteRegistrationDuration, initialRegistrationPause, maxRegistrationPause, refusedRegistrationPause, configuration, exitOnOom, FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), alwaysParentFirstLoaderPatterns, taskManagerLogPath, taskManagerStdoutPath); }TaskManagerConfiguration.fromConfiguration里头首先根据ConfigConstants.TASK_MANAGER_LOG_PATH_KEY(taskmanager.log.path)从flink的Configuration读取taskManagerLogPath,如果读取不到,则取系统属性log.file;如果读取到taskManagerLogPath不为null,则换个后缀构建taskManagerStdoutPath小结flink的log4j.properties里头配置了file appender,使用了系统属性log.fileflink的MiniCluster在start的时候会创建DispatcherRestEndpoint,它的start方法会使用initializeHandlers来初始化一系列的handlers,对于JobManager的fileHandler,使用WebMonitorUtils.LogFileLocation.find(clusterConfiguration)获取logFileLocation,它先从系统属性读取log.file属性,没有找到的话再从flink的Configuration读取WebOptions.LOG_PATH(web.log.path)配置;之后分别使用logFileLocation.logFile及logFileLocation.stdOutFile创建了两个StaticFileServerHandler对于TaskManager的fileHandler,则分别创建了TaskManagerLogFileHandler及TaskManagerStdoutFileHandler来处理log及stdout文件的下载,它们内部都是调用了ResourceManager.requestTaskManagerFileUpload方法,只是fileType不同,一个是LOG,一个是STDOUT;而ResourceManager.requestTaskManagerFileUpload方法最后是通过TaskExecutor.requestFileUpload来完成文件传输;TaskManagerRunner.startTaskManager在创建TaskExecutor的时候,构造了TaskManagerConfiguration,它里头先从flink的Configuration获取ConfigConstants.TASK_MANAGER_LOG_PATH_KEY(taskmanager.log.path),如果没有则取系统属性log.filedocHow to use logging ...
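The JobManager side and the TaskManager side derive the stdout path from the log path in slightly different ways. The following is a minimal standalone sketch of just that string logic, not Flink code: the class name StdoutPathSketch and both method names are hypothetical, and the file paths are made-up examples.

```java
/**
 * Standalone sketch (not Flink code) of the two stdout-path derivations
 * quoted in the post. Class and method names are hypothetical.
 */
final class StdoutPathSketch {

    /** Mirrors LogFileLocation.find: drop the last three characters, append "out". */
    static String jobManagerStdoutPath(String logFilePath) {
        // Paths shorter than 4 characters cannot be "x" + 3-character extension.
        if (logFilePath == null || logFilePath.length() < 4) {
            return null;
        }
        return logFilePath.substring(0, logFilePath.length() - 3).concat("out");
    }

    /** Mirrors TaskManagerConfiguration.fromConfiguration: replace the extension with ".out". */
    static String taskManagerStdoutPath(String logFilePath) {
        if (logFilePath == null) {
            return null;
        }
        final int extension = logFilePath.lastIndexOf('.');
        return extension > 0 ? logFilePath.substring(0, extension) + ".out" : null;
    }

    public static void main(String[] args) {
        // Hypothetical paths, chosen only to exercise both derivations.
        String[] samples = {"/opt/flink/log/flink-taskexecutor-0.log", "/tmp/flink.log4j"};
        for (String path : samples) {
            System.out.println(path
                + " -> JobManager: " + jobManagerStdoutPath(path)
                + ", TaskManager: " + taskManagerStdoutPath(path));
        }
    }
}
```

For a path ending in a three-character extension such as .log, both derivations agree. They diverge when the extension has another length: the JobManager variant always drops exactly three characters, while the TaskManager variant cuts at the last dot.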

November 22, 2018 · 11 min · jiezi

Building a Real-Time Data Warehouse on Flink at Meituan-Dianping

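Introduction

In recent years, enterprise demand for real-time data services has grown steadily. This article summarizes the performance characteristics and suitable scenarios of common real-time data components, and describes how Meituan built a real-time data warehouse on the Flink engine to provide efficient, robust real-time data services. The Meituan tech blog previously published "流计算框架 Flink 与 Storm 的性能对比" (a performance comparison of the Flink and Storm stream computing frameworks), which compared the compute performance of the two engines. This article focuses on our experience using Flink in actual data production.

Initial architecture of the real-time platform

In the early days of the real-time data system, demand for real-time data was low and no complete data system had taken shape. We used a "straight-through" development model: Storm jobs deployed on the real-time computing platform processed real-time data queues to extract metrics, which were pushed directly to real-time application services.

Figure 1: Initial real-time data architecture

As product and business teams asked for more and more real-time data, new challenges appeared:

The number of metrics kept growing, and "silo-style" development caused serious code coupling.

Requirements diversified: some needed detail data, others needed OLAP analysis. A single development model could not serve them all.

There was no complete monitoring system, so problems could not be found and fixed before they affected the business.

Building the real-time data warehouse

To solve these problems, and drawing on our experience producing offline data, we chose a layered design for the real-time data warehouse. The layers are shown below:

Figure 2: Layered data architecture of the real-time warehouse

The design consists of four layers:

ODS layer: Binlog, traffic logs, and the real-time queues of each business.

Data detail layer: extracts fact data integrated by business domain, and builds real-time dimension data from offline full data plus real-time change data.

Data summary layer: uses a wide-table model to enrich detail data with dimension data and aggregates common metrics.

App layer: an application layer built for specific requirements, exposed through an RPC framework.

With a multi-layer design, each data processing step settles into a specific layer. For example, the detail layer handles filtering, cleansing, normalization, and masking in one place, and the summary layer produces common multidimensional aggregate metrics. This raises code reuse and overall productivity. Because the tasks within a layer are similar, one technical approach can be tuned per layer, which keeps the warehouse architecture simpler.

Technology selection

1. Storage engine survey

An offline warehouse typically uses the same storage at every layer, for example Hive or a database throughout. The real-time warehouse is designed differently. First, intermediate tables use a hybrid of message-queue storage for structured data and high-speed KV storage. The real-time compute engine consumes data from the message queues for real-time computation, while data in the KV store supports fast lookups, for example of dimension data. Second, at the application layer, storage is chosen according to how the data is used and is written to directly, which avoids the processing delay that the data sync step adds in an offline warehouse's application layer. To serve different kinds of real-time data requirements and choose storage sensibly for each layer, we surveyed several storage options widely used inside Meituan.

Table 1: Storage options

| Option | Strengths | Weaknesses |
| --- | --- | --- |
| MySQL | 1. Full transaction support; data can be updated. 2. SQL support; low development cost. | 1. Horizontal scaling is expensive and storage easily becomes the bottleneck. 2. Real-time data is updated and queried very frequently; a single online real-time application issues 1000+ QPS, so MySQL is too costly. |
| Elasticsearch | 1. High throughput: a single machine supports 2500+ QPS, and the cluster scales out quickly. 2. Term queries are fast: at 2000+ QPS on a single machine, query latency stays under 20 ms. | 1. No native SQL support; the query DSL has a learning curve. 2. Performance drops noticeably for aggregations. |
| Druid | 1. Handles very large data volumes; when ingesting real-time data from Kafka, a single job supports 60,000+ QPS. 2. Pre-aggregation at ingestion time reduces stored data and improves processing efficiency. 3. Many open-source OLAP analysis frameworks are available, such as Superset. | 1. Pre-aggregation rules out detail queries. 2. No Join support. 3. Append-only: data cannot be modified, only replaced one Segment at a time. |
| Cellar | 1. Handles very large data volumes; the memory-plus-distributed-storage architecture is very cost-effective. 2. Good throughput: in tests, average latency was around 1 ms at 30,000+ QPS of reads and writes; with asynchronous reads and writes it supports up to 100,000+ QPS in production. | 1. The interface supports only KV, Map, List, and atomic increment/decrement. 2. A single key must not exceed 1 KB, and performance drops noticeably once a value exceeds 100 KB. |

Based on the business scenarios, the storage used at each model layer of the real-time warehouse is roughly as follows:

Figure 3: Storage layering of the real-time warehouse

Data detail layer: for dimension data, lookup frequency can reach 100,000+ TPS in some scenarios. We chose Cellar (an internal Meituan storage system) as storage and wrapped it in a dimension service that supplies dimension data to the real-time warehouse.

Data summary layer: common summary metrics that must be joined with historical data use the same approach as dimension data, with Cellar as storage and joins performed through a service.

Data application layer: the application layer design is more complex. After comparing several storage options, we settled on a read/write frequency of 1000 QPS as the dividing line. Real-time applications whose average read/write frequency exceeds 1000 QPS but whose queries are not very complex, such as merchants' real-time operating data, use Cellar as storage to provide real-time data services. Applications with complex queries or that need detail lists are better served by Elasticsearch. Data that is queried infrequently, such as some internal operations data, goes to Druid, which builds indexes by processing messages in real time and uses pre-aggregation to provide fast real-time OLAP analysis. When retrofitting older data products for real-time use, MySQL can also be used to make product iteration easier.

2. Compute engine survey

In the early days of the real-time platform we used the Storm engine for real-time data processing. Storm does well on flexibility and performance, but its API is too low-level: common data operations such as table joins and aggregations have to be implemented by hand. That created a lot of extra development work, pulled in many external dependencies such as caches, and in practice did not perform very well. Storm's data object, Tuple, is also very basic and usually has to be converted to Java objects for processing. A data model defined in code like this can usually only be maintained through documentation, which means extra maintenance work and makes adding or changing fields cumbersome. Overall, building a real-time warehouse on Storm is difficult. We needed a new real-time processing solution that could:

Provide a high-level API that supports common data operations such as joins and aggregations, ideally with SQL support.

Offer state management and automatic persistence, reducing the dependence on external storage.

Integrate easily with a metadata service, so that data structures are not managed in code.

Deliver processing performance at least on par with Storm.

We surveyed the main real-time compute engines. Their characteristics are summarized in the following table:

Table 2: Real-time compute options

| Item / Engine | Storm | Flink | Spark Streaming |
| --- | --- | --- | --- |
| API | Flexible low-level API, plus the Trident API with transactional guarantees | Stream API, plus the Table API and Flink SQL, which are better suited to data development | Stream API and Structured Streaming API; Spark SQL, better suited to data development, is also available |
| Fault tolerance | ACK mechanism | State, distributed snapshots, savepoints | RDD checkpoints |
| State management | Trident State | Keyed State and Operator State, with several persistence options | APIs such as UpdateStateByKey for stateful updates, with several persistence options |
| Processing model | Record-at-a-time streaming | Record-at-a-time streaming | Micro-batch |
| Latency | Milliseconds | Milliseconds | Seconds |
| Semantics | At Least Once, Exactly Once | Exactly Once, At Least Once | At Least Once |

The survey shows that the APIs, fault tolerance, and state persistence of both Flink and Spark Streaming address part of the problems we had with Storm. Flink's latency, however, is closer to Storm's, so it affects existing applications the least. In internal tests, Flink's throughput was also roughly ten times that of Storm. Weighing these points, we chose Flink as the development engine for the real-time warehouse.

What drew our attention even more was Flink's Table abstraction and SQL support. Storm can process structured data too, but its API is still message-based, so at the code level it cannot offer the full convenience of working with structured data. Flink supports a large number of common SQL statements, which covers essentially all our development scenarios. Flink Tables can also be managed through TableSchema, with rich support for data types, data structures, and data sources, so they integrate easily with an existing metadata management or configuration management system. The figure below shows the difference between developing with Storm and with Flink.

Figure 4: Flink vs. Storm

With Storm, the processing logic and its implementation are hard-coded in Bolt code. With Flink, development can be done in SQL: the code is more readable, the open-source framework takes care of implementing the logic reliably and efficiently, and optimizing for a specific scenario only requires changing the Flink SQL optimizer, not the logic code. This lets us spend more effort on data development than on implementing logic. Where offline and real-time data must follow the same metric definitions, we only need to adapt the offline SQL script slightly, which greatly improves development efficiency. Comparing the data models in the figure, Storm needs a Java class to define a data structure, while a Flink Table can be defined through metadata. That fits well with the metadata and data governance systems used in data development and further improves efficiency.

Lessons from using Flink

While building the real-time data warehouse on the Flink Table API, we collected some lessons on implementing common warehouse operations in Flink: dimension enrichment of metrics, joining data by subject, and aggregation.

1. Dimension enrichment

To enrich metrics with dimensions, we fetch dimension information from the dimension service. The Cellar-based dimension service usually responds in under 1 ms, but to push Flink's throughput further, all dimension lookups go through an asynchronous interface so that RPC calls do not hold back data throughput.

For very high-volume streams, such as traffic logs at around 100,000 records per second, the lookup UDF has a built-in cache that evicts entries based on hit rate and age. Combined with partitioning the stream by the join key, this significantly reduces the number of requests to the external service, which lowers both processing latency and the load on external systems.

2. Data joins

Merging data subjects is essentially joining multiple data sources, in other words a Join. Flink Tables are built on the concept of unbounded streams, so a Join cannot match two complete tables the way an offline join does. Instead, data is joined within a time window, which amounts to cutting a time slice out of each stream and joining the slices. It is somewhat like restricting partitions when joining offline data. Note also that a Flink table join needs at least one equality condition, because the values on both sides of the equals sign are used for grouping.

Because Flink caches all data inside the window in order to join it, the amount of cached data is proportional to the window size. Flink joins are therefore better suited to scenarios where business rules bound the time range of the join, for example joining a user's order with that user's browsing logs from the 30 minutes before the purchase. An oversized window consumes more memory and also produces larger checkpoints, which lowers throughput or causes checkpoint timeouts. In production, using RocksDB with incremental checkpoints reduces the impact of checkpointing on throughput. Some scenarios need a very long join window, for example when the data to join may be several days old. Such historical data can be treated as a kind of fixed "dimension" and handled the same way as dimension data: stored with the "cache + offline data" approach and joined through an interface. Also note that Flink joins multiple tables sequentially in the order given, so joins that produce small result sets should come first.

3. Aggregation

Flink supports common aggregations such as sum, min/max, and average. The weak spot is Distinct: before Flink 1.6, the approach was to group by the distinct field first and then aggregate. For distinct aggregation over several fields, each had to be computed separately and then joined, which is very inefficient. We therefore developed custom UDAFs: exact deduplication with MapView, approximate deduplication with a BloomFilter, and very-low-memory deduplication with HyperLogLog, to cover the various real-time deduplication scenarios. When using custom UDAFs, be aware that with RocksDBStateBackend, updating large keys spends a lot of time on serialization and deserialization; FsStateBackend is worth considering as an alternative. Another point: when Flink computes analytic functions such as Rank, it must cache all the data of each group window before it can sort, which consumes a lot of memory. In such cases, first check whether the requirement can be met by rewriting the logic as TopN.

The figure below shows the complete process of producing one real-time data table with the Flink engine:

Figure 5: Real-time computation flow

Results of the real-time warehouse

By replacing the original flow with the real-time warehouse, we abstracted each step of data production into the warehouse layers. All real-time data applications now share the same data sources, which keeps metric and dimension definitions consistent across applications. On several occasions when a definition changed, we modified the detail and summary layers of the warehouse and switched every application to the new definition without touching application code. During development, strict control of data layering, subject-domain boundaries, content organization standards, and naming rules made the data development pipeline clearer and reduced code coupling. Developing in Flink SQL also made the code more concise: a single job shrank from an average of 300+ lines of Java to a few dozen lines of SQL. Development time dropped sharply as well, and it is not unusual for one person to deliver several real-time metrics in a day.

In addition, we can tune performance and parameters for the particular work each warehouse layer does. The ODS layer, for example, mainly parses and filters data and needs no RPC calls or aggregation. We optimized the parsing step by skipping unnecessary JSON fields and switching to a faster JSON library. For resources, 1 GB of memory per CPU is enough. The summary layer mainly performs aggregations and joins; there, better aggregation algorithms and computation that combines memory with external storage improve performance and reduce cost, and more memory is allocated to avoid out-of-memory errors. With these optimizations, data latency did not increase noticeably even though the production pipeline is longer than before, and the compute resources used by real-time data applications dropped noticeably.

Outlook

Our goal is to make the real-time warehouse a data system that matches the offline warehouse in accuracy and consistency, providing timely and reliable data services to merchants, business staff, and Meituan users. It will also serve as the single outlet for in-store dining real-time data and support other business units in the group.

Going forward, we will focus more on data reliability and on managing real-time metrics: building thorough data monitoring, data lineage detection, and cross-checking mechanisms, and monitoring and alerting on abnormal or delayed data in time. We will also streamline the development process and lower the learning cost of real-time data development, so that more people who need real-time data can solve their problems themselves.

References

流计算框架 Flink 与 Storm 的性能对比 (a performance comparison of the Flink and Storm stream computing frameworks)

About the author

Weilun (伟伦) leads real-time data in Meituan's in-store dining technology department. He joined Meituan in 2017 and has long worked on data platforms, real-time data computation, and data architecture. He has accumulated experience and results in using Flink for real-time data production and in improving production efficiency, and actively shares practical Flink experience in real-time data processing.

Hiring

If you are interested in data engineering and in turning data into business value through services, send your resume to huangweilun@meituan.com. In real-time data warehousing, real-time data governance, development frameworks for real-time data products, and innovative data products for sales teams and merchants, there are many unexplored but meaningful areas waiting for you. ...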
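The article mentions a cache built into the dimension lookup UDF but does not show it. As a rough illustration only, here is a minimal sketch of a lookup cache with time-based expiry. It assumes a size bound with least-recently-used eviction in place of the hit-rate-based eviction the article describes, and every name in it (DimensionCacheSketch, the loader function standing in for the call to the dimension service, the injected clock) is hypothetical rather than Meituan's or Flink's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of a dimension-lookup cache: entries expire after a TTL,
 * and the least recently used entry is evicted once the size bound is exceeded.
 * Not thread-safe; it assumes a single calling thread.
 */
final class DimensionCacheSketch<K, V> {

    /** A cached value together with the time it was loaded. */
    private static final class CachedValue<V> {
        final V value;
        final long loadedAtMillis;

        CachedValue(V value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock;     // injected so expiry can be tested deterministically
    private final Function<K, V> loader;  // stands in for the external dimension service call
    private final Map<K, CachedValue<V>> entries;

    DimensionCacheSketch(int maxEntries, long ttlMillis, LongSupplier clock, Function<K, V> loader) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.loader = loader;
        // accessOrder = true keeps the least recently used entry at the head of the map.
        this.entries = new LinkedHashMap<K, CachedValue<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedValue<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value if present and fresh; otherwise reloads it. */
    V get(K key) {
        final long now = clock.getAsLong();
        final CachedValue<V> cached = entries.get(key);
        if (cached != null && now - cached.loadedAtMillis < ttlMillis) {
            return cached.value;
        }
        final V fresh = loader.apply(key);
        entries.put(key, new CachedValue<>(fresh, now));
        return fresh;
    }

    public static void main(String[] args) {
        int[] serviceCalls = {0};
        DimensionCacheSketch<String, String> cache = new DimensionCacheSketch<>(
            1000, 60_000L, System::currentTimeMillis,
            poiId -> { serviceCalls[0]++; return "dimension-of-" + poiId; });

        cache.get("poi-1");
        cache.get("poi-1"); // served from the cache, no second service call
        System.out.println("service calls: " + serviceCalls[0]);
    }
}
```

Partitioning the stream by the join key, as the article describes, would mean each parallel instance sees a stable subset of keys, which is what lets a small per-instance cache like this achieve a useful hit rate.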

October 19, 2018 · 2 min · jiezi