1. Custom Serialization Integration (Protobuf)

In real-world applications there are all kinds of complex transfer objects, often combined with demanding throughput requirements, which calls for a custom serialization scheme. This section uses Protobuf as the example.

Functionality: produce to and consume from the same Kafka topic, using Protobuf for serialization and deserialization of the transferred data, and verify that the data is parsed correctly.

  1. Generate the Java file from the protobuf definition

```protobuf
syntax = "proto3";

option java_package = "com.itcast.flink.connectors.kafka.proto";
option java_outer_classname = "AccessLogProto";

// Message structure definition
message AccessLog {
    string ip = 1;
    string time = 2;
    string type = 3;
    string api = 4;
    string num = 5;
}
```

Generate the Java file with a batch script:

```bat
@echo off
for %%i in (proto/*.proto) do (
  d:/TestCode/protoc.exe --proto_path=./proto --java_out=../java ./proto/%%i
  echo generate %%i to java file successfully!
)
```

Note: make sure the paths are configured correctly for your environment.
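
To sanity-check the generated code, a quick round trip through the generated `AccessLogProto.AccessLog` class can be run locally. The sketch below only uses the builder / `parseFrom` API that protobuf-java generates for the message above; the `ProtoSmokeTest` class name and the sample field values are made up for illustration.

```java
import com.itcast.flink.connectors.kafka.proto.AccessLogProto;

public class ProtoSmokeTest {
    public static void main(String[] args) throws Exception {
        // Build a message with the generated builder API
        AccessLogProto.AccessLog msg = AccessLogProto.AccessLog.newBuilder()
                .setIp("10.10.20.101")
                .setTime("1601649380422")
                .setType("GET")
                .setApi("getAccount")
                .setNum("1")        // num is declared as string in the .proto file
                .build();

        // Encode to the Protobuf wire format and decode it back
        byte[] bytes = msg.toByteArray();
        AccessLogProto.AccessLog decoded = AccessLogProto.AccessLog.parseFrom(bytes);
        System.out.println(decoded.getApi());   // prints: getAccount
    }
}
```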

  2. Implement the custom serialization schema

Add the POM dependencies:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
</dependencies>
```

The AccessLog POJO:

```java
@Data
public class AccessLog implements Serializable {
    private String ip;
    private String time;
    private String type;
    private String api;
    private Integer num;
}
```

CustomSerialSchema:

```java
/**
 * Custom serialization implementation (Protobuf)
 */
public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {

    private static final long serialVersionUID = 1L;

    private transient Charset charset;

    public CustomSerialSchema() {
        this(StandardCharsets.UTF_8);
    }

    public CustomSerialSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    public Charset getCharset() {
        return charset;
    }

    /**
     * Deserialization implementation
     * @param message
     * @return
     */
    @Override
    public AccessLog deserialize(byte[] message) {
        AccessLog accessLog = null;
        try {
            AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
            accessLog = new AccessLog();
            BeanUtils.copyProperties(accessLogProto, accessLog);
            return accessLog;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return accessLog;
    }

    @Override
    public boolean isEndOfStream(AccessLog nextElement) {
        return false;
    }

    /**
     * Serialization implementation
     * @param element
     * @return
     */
    @Override
    public byte[] serialize(AccessLog element) {
        AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
        BeanUtils.copyProperties(element, builder);
        return builder.build().toByteArray();
    }

    /**
     * Define the produced message type
     * @return
     */
    @Override
    public TypeInformation<AccessLog> getProducedType() {
        return TypeInformation.of(AccessLog.class);
    }
}
```
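
One detail worth noting: the .proto file declares `num` as a `string`, while the POJO uses `Integer`, so `BeanUtils.copyProperties` may silently skip that property because the types do not match; the other String fields are copied by name. A small local round trip makes this easy to confirm before wiring the schema into Kafka. This is only a sketch; the `SchemaRoundTrip` class name and the sample values are assumptions.

```java
public class SchemaRoundTrip {
    public static void main(String[] args) throws Exception {
        CustomSerialSchema schema = new CustomSerialSchema();

        AccessLog in = new AccessLog();
        in.setIp("10.10.20.101");
        in.setTime("1601649380422");
        in.setType("GET");
        in.setApi("getAccount");
        in.setNum(1);

        // POJO -> Protobuf bytes -> POJO
        byte[] bytes = schema.serialize(in);
        AccessLog out = schema.deserialize(bytes);

        // The String fields should round-trip; num may come back null
        // because of the string/Integer type mismatch described above.
        System.out.println(out.getIp() + " " + out.getApi() + " " + out.getNum());
    }
}
```
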
  3. Implement the Kafka message producer with Flink

```java
public class KafkaSinkApplication {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the socket data source
        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");

        // 3. Transform the stream data
        SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
            @Override
            public AccessLog map(String value) throws Exception {
                System.out.println(value);
                // Split the line on the tab delimiter
                String[] arrValue = value.split("\t");
                // Assemble the fields into an AccessLog object
                AccessLog log = new AccessLog();
                log.setNum(1);
                for (int i = 0; i < arrValue.length; i++) {
                    if (i == 0) {
                        log.setIp(arrValue[i]);
                    } else if (i == 1) {
                        log.setTime(arrValue[i]);
                    } else if (i == 2) {
                        log.setType(arrValue[i]);
                    } else if (i == 3) {
                        log.setApi(arrValue[i]);
                    }
                }
                return log;
            }
        });

        // 4. Kafka producer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
        FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
                "flink-serial",             // target topic
                new CustomSerialSchema(),   // custom serialization schema
                properties);                // broker connection settings

        // 5. Add the Kafka sink
        outputStream.addSink(kafkaProducer);
        socketStr.print().setParallelism(1);

        // 6. Execute the job
        env.execute("job");
    }
}
```
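
The job reads its input from a socket on port 9911, so something has to be listening there before the job starts (for example `nc -lk 9911`). As an alternative, the hypothetical helper below pushes a couple of tab-separated sample lines to the first client that connects; the `SocketFeeder` class name and the sample values are made up for testing.

```java
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical test feeder: listens on port 9911 and writes a few
 * tab-separated lines once the Flink job connects.
 */
public class SocketFeeder {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(9911);
             Socket client = server.accept();
             OutputStream out = client.getOutputStream()) {
            String[] lines = {
                    "10.10.20.101\t1601649380422\tGET\tgetAccount",
                    "10.10.20.102\t1601649381422\tPOST\taddOrder"
            };
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                out.flush();
                Thread.sleep(1000);   // pace the messages so they are easy to observe
            }
        }
    }
}
```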

Start a Kafka console consumer in a terminal to verify that the producer works (the payload is Protobuf-encoded binary, so the console output is not human-readable):

```
[root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server 10.10.20.132:9092 --topic flink-serial
1601649380422GET"getAccount
1601649381422POSTaddOrder
1601649382422POST"
```
  4. Implement the Kafka message consumer with Flink

```java
public class KafkaSourceApplication {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Configure the Kafka connection
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
        properties.setProperty("group.id", "fink_group");

        // 3. Create the Kafka consumer
        FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
                "flink-serial",             // source topic
                new CustomSerialSchema(),   // custom deserialization schema
                properties);

        // 4. Read from the Kafka data source
        DataStreamSource<AccessLog> socketStr = env.addSource(kafkaConsumer);
        socketStr.print().setParallelism(1);

        // 5. Execute the job
        env.execute("job");
    }
}
```
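
When repeating the verification it can be useful to control where the consumer starts reading; otherwise it resumes from the committed group offsets (falling back to the `auto.offset.reset` setting). The start-position setters below are part of `FlinkKafkaConsumer` in Flink 1.11; the snippet is only a sketch of how the consumer created above could be adjusted before it is added as a source.

```java
// Sketch: the consumer from step 4 with an explicit start position.
FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
        "flink-serial",
        new CustomSerialSchema(),
        properties);
kafkaConsumer.setStartFromEarliest();           // replay everything already in the topic
// kafkaConsumer.setStartFromLatest();          // only read messages produced from now on
// kafkaConsumer.setStartFromGroupOffsets();    // default: resume from committed group offsets
DataStreamSource<AccessLog> stream = env.addSource(kafkaConsumer);
```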

Send messages through the Flink Kafka producer above to test and verify that the consumer works end to end.


This article was created and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn