Adding Random Sampling to the Print SQL Connector

Flink ships with a Print SQL Connector that makes it very convenient to print data to standard output, which helps when testing SQL jobs and verifying that the data is correct.

In production, however, the upstream data volume can be very large. Printing every record may fill up the standard-output file and freeze the web UI page, which actually makes it harder to inspect the data. So we can make a small modification to the Print SQL Connector and add a random-sampling parameter that controls how much data gets printed.
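The core of the change is a per-record Bernoulli check: each record is printed only with a configured probability. A minimal standalone sketch of the idea (the class name and the loop are illustrative only, not part of the connector):

import java.util.concurrent.ThreadLocalRandom;

public class SamplingSketch {

    // print a record only if a uniform random float in [0, 1) falls below the configured rate
    static boolean shouldPrint(float printRate) {
        return ThreadLocalRandom.current().nextFloat() < printRate;
    }

    public static void main(String[] args) {
        int printed = 0;
        for (int i = 0; i < 1_000_000; i++) {
            if (shouldPrint(0.01f)) {
                printed++;
            }
        }
        // roughly 1% of the records pass the check
        System.out.println("printed " + printed + " of 1000000 records");
    }
}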

Start by copying the code of the built-in Print SQL Connector into your own project.

PrintRateTableSinkFactory

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flink.stream.connector.print;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import javax.annotation.Nullable;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

import static flink.stream.connector.print.PrintConnectorOptions.PRINT_RATE;
import static org.apache.flink.connector.print.table.PrintConnectorOptions.PRINT_IDENTIFIER;
import static org.apache.flink.connector.print.table.PrintConnectorOptions.STANDARD_ERROR;

/**
 * Print table sink factory writing every row to the standard output or standard error stream. It
 * is designed for: - easy test for streaming job. - very useful in production debugging.
 *
 * <p>Four possible format options: {@code PRINT_IDENTIFIER}:taskId> output <- {@code
 * PRINT_IDENTIFIER} provided, parallelism > 1 {@code PRINT_IDENTIFIER}> output <- {@code
 * PRINT_IDENTIFIER} provided, parallelism == 1 taskId> output <- no {@code PRINT_IDENTIFIER}
 * provided, parallelism > 1 output <- no {@code PRINT_IDENTIFIER} provided, parallelism == 1
 *
 * <p>output string format is "$RowKind[f0, f1, f2, ...]", example is: "+I[1, 1]".
 */
@Internal
public class PrintRateTableSinkFactory implements DynamicTableSinkFactory {

    // simple change: new connector identifier
    public static final String IDENTIFIER = "print-rate";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PRINT_IDENTIFIER);
        options.add(STANDARD_ERROR);
        options.add(FactoryUtil.SINK_PARALLELISM);
        // add the new PRINT_RATE option
        options.add(PRINT_RATE);
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new PrintSink(
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
                context.getCatalogTable().getPartitionKeys(),
                options.get(PRINT_IDENTIFIER),
                options.get(STANDARD_ERROR),
                options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null),
                options.get(PRINT_RATE));
    }

    private static class PrintSink implements DynamicTableSink, SupportsPartitioning {

        private final DataType type;
        private String printIdentifier;
        private final boolean stdErr;
        private final @Nullable Integer parallelism;
        private final List<String> partitionKeys;
        private Map<String, String> staticPartitions = new LinkedHashMap<>();
        private @Nullable Float printRate;

        private PrintSink(
                DataType type,
                List<String> partitionKeys,
                String printIdentifier,
                boolean stdErr,
                Integer parallelism,
                Float printRate) {
            this.type = type;
            this.partitionKeys = partitionKeys;
            this.printIdentifier = printIdentifier;
            this.stdErr = stdErr;
            this.parallelism = parallelism;
            this.printRate = printRate;
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            DataStructureConverter converter = context.createDataStructureConverter(type);
            staticPartitions.forEach(
                    (key, value) -> {
                        printIdentifier = null != printIdentifier ? printIdentifier + ":" : "";
                        printIdentifier += key + "=" + value;
                    });
            return SinkFunctionProvider.of(
                    new RowDataPrintFunction(converter, printIdentifier, stdErr, printRate),
                    parallelism);
        }

        @Override
        public DynamicTableSink copy() {
            return new PrintSink(
                    type, partitionKeys, printIdentifier, stdErr, parallelism, printRate);
        }

        @Override
        public String asSummaryString() {
            return "Print to " + (stdErr ? "System.err" : "System.out");
        }

        @Override
        public void applyStaticPartition(Map<String, String> partition) {
            // make it a LinkedHashMap to maintain partition column order
            staticPartitions = new LinkedHashMap<>();
            for (String partitionCol : partitionKeys) {
                if (partition.containsKey(partitionCol)) {
                    staticPartitions.put(partitionCol, partition.get(partitionCol));
                }
            }
        }
    }

    /**
     * Implementation of the SinkFunction converting {@link RowData} to string and passing to
     * {@link PrintSinkFunction}.
     */
    private static class RowDataPrintFunction extends RichSinkFunction<RowData> {

        private static final long serialVersionUID = 1L;

        private final DataStructureConverter converter;
        private final PrintSinkOutputWriter<String> writer;
        private final Float printRate;

        private RowDataPrintFunction(
                DataStructureConverter converter,
                String printIdentifier,
                boolean stdErr,
                Float printRate) {
            this.converter = converter;
            this.writer = new PrintSinkOutputWriter<>(printIdentifier, stdErr);
            this.printRate = printRate;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
            writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
        }

        @Override
        public void invoke(RowData value, Context context) {
            // random sampling: only print when a uniform random float falls below print-rate
            if (ThreadLocalRandom.current().nextFloat() < this.printRate) {
                Object data = converter.toExternal(value);
                assert data != null;
                writer.write(data.toString());
            }
        }
    }
}

PrintConnectorOptions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package flink.stream.connector.print;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

/** Options for the Print sink connector. */
@PublicEvolving
public class PrintConnectorOptions {

    public static final ConfigOption<String> PRINT_IDENTIFIER =
            key("print-identifier")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Message that identify print and is prefixed to the output of the value.");

    public static final ConfigOption<Boolean> STANDARD_ERROR =
            key("standard-error")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "True, if the format should print to standard error instead of standard out.");

    public static final ConfigOption<Float> PRINT_RATE =
            key("print-rate")
                    .floatType()
                    .defaultValue(0.0001F)
                    .withDescription("Controls the printing rate of data");

    private PrintConnectorOptions() {}
}

First, add a PRINT_RATE option to PrintConnectorOptions to control the sampling rate; its default value is 0.0001.
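For quick reference, this is the new option as defined in the class above:

public static final ConfigOption<Float> PRINT_RATE =
        key("print-rate")
                .floatType()
                .defaultValue(0.0001F)
                .withDescription("Controls the printing rate of data");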

Then, in PrintRateTableSinkFactory, change the connector's unique identifier IDENTIFIER to print-rate. Strictly speaking this is not required; it only distinguishes the new connector from the default print connector.
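The relevant lines in the factory (taken from the listing above) are just the constant and the method that exposes it:

// the identifier that SQL tables reference via 'connector' = 'print-rate'
public static final String IDENTIFIER = "print-rate";

@Override
public String factoryIdentifier() {
    return IDENTIFIER;
}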

Next, add the new PRINT_RATE option in the PrintRateTableSinkFactory#optionalOptions method.

@Override
public Set<ConfigOption<?>> optionalOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(PRINT_IDENTIFIER);
    options.add(STANDARD_ERROR);
    options.add(FactoryUtil.SINK_PARALLELISM);
    options.add(PRINT_RATE);
    return options;
}

Then pass this parameter down into PrintSink and finally into RowDataPrintFunction (the relevant plumbing is excerpted after the invoke method below), where the sampling logic is added in the invoke method.

@Override
public void invoke(RowData value, Context context) {
    // random sampling: only print when a uniform random float falls below print-rate
    if (ThreadLocalRandom.current().nextFloat() < this.printRate) {
        Object data = converter.toExternal(value);
        assert data != null;
        writer.write(data.toString());
    }
}
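For reference, the parameter reaches invoke through the plumbing already shown in the full listing above:

// PrintSink hands printRate to the sink function
return SinkFunctionProvider.of(
        new RowDataPrintFunction(converter, printIdentifier, stdErr, printRate),
        parallelism);

// RowDataPrintFunction stores it for use in invoke()
private RowDataPrintFunction(
        DataStructureConverter converter,
        String printIdentifier,
        boolean stdErr,
        Float printRate) {
    this.converter = converter;
    this.writer = new PrintSinkOutputWriter<>(printIdentifier, stdErr);
    this.printRate = printRate;
}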

That completes the code changes. They are very simple, fewer than 10 lines in total.

Finally, register PrintRateTableSinkFactory in the configuration file under META-INF/services, because Flink discovers these connectors through the Java SPI mechanism.
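Assuming the package name used above, the resource file contains a single line with the fully qualified class name (the file name shown here matches Flink's table factory discovery; verify it against your Flink version):

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
flink.stream.connector.print.PrintRateTableSinkFactory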

Now test the modified connector. First upload the packaged jar to the flink/lib directory on the server, then create an initialization script and a SQL file.

init.sql

SET 'parallelism.default' = '8';
SET 'taskmanager.memory.network.fraction' = '0.01';
SET 'pipeline.name' = 'test-print-rate';
SET 'sql-client.verbose' = 'true';

test_print_rate.sql

CREATE TABLE kafka_table (
    name string,
    age int,
    city string,
    ts BIGINT,
    proctime as PROCTIME(),
    rt as TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
    'properties.group.id' = 'jason_flink_test',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'false'
);

CREATE TABLE print_table (
    f1 TIMESTAMP(3),
    f2 TIMESTAMP(3),
    f3 BIGINT,
    f4 STRING
) WITH (
    'connector' = 'print-rate',
    'standard-error' = 'false',
    'print-rate' = '0.01',
    'sink.parallelism' = '4'
);

insert into print_table
select
    window_start,
    window_end,
    count(name),
    name
from table(
    HOP(table kafka_table, descriptor(proctime), interval '30' second, interval '1' HOUR))
group by
    window_start,
    window_end,
    name;

The sink table uses the print-rate connector built above, and the sampling rate is controlled with 'print-rate' = 'xxx'. With 'print-rate' = '0.01', roughly 1 in 100 records is printed.

Submit the job:

sql-client.sh -i init.sql -f test_print_rate.sql

After the job is submitted successfully, write some data to Kafka and then check the printed records in the TaskManager's Stdout.

You can see that the data is indeed being sampled. With the default Print connector, every record would be printed, since every key is different. With sampling, far less data is printed, so even a very large upstream volume no longer causes problems.