共计 10140 个字符,预计需要花费 26 分钟才能阅读完成。
给 Print SQL Connector 增加随机取样
Flink 提供了 Print SQL Connector 能够让咱们十分不便的把数据打印到规范输入. 有助于咱们测试 SQL 工作, 测验数据的正确性.
然而在生产环境中, 上游的数据量是十分大的, 如果间接把数据输入的话, 可能会把规范输入文件打满, 造成页面卡死的状况, 反而不利于咱们观测数据, 所以咱们能够对 Print SQL Connector 进行简略的革新, 加一个随机取样的参数控制数据输入.
间接把 Print SQL Connector 相干的代码复制进去.
PrintRateTableSinkFactory
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flink.stream.connector.print;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import static flink.stream.connector.print.PrintConnectorOptions.PRINT_RATE;
import static org.apache.flink.connector.print.table.PrintConnectorOptions.PRINT_IDENTIFIER;
import static org.apache.flink.connector.print.table.PrintConnectorOptions.STANDARD_ERROR;
/**
* Print table sink factory writing every row to the standard output or standard error stream. It is
* designed for: - easy test for streaming job. - very useful in production debugging.
*
* <p>Four possible format options: {@code PRINT_IDENTIFIER}:taskId> output <- {@code
* PRINT_IDENTIFIER} provided, parallelism > 1 {@code PRINT_IDENTIFIER}> output <- {@code
* PRINT_IDENTIFIER} provided, parallelism == 1 taskId> output <- no {@code PRINT_IDENTIFIER}
* provided, parallelism > 1 output <- no {@code PRINT_IDENTIFIER} provided, parallelism == 1
*
* <p>output string format is "$RowKind[f0, f1, f2, ...]", example is: "+I[1, 1]".
*/
@Internal
public class PrintRateTableSinkFactory implements DynamicTableSinkFactory {
// 简略批改
public static final String IDENTIFIER = "print-rate";
@Override
public String factoryIdentifier() {return IDENTIFIER;}
@Override
public Set<ConfigOption<?>> requiredOptions() {return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {Set<ConfigOption<?>> options = new HashSet<>();
options.add(PRINT_IDENTIFIER);
options.add(STANDARD_ERROR);
options.add(FactoryUtil.SINK_PARALLELISM);
// 增加到 options
options.add(PRINT_RATE);
return options;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
return new PrintSink(context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
context.getCatalogTable().getPartitionKeys(),
options.get(PRINT_IDENTIFIER),
options.get(STANDARD_ERROR),
options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null),
options.get(PRINT_RATE));
}
private static class PrintSink implements DynamicTableSink, SupportsPartitioning {
private final DataType type;
private String printIdentifier;
private final boolean stdErr;
private final @Nullable Integer parallelism;
private final List<String> partitionKeys;
private Map<String, String> staticPartitions = new LinkedHashMap<>();
private @Nullable Float printRate;
private PrintSink(
DataType type,
List<String> partitionKeys,
String printIdentifier,
boolean stdErr,
Integer parallelism,
Float printRate) {
this.type = type;
this.partitionKeys = partitionKeys;
this.printIdentifier = printIdentifier;
this.stdErr = stdErr;
this.parallelism = parallelism;
this.printRate = printRate;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {return requestedMode;}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {DataStructureConverter converter = context.createDataStructureConverter(type);
staticPartitions.forEach((key, value) -> {
printIdentifier = null != printIdentifier ? printIdentifier + ":" : "";
printIdentifier += key + "=" + value;
});
return SinkFunctionProvider.of(new RowDataPrintFunction(converter, printIdentifier, stdErr, printRate), parallelism);
}
@Override
public DynamicTableSink copy() {return new PrintSink(type, partitionKeys, printIdentifier, stdErr, parallelism, printRate);
}
@Override
public String asSummaryString() {return "Print to" + (stdErr ? "System.err" : "System.out");
}
@Override
public void applyStaticPartition(Map<String, String> partition) {
// make it a LinkedHashMap to maintain partition column order
staticPartitions = new LinkedHashMap<>();
for (String partitionCol : partitionKeys) {if (partition.containsKey(partitionCol)) {staticPartitions.put(partitionCol, partition.get(partitionCol));
}
}
}
}
/**
* Implementation of the SinkFunction converting {@link RowData} to string and passing to {@link
* PrintSinkFunction}.
*/
private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
private static final long serialVersionUID = 1L;
private final DataStructureConverter converter;
private final PrintSinkOutputWriter<String> writer;
private final Float printRate;
private RowDataPrintFunction(DataStructureConverter converter, String printIdentifier, boolean stdErr, Float printRate) {
this.converter = converter;
this.writer = new PrintSinkOutputWriter<>(printIdentifier, stdErr);
this.printRate = printRate;
}
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
}
@Override
public void invoke(RowData value, Context context) {if (ThreadLocalRandom.current().nextFloat() < this.printRate) {Object data = converter.toExternal(value);
assert data != null;
writer.write(data.toString());
}
}
}
}
PrintConnectorOptions
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flink.stream.connector.print;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import static org.apache.flink.configuration.ConfigOptions.key;
/** Options for the Print sink connector. */
@PublicEvolving
public class PrintConnectorOptions {
public static final ConfigOption<String> PRINT_IDENTIFIER =
key("print-identifier")
.stringType()
.noDefaultValue()
.withDescription("Message that identify print and is prefixed to the output of the value.");
public static final ConfigOption<Boolean> STANDARD_ERROR =
key("standard-error")
.booleanType()
.defaultValue(false)
.withDescription("True, if the format should print to standard error instead of standard out.");
public static final ConfigOption<Float> PRINT_RATE =
key("print-rate")
.floatType()
.defaultValue(0.0001F)
.withDescription("Controls the printing rate of data");
private PrintConnectorOptions() {}
}
首先在 PrintConnectorOptions 配置外面增加 PRINT_RATE 属性, 用来管制随机取样, 默认值是 0.0001.
而后在 PrintRateTableSinkFactory 中把 connector 的惟一标识符 IDENTIFIER 改成 print-rate, 其实不改也是能够的, 只是为了和默认的 Print 做辨别.
在 PrintRateTableSinkFactory#optionalOptions 办法外面退出咱们增加的属性 PRINT_RATE.
@Override
public Set<ConfigOption<?>> optionalOptions() {Set<ConfigOption<?>> options = new HashSet<>();
options.add(PRINT_IDENTIFIER);
options.add(STANDARD_ERROR);
options.add(FactoryUtil.SINK_PARALLELISM);
options.add(PRINT_RATE);
return options;
}
而后把这个参数传入到上面的 PrintSink 最初传入到 RowDataPrintFunction 外面, 最终在 invoke 办法外面增加随机取样的逻辑.
@Override
public void invoke(RowData value, Context context) {if (ThreadLocalRandom.current().nextFloat() < this.printRate) {Object data = converter.toExternal(value);
assert data != null;
writer.write(data.toString());
}
}
到这里代码就批改完了, 非常简单, 一共不到 10 行代码.
最初还要把 PrintRateTableSinkFactory 增加到 META-INF/services 下的配置文件中, 因为 Flink 是用 Java SPI 机制加载这些 connector 的.
最初来测试一下批改后的 connector, 先把打完的 jar 包上传到服务器的 flink/lib 目录上面. 创立初始化脚本和 SQL 文件.
init.sql
SET 'parallelism.default' = '8';
SET 'taskmanager.memory.network.fraction' = '0.01';
SET 'pipeline.name' = 'test-print-rate';
SET 'sql-client.verbose' = 'true';
test_print_rate.sql
CREATE TABLE kafka_table (
name string,
age int,
city string,
ts BIGINT,
proctime as PROCTIME(),
rt as TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
'properties.group.id' = 'jason_flink_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'false'
);
CREATE TABLE print_table
(f1 TIMESTAMP(3),
f2 TIMESTAMP(3),
f3 BIGINT,
f4 STRING
)
WITH (
'connector' = 'print-rate',
'standard-error' = 'false',
'print-rate' = '0.01',
'sink.parallelism' = '4'
);
insert into print_table
select
window_start,
window_end,
count(name),
name
from table(HOP(table kafka_table,descriptor(proctime),interval '30' second, interval '1' HOUR))
group by window_start,
window_end,
name;
这里用的是下面革新的 print-rate connector, 能够通过 ‘print-rate’ = ‘xxx’ 来管制随机取样.
提交工作
sql-client.sh -i init.sql -f test_print_rate.sql
工作提交胜利后, 先向 kafka 里写入数据, 而后到 TM 的 Stdout 外面看下打印的数据.
能够看到数据的确做了随机取样, 因为如果用默认的 Print Connector 的话, 每条数据都会打印进去, 因为 key 都是不一样的. 这样打印的数据就会缩小很多, 当上游数据量十分大时, 也不会造成什么问题.