Flink Source Code: Broadcast Stream State Source Code Analysis

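Broadcast State is a special type of Operator State. It was introduced to support use cases in which the records of one stream must be broadcast to all downstream tasks, where they are used to maintain the same state across every subtask. That broadcast state can then be accessed while processing the records of a second stream. Broadcast state has a few characteristics of its own: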

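  1. It must be defined as a Map structure.
  2. Broadcast state can only be modified on the broadcast side; the non-broadcast side cannot modify it.
  3. At runtime, Broadcast State is kept only in memory.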

At this point you probably have the following questions:

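  • Why must broadcast state be defined as a Map? Can't I use another state type?
  • Why can broadcast state only be modified on the broadcast side, and why can't the non-broadcast side modify it?
  • Why can broadcast state only be kept in memory? Can't I use the RocksDB state backend?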

Below, we answer these three questions by reading the relevant source code.

broadcast source code

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are broadcasted
 * to every parallel instance of the next operation. In addition, it implicitly creates as many {@link
 * org.apache.flink.api.common.state.BroadcastState broadcast states} as the specified
 * descriptors which can be used to store the element of the stream.
 *
 * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
 * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)}
 *     to create a {@link BroadcastConnectedStream} for further processing of the elements.
 */
@PublicEvolving
public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}

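The broadcast method takes MapStateDescriptor<?, ?> arguments, i.e. descriptors for Map-shaped state, so you have to use a MapStateDescriptor; passing any other descriptor type fails immediately because the code does not compile. The main reason is that broadcast state exists to be joined with the non-broadcast stream. Think of it as a two-stream join: a join needs a key, and only records with the same key can be matched, so a Map (key-value) structure fits this scenario best. The key can hold the join field and the value can be broadcast data of any type; when joining, you just obtain the broadcast state and call state.get(key) to get the broadcast data.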
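To make the join analogy concrete, here is a minimal plain-Java sketch, assuming String keys and values and no Flink dependency. The class BroadcastLookupSketch and its methods are hypothetical: the HashMap plays the role of the broadcast state, onBroadcast stands for the broadcast side writing to it, and onEvent stands for the keyed side doing state.get(key). In a real job the map would be the BroadcastState registered through a MapStateDescriptor such as new MapStateDescriptor<>("rules", String.class, String.class).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: the broadcast side fills a map, the keyed side joins by key. */
class BroadcastLookupSketch {
    // Stand-in for the broadcast state: join key -> broadcast payload.
    private final Map<String, String> broadcastState = new HashMap<>();

    // Broadcast side: upsert one broadcast record into the map.
    void onBroadcast(String key, String value) {
        broadcastState.put(key, value);
    }

    // Keyed side: enrich an event with a point lookup, the equivalent of state.get(key).
    Optional<String> onEvent(String key) {
        return Optional.ofNullable(broadcastState.get(key))
                .map(v -> key + " -> " + v);
    }
}
```

A key with no broadcast entry simply yields an empty Optional, which corresponds to a record that cannot be joined.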

process source code

@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function) {
    // extract the type information of the output
    TypeInformation<OUT> outTypeInfo =
            TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    KeyedBroadcastProcessFunction.class,
                    1,
                    2,
                    3,
                    TypeExtractor.NO_INDEX,
                    getType1(),
                    getType2(),
                    Utils.getCallLocationName(),
                    true);

    return process(function, outTypeInfo);
}

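process takes a KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT>. Compared with an ordinary KeyedProcessFunction<K, I, O>, it clearly has one more type parameter: the process operator here is fed by two input streams, so it needs two input types. After extracting the output type information with TypeExtractor, it calls the overloaded process method.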
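The reason for the extra type parameter can be shown with a hypothetical, heavily simplified mirror of the function's shape. MiniKeyedBroadcastFunction and LengthThresholdFunction are illustrative names, not Flink classes, and contexts, timers and state are omitted: each input gets its own callback, so each needs its own input type.

```java
import java.util.List;

/** Hypothetical mirror of the shape of KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT>. */
abstract class MiniKeyedBroadcastFunction<KEY, IN1, IN2, OUT> {
    // Called for records of the keyed (non-broadcast) input, typed IN1.
    abstract void processElement(IN1 value, List<OUT> out);

    // Called for records of the broadcast input, typed IN2.
    abstract void processBroadcastElement(IN2 value, List<OUT> out);
}

/** Example instance: IN1 = String events, IN2 = Integer broadcast thresholds. */
class LengthThresholdFunction
        extends MiniKeyedBroadcastFunction<String, String, Integer, String> {
    private int threshold = Integer.MAX_VALUE; // nothing passes until a threshold is broadcast

    @Override
    void processElement(String value, List<String> out) {
        if (value.length() > threshold) {
            out.add(value);
        }
    }

    @Override
    void processBroadcastElement(Integer value, List<String> out) {
        threshold = value;
    }
}
```

Here the broadcast side (IN2 = Integer) updates a threshold and the keyed side (IN1 = String) filters by it; a single-input function type could not describe both callbacks.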

process source code (overload)

@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function,
        final TypeInformation<OUT> outTypeInfo) {
    Preconditions.checkNotNull(function);
    Preconditions.checkArgument(
            nonBroadcastStream instanceof KeyedStream,
            "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
        
    return transform(function, outTypeInfo);
}

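This process method does little beyond validation: it checks that the non-broadcast stream is a KeyedStream and then calls the transform method directly.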

transform source code

@Internal
private <KEY, OUT> SingleOutputStreamOperator<OUT> transform(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> userFunction,
        final TypeInformation<OUT> outTypeInfo) {

    // read the output type of the input Transforms to coax out errors about MissingTypeInfo
    nonBroadcastStream.getType();
    broadcastStream.getType();

    KeyedStream<IN1, KEY> keyedInputStream = (KeyedStream<IN1, KEY>) nonBroadcastStream;
    // construct the KeyedBroadcastStateTransformation
    final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation =
            new KeyedBroadcastStateTransformation<>(
                    "Co-Process-Broadcast-Keyed",
                    nonBroadcastStream.getTransformation(),
                    broadcastStream.getTransformation(),
                    clean(userFunction),
                    broadcastStateDescriptors,
                    keyedInputStream.getKeyType(),
                    keyedInputStream.getKeySelector(),
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    final SingleOutputStreamOperator<OUT> returnStream =
            new SingleOutputStreamOperator(environment, transformation);
    // add it to the List<Transformation<?>> collection
    getExecutionEnvironment().addOperator(transformation);
    return returnStream;
}

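The transform method does two main things:

  1. It first constructs the corresponding KeyedBroadcastStateTransformation object; KeyedBroadcastStateTransformation is itself a subclass of Transformation.
  2. It then adds the constructed transformation to the List<Transformation<?>> collection, from which the Transformations are read later when the StreamGraph is built.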
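The register-now, translate-later pattern described above can be sketched in plain Java. MiniEnvironment is hypothetical and uses strings instead of Transformation objects; its getStreamGraph mirrors the clearTransformations flag, the empty-topology check and the defensive copy that appear in getStreamGraph and getStreamGraphGenerator below.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of an environment that collects transformations before translation. */
class MiniEnvironment {
    // Stand-in for List<Transformation<?>>; plain strings keep the sketch small.
    private final List<String> transformations = new ArrayList<>();

    // Counterpart of addOperator(transformation): API calls only register.
    void addOperator(String transformation) {
        transformations.add(transformation);
    }

    // Counterpart of getStreamGraph(clearTransformations): consume a copy of the list.
    List<String> getStreamGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        // Copy so that later registrations cannot interfere with this "graph".
        List<String> graph = Collections.unmodifiableList(new ArrayList<>(transformations));
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }
}
```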

getStreamGraph source code

@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}

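getStreamGraph's main job is to generate the StreamGraph, and this is where the List<Transformation<?>> collection built in the previous step comes into play. Since this article focuses on the Flink broadcast stream source code, only the broadcast-related parts are analyzed.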

getStreamGraphGenerator source code

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }

    // We copy the transformation so that newly added transformations cannot intervene with the
    // stream graph generation.
    return new StreamGraphGenerator(new ArrayList<>(transformations), config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}

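getStreamGraphGenerator mainly constructs a StreamGraphGenerator object from a copy of the transformations list. Once it is constructed, its generate method can be called to produce the StreamGraph. Before looking at generate, let's first look at StreamGraphGenerator's static initializer block.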

StreamGraphGenerator source code (static initializer)

static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}

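Before any StreamGraphGenerator is constructed, its static initializer runs when the class is initialized and builds a Map from each Transformation class to its TransformationTranslator. This map is used in the next step.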

StreamGraphGenerator#transform source code (excerpt)

// look up the TransformationTranslator registered for this Transformation's class
final TransformationTranslator<?, Transformation<?>> translator =
        (TransformationTranslator<?, Transformation<?>>)
                translatorMap.get(transform.getClass());

Collection<Integer> transformedIds;
if (translator != null) {
    transformedIds = translate(translator, transform);
} else {
    transformedIds = legacyTransform(transform);
}

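After the StreamGraphGenerator object is constructed, its generate method is called, which in turn calls transform. transform looks up the matching TransformationTranslator in the map built above and calls its translate method; if no translator is registered for the class, it falls back to legacyTransform.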
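The static map plus getClass() lookup amounts to a type-keyed dispatch table. The sketch below is a hypothetical simplification: translators are modeled as plain strings and the transformation classes are dummies, so only the registration, lookup and fallback logic is shown.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Flink's Transformation subclasses.
interface MiniTransformation {}
class DemoOneInputTransformation implements MiniTransformation {}
class DemoKeyedBroadcastTransformation implements MiniTransformation {}
class DemoUnregisteredTransformation implements MiniTransformation {}

class MiniGraphGenerator {
    // Like StreamGraphGenerator's static block: an immutable Class -> translator map.
    private static final Map<Class<? extends MiniTransformation>, String> TRANSLATORS;

    static {
        Map<Class<? extends MiniTransformation>, String> tmp = new HashMap<>();
        tmp.put(DemoOneInputTransformation.class, "OneInputTranslator");
        tmp.put(DemoKeyedBroadcastTransformation.class, "KeyedBroadcastTranslator");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    // Like transform(): exact runtime-class lookup, falling back to a legacy path.
    static String transform(MiniTransformation t) {
        String translator = TRANSLATORS.get(t.getClass());
        return translator != null ? translator : "legacyTransform";
    }
}
```

Because the lookup uses the exact runtime class, a subclass of a registered type would not be found and would take the fallback path, at least in this sketch.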

translate#translateForStreaming#translateForStreamingInternal source code

@Override
protected Collection<Integer> translateForStreamingInternal(
        final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);
    // build the CoBroadcastWithKeyedOperator
    CoBroadcastWithKeyedOperator<KEY, IN1, IN2, OUT> operator =
            new CoBroadcastWithKeyedOperator<>(transformation.getUserFunction(),
                    transformation.getBroadcastStateDescriptors());

    return translateInternal(
            transformation,
            transformation.getRegularInput(),
            transformation.getBroadcastInput(),
            SimpleOperatorFactory.of(operator),
            transformation.getStateKeyType(),
            transformation.getKeySelector(),
            null /* no key selector on broadcast input */,
            context);
}

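translate eventually reaches the translateForStreamingInternal method of KeyedBroadcastStateTransformationTranslator, which builds a CoBroadcastWithKeyedOperator from the UserFunction (the user code) and the broadcastStateDescriptors, wraps it with SimpleOperatorFactory.of, and passes it on to translateInternal.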

CoBroadcastWithKeyedOperator source code

/**
 * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction
 * KeyedBroadcastProcessFunctions}.
 *
 * @param <KS> The key type of the input keyed stream.
 * @param <IN1> The input type of the keyed (non-broadcast) side.
 * @param <IN2> The input type of the broadcast side.
 * @param <OUT> The output type of the operator.
 */
@Internal
public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {

    private static final long serialVersionUID = 5926499536290284870L;

    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

    private transient TimestampedCollector<OUT> collector;

    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;

    private transient ReadWriteContextImpl rwContext;

    private transient ReadOnlyContextImpl rContext;

    private transient OnTimerContextImpl onTimerContext;

    public CoBroadcastWithKeyedOperator(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        super(function);
        this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
    }

    @Override
    public void open() throws Exception {
        super.open();

        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

        TimerService timerService = new SimpleTimerService(internalTimerService);

        collector = new TimestampedCollector<>(output);

        this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
            broadcastStates.put(
                    descriptor,
                    // initialize the broadcast state implementation instance
                    getOperatorStateBackend().getBroadcastState(descriptor));
        }

        rwContext =
                new ReadWriteContextImpl(getExecutionConfig(),
                        getKeyedStateBackend(),
                        userFunction,
                        broadcastStates,
                        timerService);
        rContext =
                new ReadOnlyContextImpl(getExecutionConfig(), userFunction, broadcastStates, timerService);
        onTimerContext =
                new OnTimerContextImpl(getExecutionConfig(), userFunction, broadcastStates, timerService);
    }

    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        userFunction.processElement(element.getValue(), rContext, collector);
        rContext.setElement(null);
    }

    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        rwContext.setElement(null);
    }

    private class ReadWriteContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {

        private final ExecutionConfig config;

        private final KeyedStateBackend<KS> keyedStateBackend;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final TimerService timerService;

        private StreamRecord<IN2> element;

        ReadWriteContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedStateBackend<KS> keyedStateBackend,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN2> e) {this.element = e;}

        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.getTimestamp();
        }

        @Override
        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);

            stateDescriptor.initializeSerializerUnlessSet(config);
            BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist."
                                + "Check for typos in your state descriptor, or specify the state descriptor"
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        @Override
        public <VS, S extends State> void applyToKeyedState(
                final StateDescriptor<S, VS> stateDescriptor,
                final KeyedStateFunction<KS, S> function)
                throws Exception {

            keyedStateBackend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    Preconditions.checkNotNull(stateDescriptor),
                    Preconditions.checkNotNull(function));
        }
    }

    private class ReadOnlyContextImpl extends ReadOnlyContext {

        private final ExecutionConfig config;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final TimerService timerService;

        private StreamRecord<IN1> element;

        ReadOnlyContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN1> e) {this.element = e;}

        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.hasTimestamp() ? element.getTimestamp() : null;
        }

        @Override
        public TimerService timerService() {return timerService;}

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);

            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist."
                                + "Check for typos in your state descriptor, or specify the state descriptor"
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }

        @Override
        @SuppressWarnings("unchecked")
        public KS getCurrentKey() {return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();}
    }

    private class OnTimerContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {

        private final ExecutionConfig config;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final TimerService timerService;

        private TimeDomain timeDomain;

        private InternalTimer<KS, VoidNamespace> timer;

        OnTimerContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(timer != null);
            return timer.getTimestamp();
        }

        @Override
        public TimeDomain timeDomain() {
            checkState(timeDomain != null);
            return timeDomain;
        }

        @Override
        public KS getCurrentKey() {
            return timer.getKey();
        }

        @Override
        public TimerService timerService() {return timerService;}

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
        }

        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);

            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist."
                                + "Check for typos in your state descriptor, or specify the state descriptor"
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
    }
}

Before analyzing the CoBroadcastWithKeyedOperator source code, let's first look at its UML diagram.

Figure: CoBroadcastWithKeyedOperator UML diagram

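CoBroadcastWithKeyedOperator implements the TwoInputStreamOperator interface. As the name suggests, this is the StreamOperator interface for operators with two input streams; because CoBroadcastWithKeyedOperator is fed by two streams, it implements this interface. Let's look at the TwoInputStreamOperator source code next.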

TwoInputStreamOperator source code

/**
 * Interface for stream operators with two inputs. Use {@link
 * org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if you want to
 * implement a custom operator.
 *
 * @param <IN1> The input type of the operator
 * @param <IN2> The input type of the operator
 * @param <OUT> The output type of the operator
 */
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {

    /**
     * Processes one element that arrived on the first input of this two-input operator. This method
     * is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement1(StreamRecord<IN1> element) throws Exception;

    /**
     * Processes one element that arrived on the second input of this two-input operator. This
     * method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement2(StreamRecord<IN2> element) throws Exception;

}

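The TwoInputStreamOperator interface defines two methods: processElement1 handles records from the first input and processElement2 handles records from the second. In CoBroadcastWithKeyedOperator the first input is the non-broadcast (keyed) stream and the second is the broadcast stream, so processElement1 processes non-broadcast data and processElement2 processes broadcast data. As the Javadoc states, neither method is called concurrently with the operator's other methods.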
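A hypothetical, minimal version of the two-input contract: the runtime calls processElement1 for records of the first input and processElement2 for records of the second, one call at a time, so the operator can keep plain fields without locking. MiniTwoInputOperator and RecordingOperator are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-input operator interface, modeled on TwoInputStreamOperator. */
interface MiniTwoInputOperator<IN1, IN2> {
    void processElement1(IN1 element); // first input: keyed, non-broadcast side

    void processElement2(IN2 element); // second input: broadcast side
}

/** Logs which input each element arrived on; a plain list is enough without concurrency. */
class RecordingOperator implements MiniTwoInputOperator<String, String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void processElement1(String element) {
        log.add("keyed:" + element);
    }

    @Override
    public void processElement2(String element) {
        log.add("broadcast:" + element);
    }
}
```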

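Back to CoBroadcastWithKeyedOperator's open method. It first initializes broadcastStates, which holds the MapStateDescriptor -> BroadcastState mapping, and then creates the ReadWriteContextImpl and ReadOnlyContextImpl objects (plus OnTimerContextImpl for timer callbacks). As the names suggest, ReadWriteContextImpl can both read and write state, while ReadOnlyContextImpl can only read it. open also does one more important thing: it initializes the implementation of each broadcast state by calling getOperatorStateBackend().getBroadcastState(descriptor).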
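The descriptor-to-state registry that open builds, and the lookup the contexts perform against it, can be sketched as follows. MiniDescriptor and MiniBroadcastOperator are hypothetical; the sketch identifies a descriptor by its name only and backs each state with a HashMap, and it keeps the failure mode seen in the source: asking for a descriptor that was not passed to broadcast(...) throws IllegalArgumentException.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for MapStateDescriptor, identified by name in this sketch. */
final class MiniDescriptor {
    final String name;

    MiniDescriptor(String name) {
        this.name = Objects.requireNonNull(name);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MiniDescriptor && ((MiniDescriptor) o).name.equals(name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}

class MiniBroadcastOperator {
    private final List<MiniDescriptor> descriptors;
    private Map<MiniDescriptor, Map<String, String>> broadcastStates;

    MiniBroadcastOperator(List<MiniDescriptor> descriptors) {
        this.descriptors = descriptors;
    }

    // Like open(): create one state per descriptor that was passed to broadcast(...).
    void open() {
        broadcastStates = new HashMap<>(descriptors.size());
        for (MiniDescriptor d : descriptors) {
            broadcastStates.put(d, new HashMap<>());
        }
    }

    // Like the contexts' getBroadcastState(): only registered descriptors resolve.
    Map<String, String> getBroadcastState(MiniDescriptor d) {
        Map<String, String> state = broadcastStates.get(Objects.requireNonNull(d));
        if (state == null) {
            throw new IllegalArgumentException("The requested state does not exist.");
        }
        return state;
    }
}
```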

getBroadcastState source code

public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
    Preconditions.checkNotNull(stateDescriptor);
    String name = Preconditions.checkNotNull(stateDescriptor.getName());

    BackendWritableBroadcastState<K, V> previous =
            (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);

    if (previous != null) {
        checkStateNameAndMode(previous.getStateMetaInfo().getName(),
                name,
                previous.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        return previous;
    }

    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    TypeSerializer<K> broadcastStateKeySerializer =
            Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    TypeSerializer<V> broadcastStateValueSerializer =
            Preconditions.checkNotNull(stateDescriptor.getValueSerializer());

    BackendWritableBroadcastState<K, V> broadcastState =
            (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);

    if (broadcastState == null) {
        broadcastState =
                new HeapBroadcastState<>(
                        new RegisteredBroadcastStateBackendMetaInfo<>(
                                name,
                                OperatorStateHandle.Mode.BROADCAST,
                                broadcastStateKeySerializer,
                                broadcastStateValueSerializer));
        registeredBroadcastStates.put(name, broadcastState);
    } else {
        // has restored state; check compatibility of new state access

        checkStateNameAndMode(broadcastState.getStateMetaInfo().getName(),
                name,
                broadcastState.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);

        RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo =
                broadcastState.getStateMetaInfo();

        // check whether new serializers are incompatible
        TypeSerializerSchemaCompatibility<K> keyCompatibility =
                restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
        if (keyCompatibility.isIncompatible()) {
            throw new StateMigrationException("The new key typeSerializer for broadcast state must not be incompatible.");
        }

        TypeSerializerSchemaCompatibility<V> valueCompatibility =
                restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
        if (valueCompatibility.isIncompatible()) {
            throw new StateMigrationException("The new value typeSerializer for broadcast state must not be incompatible.");
        }

        broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
    }

    accessedBroadcastStatesByName.put(name, broadcastState);
    return broadcastState;
}

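getBroadcastState returns the state already accessed under the same name if there is one. Otherwise it creates a new HeapBroadcastState, or, when state was restored from a checkpoint, reuses the restored instance after checking that the new key and value serializers are not incompatible. HeapBroadcastState is the concrete implementation of broadcast state; let's look at its source code.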
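The three paths through getBroadcastState (cached access, fresh creation, restored state with a compatibility check) are sketched below under simplifying assumptions: MiniOperatorStateBackend is hypothetical, serializers are reduced to version strings, any version change counts as incompatible (Flink's TypeSerializerSchemaCompatibility is more nuanced), IllegalStateException stands in for StateMigrationException, and the name/mode checks are omitted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplification of an operator state backend's getBroadcastState. */
class MiniOperatorStateBackend {
    static final class HeapState {
        final Map<String, String> backingMap = new HashMap<>();
        final String serializerVersion;

        HeapState(String serializerVersion) {
            this.serializerVersion = serializerVersion;
        }
    }

    // States known to the backend, including ones restored from a checkpoint.
    final Map<String, HeapState> registered = new HashMap<>();
    // States already handed out during this run.
    private final Map<String, HeapState> accessedByName = new HashMap<>();

    HeapState getBroadcastState(String name, String serializerVersion) {
        HeapState previous = accessedByName.get(name);
        if (previous != null) {
            return previous; // path 1: already accessed
        }
        HeapState state = registered.get(name);
        if (state == null) {
            state = new HeapState(serializerVersion); // path 2: brand-new heap state
            registered.put(name, state);
        } else if (!state.serializerVersion.equals(serializerVersion)) {
            // path 3 (failing): restored state with an incompatible serializer
            throw new IllegalStateException("Incompatible serializer for broadcast state " + name);
        }
        accessedByName.put(name, state);
        return state;
    }
}
```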

HeapBroadcastState source code

/**
 * A {@link BroadcastState Broadcast State} backed by a heap-based {@link Map}.
 *
 * @param <K> The key type of the elements in the {@link BroadcastState Broadcast State}.
 * @param <V> The value type of the elements in the {@link BroadcastState Broadcast State}.
 */
public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {

    /** Meta information of the state, including state name, assignment mode, and serializer. */
    private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;

    /** The internal map that holds the elements of the state. */
    private final Map<K, V> backingMap;

    /** A serializer that allows to perform deep copies of internal map state. */
    private final MapSerializer<K, V> internalMapCopySerializer;

    HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this(stateMetaInfo, new HashMap<>());
    }

    private HeapBroadcastState(
            final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo,
            final Map<K, V> internalMap) {
        this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
        this.backingMap = Preconditions.checkNotNull(internalMap);
        this.internalMapCopySerializer =
                new MapSerializer<>(stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer());
    }

    private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
        this(toCopy.stateMetaInfo.deepCopy(),
                toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
    }

    @Override
    public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {this.stateMetaInfo = stateMetaInfo;}

    @Override
    public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {return stateMetaInfo;}

    @Override
    public HeapBroadcastState<K, V> deepCopy() {
        return new HeapBroadcastState<>(this);
    }

    @Override
    public void clear() {
        backingMap.clear();
    }

    @Override
    public String toString() {
        return "HeapBroadcastState{"
                + "stateMetaInfo="
                + stateMetaInfo
                + ", backingMap="
                + backingMap
                + ", internalMapCopySerializer="
                + internalMapCopySerializer
                + '}';
    }

    @Override
    public long write(FSDataOutputStream out) throws IOException {
        long partitionOffset = out.getPos();

        DataOutputView dov = new DataOutputViewStreamWrapper(out);
        dov.writeInt(backingMap.size());
        for (Map.Entry<K, V> entry : backingMap.entrySet()) {
            getStateMetaInfo().getKeySerializer().serialize(entry.getKey(), dov);
            getStateMetaInfo().getValueSerializer().serialize(entry.getValue(), dov);
        }

        return partitionOffset;
    }

    @Override
    public V get(K key) {
        return backingMap.get(key);
    }

    @Override
    public void put(K key, V value) {
        backingMap.put(key, value);
    }

    @Override
    public void putAll(Map<K, V> map) {
        backingMap.putAll(map);
    }

    @Override
    public void remove(K key) {
        backingMap.remove(key);
    }

    @Override
    public boolean contains(K key) {
        return backingMap.containsKey(key);
    }

    @Override
    public Iterator<Map.Entry<K, V>> iterator() {return backingMap.entrySet().iterator();}

    @Override
    public Iterable<Map.Entry<K, V>> entries() {
        return backingMap.entrySet();
    }

    @Override
    public Iterable<Map.Entry<K, V>> immutableEntries() {
        return Collections.unmodifiableSet(backingMap.entrySet());
    }
}

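The HeapBroadcastState code is simple: it consists mainly of read and write operations on the state, which boil down to operations on a HashMap (backingMap). For snapshots, write serializes the number of entries followed by each key and value.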
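The snapshot layout produced by write (entry count, then key and value for each entry) can be illustrated with java.io streams. This is a sketch assuming String keys and values serialized with writeUTF instead of the state's TypeSerializers; HeapMapSnapshot and its read method are hypothetical and exist only to show the round trip.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a count-then-entries snapshot of a heap map. */
class HeapMapSnapshot {
    static long write(Map<String, String> backingMap, DataOutputStream out) throws IOException {
        long partitionOffset = out.size(); // like out.getPos(): where this state starts
        out.writeInt(backingMap.size());
        for (Map.Entry<String, String> e : backingMap.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
        return partitionOffset;
    }

    static Map<String, String> read(DataInputStream in) throws IOException {
        int size = in.readInt();
        Map<String, String> map = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            map.put(key, in.readUTF());
        }
        return map;
    }
}
```

Because every subtask holds the same broadcast map, such a per-subtask snapshot is a full copy of the state.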

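Back in CoBroadcastWithKeyedOperator: processElement1 uses ReadOnlyContextImpl, while processElement2 uses ReadWriteContextImpl. In other words, state can only be modified on the broadcast side and not on the non-broadcast side, which answers the second question above.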
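In the context code above, the read-only restriction comes from types rather than from copying: both contexts return the same object from the states map and differ only in the declared return type. A hypothetical sketch of that idea, with interface names that mirror but are not Flink's:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirrors of ReadOnlyBroadcastState / BroadcastState.
interface MiniReadOnlyState<K, V> {
    V get(K key);

    boolean contains(K key);
}

interface MiniBroadcastState<K, V> extends MiniReadOnlyState<K, V> {
    void put(K key, V value);

    void remove(K key);
}

class MiniHeapState<K, V> implements MiniBroadcastState<K, V> {
    private final Map<K, V> backingMap = new HashMap<>();

    public V get(K key) { return backingMap.get(key); }

    public boolean contains(K key) { return backingMap.containsKey(key); }

    public void put(K key, V value) { backingMap.put(key, value); }

    public void remove(K key) { backingMap.remove(key); }
}

// One state object, handed out through two differently typed views.
class MiniContexts {
    private final MiniHeapState<String, String> state = new MiniHeapState<>();

    // What the broadcast side (processElement2) receives: writable.
    MiniBroadcastState<String, String> readWriteView() { return state; }

    // What the keyed side (processElement1) receives: put(...) is not visible here.
    MiniReadOnlyState<String, String> readOnlyView() { return state; }
}
```

Calling put on the value returned by readOnlyView() does not compile. Since the object is the same, an explicit downcast would get around this, so in this sketch the guarantee is a contract for user code rather than a runtime check.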

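Although the state can be obtained on both the broadcast and the non-broadcast side, getBroadcastState returns different types: BroadcastState in ReadWriteContextImpl, but ReadOnlyBroadcastState in ReadOnlyContextImpl and OnTimerContextImpl.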

Figure: BroadcastState & ReadOnlyBroadcastState UML diagram

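The BroadcastState interface extends ReadOnlyBroadcastState, which in turn extends the State interface. The only implementation of BroadcastState is HeapBroadcastState (through BackendWritableBroadcastState), and as its name suggests, broadcast state is stored on the JVM heap: underneath it is just a Map, and backingMap in the diagram above holds the state data. It is created by the operator state backend's getBroadcastState shown earlier, which always instantiates HeapBroadcastState; Flink uses this heap-based operator state backend (DefaultOperatorStateBackend) even when RocksDB is configured for keyed state. This answers the third question above.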

To further explain the second question, here is a concrete scenario.

Example

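Look at the scenario in the figure above. Stream A reads data from Kafka and returns a KeyedStream after keyBy; stream B reads data from MySQL and is broadcast, returning a BroadcastStream. Stream B has two records, flink and spark, which are broadcast to every downstream subtask, so downstream subtask-0 and subtask-1 both hold flink and spark in their broadcast state. Now two records, flink and hive, are written to Kafka. After keyBy, flink is assigned to subtask-0 and hive to subtask-1. Clearly the flink record can be joined with the broadcast data, while hive cannot. If state could be modified on the non-broadcast side (stream A), for example by adding each incoming record to the state of the subtask that received it, subtask-1 would now hold hive while subtask-0 would not, and the broadcast state on the two subtasks would diverge. Therefore, to guarantee that all parallel instances of the operator hold the same broadcast state, the design forbids modifying state on the non-broadcast side.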
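The example can be replayed in a toy simulation with two subtasks, each holding its own copy of the broadcast state. BroadcastConsistencyDemo is hypothetical: keyed records are routed by hashCode modulo parallelism, which is only a stand-in for Flink's key-group assignment, so hive may land on either subtask. With allowKeyedWrites set to false the copies stay identical; setting it to true models the forbidden case and lets them diverge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model: per-subtask copies of broadcast state. */
class BroadcastConsistencyDemo {
    final List<Map<String, String>> subtaskStates = new ArrayList<>();
    private final boolean allowKeyedWrites;

    BroadcastConsistencyDemo(int parallelism, boolean allowKeyedWrites) {
        for (int i = 0; i < parallelism; i++) {
            subtaskStates.add(new HashMap<>());
        }
        this.allowKeyedWrites = allowKeyedWrites;
    }

    // Broadcast side: every subtask applies the same update.
    void broadcast(String key, String value) {
        for (Map<String, String> state : subtaskStates) {
            state.put(key, value);
        }
    }

    // Keyed side: exactly one subtask sees the record; returns whether it joined.
    boolean keyed(String key) {
        Map<String, String> state =
                subtaskStates.get(Math.floorMod(key.hashCode(), subtaskStates.size()));
        boolean joined = state.containsKey(key);
        if (allowKeyedWrites && !joined) {
            state.put(key, "added-by-keyed-side"); // only this subtask's copy changes
        }
        return joined;
    }

    // True if every subtask holds exactly the same broadcast state.
    boolean consistent() {
        for (Map<String, String> state : subtaskStates) {
            if (!state.equals(subtaskStates.get(0))) {
                return false;
            }
        }
        return true;
    }
}
```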

Summary

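Broadcast State is a special type of Operator State. It is mainly used to join a low-throughput stream (small data volume) with another main data stream. Broadcast state must be defined as a Map, it can only be modified on the broadcast side, and the non-broadcast side can only read it. Broadcast state is kept only in heap memory, so when using it you need to give the TaskManager (TM) enough memory. This article explained, from the source-code perspective, why Flink is designed this way, to give a deeper understanding of broadcast state.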
