A Look at Flink's Broadcast State


This post takes a look at Flink's Broadcast State.
Example
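// Imports for the test below. RandomWordSource (a SourceFunction<String>) and LOGGER
// are assumed to be defined elsewhere in the test class.
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;

@Test
public void testBroadcastState() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> originStream = env.addSource(new RandomWordSource());

    // Descriptor of the broadcast state: config key -> config value
    final MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
            "dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    BroadcastStream<Tuple2<String, String>> configStream =
            env.addSource(new DynamicConfigSource()).broadcast(descriptor);

    BroadcastConnectedStream<String, Tuple2<String, String>> connectStream = originStream.connect(configStream);
    connectStream.process(new BroadcastProcessFunction<String, Tuple2<String, String>, Void>() {
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
            ReadOnlyBroadcastState<String, String> config = ctx.getBroadcastState(descriptor);
            String configValue = config.get("demoConfigKey");
            // do some processing based on the config
            LOGGER.info("process value:{},config:{}", value, configValue);
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Void> out) throws Exception {
            LOGGER.info("receive config item:{}", value);
            // update state
            ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
        }
    });

    env.execute("testBroadcastState");
}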

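import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class DynamicConfigSource implements SourceFunction<Tuple2<String, String>> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        long idx = 1;
        while (isRunning) {
            // emit a new value for the same config key every 10 seconds
            ctx.collect(Tuple2.of("demoConfigKey", "value" + idx));
            idx++;
            TimeUnit.SECONDS.sleep(10);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}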
Here a config source is simulated: it emits a refreshed config value every 10 seconds, and that value is then broadcast to every task.
MapStateDescriptor
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapStateDescriptor.java
@PublicEvolving
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {

private static final long serialVersionUID = 1L;

/**
* Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keySerializer The type serializer for the keys in the state.
* @param valueSerializer The type serializer for the values in the state.
*/
public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
}

/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyTypeInfo The type information for the keys in the state.
* @param valueTypeInfo The type information for the values in the state.
*/
public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
}

/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyClass The class of the type of keys in the state.
* @param valueClass The class of the type of values in the state.
*/
public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
super(name, new MapTypeInfo<>(keyClass, valueClass), null);
}

@Override
public Type getType() {
return Type.MAP;
}

/**
* Gets the serializer for the keys in the state.
*
* @return The serializer for the keys in the state.
*/
public TypeSerializer<UK> getKeySerializer() {
final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
if (!(rawSerializer instanceof MapSerializer)) {
throw new IllegalStateException("Unexpected serializer type.");
}

return ((MapSerializer<UK, UV>) rawSerializer).getKeySerializer();
}

/**
* Gets the serializer for the values in the state.
*
* @return The serializer for the values in the state.
*/
public TypeSerializer<UV> getValueSerializer() {
final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
if (!(rawSerializer instanceof MapSerializer)) {
throw new IllegalStateException("Unexpected serializer type.");
}

return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
}
}
MapStateDescriptor extends StateDescriptor, with MapState as the state type and Map as the value type.
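To make the role of the descriptor more concrete, here is a minimal plain-Java sketch. It is not Flink code: `SimpleDescriptor` and `StateRegistry` are hypothetical names introduced only for illustration. It models a descriptor as a name plus key/value types, and a registry that hands out one map per registered descriptor name and rejects descriptors that were never registered. Flink's real lookup is more involved, so treat this only as a mental model.

```java
import java.util.HashMap;
import java.util.Map;

public class DescriptorSketch {

    /** Hypothetical stand-in for a map state descriptor: a name plus key/value types. */
    static final class SimpleDescriptor<K, V> {
        final String name;
        final Class<K> keyClass;
        final Class<V> valueClass;

        SimpleDescriptor(String name, Class<K> keyClass, Class<V> valueClass) {
            this.name = name;
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }
    }

    /** Hypothetical registry: one backing map per registered descriptor name. */
    static final class StateRegistry {
        private final Map<String, Map<Object, Object>> states = new HashMap<>();

        void register(SimpleDescriptor<?, ?> descriptor) {
            states.putIfAbsent(descriptor.name, new HashMap<>());
        }

        @SuppressWarnings("unchecked")
        <K, V> Map<K, V> getState(SimpleDescriptor<K, V> descriptor) {
            Map<Object, Object> state = states.get(descriptor.name);
            if (state == null) {
                // Assumption of this sketch: unknown descriptors are rejected.
                throw new IllegalArgumentException("State not registered: " + descriptor.name);
            }
            return (Map<K, V>) (Map<?, ?>) state;
        }
    }

    public static void main(String[] args) {
        SimpleDescriptor<String, String> config =
                new SimpleDescriptor<>("dynamicConfig", String.class, String.class);
        StateRegistry registry = new StateRegistry();
        registry.register(config);

        registry.getState(config).put("demoConfigKey", "value1");
        // A second lookup with the same descriptor sees the same map.
        System.out.println(registry.getState(config).get("demoConfigKey"));
    }
}
```

The point of the sketch is that the descriptor acts as a typed handle: the same descriptor is used both when declaring the state and when reading or writing it later.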
DataStream.broadcast
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are broadcasted to every parallel instance of the next operation. In addition,
* it implicitly as many {@link org.apache.flink.api.common.state.BroadcastState broadcast states}
* as the specified descriptors which can be used to store the element of the stream.
*
* @param broadcastStateDescriptors the descriptors of the broadcast states to create.
* @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)} to
* create a {@link BroadcastConnectedStream} for further processing of the elements.
*/
@PublicEvolving
public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
Preconditions.checkNotNull(broadcastStateDescriptors);
final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}

/**
* Internal function for setting the partitioner for the DataStream.
*
* @param partitioner
* Partitioner to set.
* @return The modified DataStream.
*/
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
}

/**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are broadcast to every parallel instance of the next operation.
*
* @return The DataStream with broadcast partitioning set.
*/
public DataStream<T> broadcast() {
return setConnectionType(new BroadcastPartitioner<T>());
}
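The broadcast method of DataStream that takes descriptors first calls setConnectionType with a BroadcastPartitioner, then creates and returns a BroadcastStream from the resulting stream and the given MapStateDescriptors. DataStream also has a no-arg broadcast method, which simply returns the DataStream produced by setConnectionType.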
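What "broadcast partitioning" means can be sketched in plain Java. The class and method names below are hypothetical and this is not Flink's BroadcastPartitioner; it only contrasts the idea of sending each record to all downstream channels with a simplified round-robin scheme that picks exactly one channel.

```java
import java.util.Arrays;

public class PartitionerSketch {

    /** Broadcast: every record goes to all downstream channels. */
    static int[] broadcastChannels(int numChannels) {
        int[] channels = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channels[i] = i;
        }
        return channels;
    }

    /** Simplified round-robin, for contrast: each record goes to exactly one channel. */
    static int[] roundRobinChannel(long recordIndex, int numChannels) {
        return new int[] {(int) (recordIndex % numChannels)};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(broadcastChannels(3)));    // [0, 1, 2]
        System.out.println(Arrays.toString(roundRobinChannel(4, 3))); // [1]
    }
}
```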
DataStream.connect
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
* Creates a new {@link ConnectedStreams} by connecting
* {@link DataStream} outputs of (possible) different types with each other.
* The DataStreams connected using this operator can be used with
* CoFunctions to apply joint transformations.
*
* @param dataStream
* The DataStream with which this stream will be connected.
* @return The {@link ConnectedStreams}.
*/
public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
return new ConnectedStreams<>(environment, this, dataStream);
}

/**
* Creates a new {@link BroadcastConnectedStream} by connecting the current
* {@link DataStream} or {@link KeyedStream} with a {@link BroadcastStream}.
*
* <p>The latter can be created using the {@link #broadcast(MapStateDescriptor[])} method.
*
* <p>The resulting stream can be further processed using the {@code BroadcastConnectedStream.process(MyFunction)}
* method, where {@code MyFunction} can be either a
* {@link org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction KeyedBroadcastProcessFunction}
* or a {@link org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction BroadcastProcessFunction}
* depending on the current stream being a {@link KeyedStream} or not.
*
* @param broadcastStream The broadcast stream with the broadcast state to be connected with this stream.
* @return The {@link BroadcastConnectedStream}.
*/
@PublicEvolving
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
return new BroadcastConnectedStream<>(
environment,
this,
Preconditions.checkNotNull(broadcastStream),
broadcastStream.getBroadcastStateDescriptor());
}
DataStream's connect method accepts either a DataStream or a BroadcastStream. Given a BroadcastStream it returns a BroadcastConnectedStream; otherwise it returns an ordinary ConnectedStreams.
BroadcastConnectedStream.process
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
@PublicEvolving
public class BroadcastConnectedStream<IN1, IN2> {

private final StreamExecutionEnvironment environment;
private final DataStream<IN1> inputStream1;
private final BroadcastStream<IN2> inputStream2;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

protected BroadcastConnectedStream(
final StreamExecutionEnvironment env,
final DataStream<IN1> input1,
final BroadcastStream<IN2> input2,
final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
this.environment = requireNonNull(env);
this.inputStream1 = requireNonNull(input1);
this.inputStream2 = requireNonNull(input2);
this.broadcastStateDescriptors = requireNonNull(broadcastStateDescriptors);
}

public StreamExecutionEnvironment getExecutionEnvironment() {
return environment;
}

/**
* Returns the non-broadcast {@link DataStream}.
*
* @return The stream which, by convention, is not broadcasted.
*/
public DataStream<IN1> getFirstInput() {
return inputStream1;
}

/**
* Returns the {@link BroadcastStream}.
*
* @return The stream which, by convention, is the broadcast one.
*/
public BroadcastStream<IN2> getSecondInput() {
return inputStream2;
}

/**
* Gets the type of the first input.
*
* @return The type of the first input
*/
public TypeInformation<IN1> getType1() {
return inputStream1.getType();
}

/**
* Gets the type of the second input.
*
* @return The type of the second input
*/
public TypeInformation<IN2> getType2() {
return inputStream2.getType();
}

/**
* Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
* {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
*
* @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
* @param <KS> The type of the keys in the keyed stream.
* @param <OUT> The type of the output elements.
* @return The transformed {@link DataStream}.
*/
@PublicEvolving
public <KS, OUT> SingleOutputStreamOperator<OUT> process(final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function) {

TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
function,
KeyedBroadcastProcessFunction.class,
1,
2,
3,
TypeExtractor.NO_INDEX,
getType1(),
getType2(),
Utils.getCallLocationName(),
true);

return process(function, outTypeInfo);
}

/**
* Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
* {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
*
* @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
* @param outTypeInfo The type of the output elements.
* @param <KS> The type of the keys in the keyed stream.
* @param <OUT> The type of the output elements.
* @return The transformed {@link DataStream}.
*/
@PublicEvolving
public <KS, OUT> SingleOutputStreamOperator<OUT> process(
final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
final TypeInformation<OUT> outTypeInfo) {

Preconditions.checkNotNull(function);
Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
"A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

TwoInputStreamOperator<IN1, IN2, OUT> operator =
new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
}

/**
* Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
* {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
*
* @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
* @param <OUT> The type of the output elements.
* @return The transformed {@link DataStream}.
*/
@PublicEvolving
public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function) {

TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
function,
BroadcastProcessFunction.class,
0,
1,
2,
TypeExtractor.NO_INDEX,
getType1(),
getType2(),
Utils.getCallLocationName(),
true);

return process(function, outTypeInfo);
}

/**
* Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
* {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
*
* @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
* @param outTypeInfo The type of the output elements.
* @param <OUT> The type of the output elements.
* @return The transformed {@link DataStream}.
*/
@PublicEvolving
public <OUT> SingleOutputStreamOperator<OUT> process(
final BroadcastProcessFunction<IN1, IN2, OUT> function,
final TypeInformation<OUT> outTypeInfo) {

Preconditions.checkNotNull(function);
Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream),
"A BroadcastProcessFunction can only be used on a non-keyed stream.");

TwoInputStreamOperator<IN1, IN2, OUT> operator =
new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
return transform("Co-Process-Broadcast", outTypeInfo, operator);
}

@Internal
private <OUT> SingleOutputStreamOperator<OUT> transform(
final String functionName,
final TypeInformation<OUT> outTypeInfo,
final TwoInputStreamOperator<IN1, IN2, OUT> operator) {

// read the output type of the input Transforms to coax out errors about MissingTypeInfo
inputStream1.getType();
inputStream2.getType();

TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
inputStream1.getTransformation(),
inputStream2.getTransformation(),
functionName,
operator,
outTypeInfo,
environment.getParallelism());

if (inputStream1 instanceof KeyedStream) {
KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
TypeInformation<?> keyType1 = keyedInput1.getKeyType();
transform.setStateKeySelectors(keyedInput1.getKeySelector(), null);
transform.setStateKeyType(keyType1);
}

@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<OUT> returnStream = new SingleOutputStreamOperator(environment, transform);

getExecutionEnvironment().addOperator(transform);

return returnStream;
}

protected <F> F clean(F f) {
return getExecutionEnvironment().clean(f);
}
}
BroadcastConnectedStream.process accepts two kinds of function: KeyedBroadcastProcessFunction and BroadcastProcessFunction. Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines an onTimer method, which is a no-op by default and can be overridden by subclasses.
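The dispatch between the two function types can be modelled in a few lines of plain Java. Everything here (`BroadcastFn`, `KeyedBroadcastFn`, `ConnectedSketch`, `noopFn`, `noopKeyedFn`) is a hypothetical stand-in, not Flink's classes; the sketch only mirrors the overloads and the keyed/non-keyed argument checks visible in the source above.

```java
public class ProcessDispatchSketch {

    /** Hypothetical stand-in for BroadcastProcessFunction. */
    abstract static class BroadcastFn {
        abstract void processElement(String value);
        abstract void processBroadcastElement(String value);
    }

    /** Hypothetical stand-in for KeyedBroadcastProcessFunction. */
    abstract static class KeyedBroadcastFn {
        abstract void processElement(String value);
        abstract void processBroadcastElement(String value);

        // No-op by default; subclasses may override it.
        void onTimer(long timestamp) {
        }
    }

    /** Hypothetical connected stream that only records whether its first input is keyed. */
    static final class ConnectedSketch {
        private final boolean firstInputKeyed;

        ConnectedSketch(boolean firstInputKeyed) {
            this.firstInputKeyed = firstInputKeyed;
        }

        String process(KeyedBroadcastFn function) {
            if (!firstInputKeyed) {
                throw new IllegalArgumentException(
                        "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
            }
            return "Co-Process-Broadcast-Keyed";
        }

        String process(BroadcastFn function) {
            if (firstInputKeyed) {
                throw new IllegalArgumentException(
                        "A BroadcastProcessFunction can only be used on a non-keyed stream.");
            }
            return "Co-Process-Broadcast";
        }
    }

    static BroadcastFn noopFn() {
        return new BroadcastFn() {
            @Override void processElement(String value) { }
            @Override void processBroadcastElement(String value) { }
        };
    }

    static KeyedBroadcastFn noopKeyedFn() {
        return new KeyedBroadcastFn() {
            @Override void processElement(String value) { }
            @Override void processBroadcastElement(String value) { }
        };
    }

    public static void main(String[] args) {
        System.out.println(new ConnectedSketch(false).process(noopFn()));
        System.out.println(new ConnectedSketch(true).process(noopKeyedFn()));
    }
}
```

In this model, passing the wrong function type for the stream fails fast with an IllegalArgumentException, which matches the Preconditions.checkArgument calls in the two process overloads.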
Summary

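Using broadcast takes three steps: (1) create a MapStateDescriptor and obtain a BroadcastStream through DataStream.broadcast; (2) connect the stream that should receive the broadcast to that BroadcastStream through DataStream.connect, which returns a BroadcastConnectedStream; (3) call BroadcastConnectedStream.process and handle the two inputs in processElement and processBroadcastElement.
BroadcastConnectedStream.process accepts two kinds of function, KeyedBroadcastProcessFunction and BroadcastProcessFunction. Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines onTimer, a no-op by default that subclasses can override.
Broadcast State uses a map format. Every element of the broadcast stream is delivered to each task, and each task keeps its own copy of the state. The state is not propagated across tasks, so a modification only affects the task that made it. Downstream tasks may receive broadcast events in different orders, so be careful when element processing depends on their arrival order. Broadcast state is included in checkpoints. Finally, Broadcast State is kept only in memory; there is no RocksDB state backend for it.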
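The per-task semantics described above can be illustrated with a small plain-Java model. `Task`, `createTasks`, and `broadcast` are hypothetical names and this is not how Flink is implemented; the sketch only shows that each parallel task holds its own map, that a broadcast element reaches every task, and that a write made on one task alone is not seen by the others.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastStateSketch {

    /** Hypothetical parallel task holding its own local copy of the broadcast state. */
    static final class Task {
        final Map<String, String> broadcastState = new HashMap<>();

        // Mirrors processBroadcastElement: every task applies the same update locally.
        void processBroadcastElement(String key, String value) {
            broadcastState.put(key, value);
        }

        // Mirrors processElement: reads the local copy of the state.
        String processElement(String value) {
            return value + ":" + broadcastState.get("demoConfigKey");
        }
    }

    static List<Task> createTasks(int parallelism) {
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task());
        }
        return tasks;
    }

    /** Broadcasting delivers the element to every task. */
    static void broadcast(List<Task> tasks, String key, String value) {
        for (Task task : tasks) {
            task.processBroadcastElement(key, value);
        }
    }

    public static void main(String[] args) {
        List<Task> tasks = createTasks(3);
        broadcast(tasks, "demoConfigKey", "value1");

        // A write made directly on one task stays local to that task.
        tasks.get(0).broadcastState.put("demoConfigKey", "localOnly");

        System.out.println(tasks.get(0).processElement("a")); // a:localOnly
        System.out.println(tasks.get(1).processElement("a")); // a:value1
    }
}
```

This is why updates should be applied only in response to broadcast elements and in the same way on every task: any task-local change makes the copies diverge.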

doc
The Broadcast State Pattern
