About Java: Flink Real-Time Computing in Practice, Part 7: Flink Custom Serialization, a Protobuf Integration Scheme

1. Custom Serialization Integration Scheme (Protobuf)

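In real-world applications, jobs often exchange complex transport objects and need high serialization and processing performance, which calls for a custom serialization format. Protobuf is used as the example here.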

Goal: produce to and consume from the same Kafka topic, using Protobuf for serialization and deserialization, and verify that the data is parsed correctly.

  1. Generate Java files from the protobuf definition

    syntax = "proto3";
    option java_package = "com.itcast.flink.connectors.kafka.proto";
    option java_outer_classname = "AccessLogProto";
    
    // Message structure definition
    message AccessLog {
    
        string ip = 1;
    
        string time = 2;
    
        string type = 3;
    
        string api = 4;
    
        string num = 5;
    }
    

Generate the Java files with a batch script:

    @echo off
    for %%i in (proto/*.proto) do (
      d:/TestCode/protoc.exe --proto_path=./proto  --java_out=../java  ./proto/%%i
      echo generate %%i to java file successfully!
    )

Note: make sure the paths (the protoc location, --proto_path and --java_out) are configured correctly.
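The batch script above only runs on Windows. As a hypothetical cross-platform alternative, the same loop can be sketched in Java with ProcessBuilder: list the .proto files, build one protoc command with --proto_path and --java_out per file, and report success only when protoc exits with status 0 (the batch script echoes success unconditionally). The protoc location and both directories are the script's values and are assumptions to adapt; protoc expects the --java_out directory to exist already.

```java
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical cross-platform replacement for the batch script: runs protoc once per .proto file.
 * The protoc path and directories are copied from the script and must be adjusted to your setup.
 */
public class ProtocRunner {

    /** Builds the protoc command line for one .proto file located in protoDir. */
    public static List<String> buildCommand(String protoc, String protoDir, String javaOut, String protoFile) {
        return Arrays.asList(
                protoc,
                "--proto_path=" + protoDir,
                "--java_out=" + javaOut,
                protoDir + "/" + protoFile);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String protoc = "d:/TestCode/protoc.exe"; // assumption: where protoc is installed
        String protoDir = "./proto";
        String javaOut = "../java";               // must already exist

        File[] protoFiles = new File(protoDir).listFiles((dir, name) -> name.endsWith(".proto"));
        if (protoFiles == null) {
            System.err.println("Not a directory: " + protoDir);
            return;
        }
        for (File file : protoFiles) {
            Process process = new ProcessBuilder(buildCommand(protoc, protoDir, javaOut, file.getName()))
                    .inheritIO() // show protoc's own error messages in this console
                    .start();
            if (process.waitFor() == 0) {
                System.out.println("generate " + file.getName() + " to java file successfully!");
            } else {
                System.err.println("protoc failed for " + file.getName());
            }
        }
    }
}
```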

  2. Custom serialization implementation

    Add the POM dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.1.8.RELEASE</version>
        </dependency>
    </dependencies>
    

The AccessLog POJO:

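    import lombok.Data;

    import java.io.Serializable;

    // @Data is Lombok: it generates the getters/setters that BeanUtils relies on (Lombok must be on the classpath)
    @Data
    public class AccessLog implements Serializable {

        private String ip;

        private String time;

        private String type;

        private String api;

        // Integer here, while the .proto declares num as string
        private Integer num;
    }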

CustomSerialSchema:

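    import com.itcast.flink.connectors.kafka.proto.AccessLogProto;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.springframework.beans.BeanUtils;

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    import static org.apache.flink.util.Preconditions.checkNotNull;

    /**
     * Custom serialization implementation (Protobuf): converts between the AccessLog POJO
     * and the generated AccessLogProto.AccessLog message.
     */
    public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {

        private static final long serialVersionUID = 1L;

        // Not used by the Protobuf conversion; being transient, it is null after Flink ships the schema to the tasks
        private transient Charset charset;

        public CustomSerialSchema() {
            this(StandardCharsets.UTF_8);
        }

        public CustomSerialSchema(Charset charset) {
            this.charset = checkNotNull(charset);
        }

        public Charset getCharset() {
            return charset;
        }

        /**
         * Deserialization: parses the Protobuf bytes and copies the fields into an AccessLog.
         * Returning null makes the Flink Kafka consumer skip a record that cannot be parsed.
         */
        @Override
        public AccessLog deserialize(byte[] message) {
            try {
                AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
                AccessLog accessLog = new AccessLog();
                // Copies same-named properties with compatible types (ip, time, type, api)
                BeanUtils.copyProperties(accessLogProto, accessLog);
                // num is a string in the .proto but an Integer in the POJO, so BeanUtils skips it
                if (!accessLogProto.getNum().isEmpty()) {
                    accessLog.setNum(Integer.valueOf(accessLogProto.getNum()));
                }
                return accessLog;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        @Override
        public boolean isEndOfStream(AccessLog nextElement) {
            return false; // the Kafka stream is unbounded
        }

        /**
         * Serialization: copies the AccessLog fields into the Protobuf builder and encodes it.
         */
        @Override
        public byte[] serialize(AccessLog element) {
            AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
            // Generated Protobuf setters reject null, so every String field of element must be non-null
            BeanUtils.copyProperties(element, builder);
            // num has different types on both sides, so it is converted explicitly
            if (element.getNum() != null) {
                builder.setNum(String.valueOf(element.getNum()));
            }
            return builder.build().toByteArray();
        }

        /**
         * Declares the type produced by the deserializer.
         */
        @Override
        public TypeInformation<AccessLog> getProducedType() {
            return TypeInformation.of(AccessLog.class);
        }
    }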
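Both conversions in CustomSerialSchema rely on Spring's BeanUtils.copyProperties, which matches properties by name and copies one only when the source getter's return type is assignable to the target setter's parameter type (Spring additionally accepts primitive/wrapper pairs such as int and Integer). Because num is declared string in the .proto but Integer in AccessLog, BeanUtils skips it silently in both directions. The stdlib-only sketch below reproduces just that type check with java.beans.Introspector so the effect can be observed; it is a simplified illustration rather than Spring's code, and ProtoLike and PojoLike are hypothetical stand-ins for the generated message and the POJO.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified illustration of the type check in Spring's BeanUtils.copyProperties.
 * Not Spring's code: it ignores primitive/wrapper pairs, ignore lists and non-void setters.
 */
public class CopyRuleDemo {

    /** Hypothetical stand-in for the generated message: num is a String, as in the .proto. */
    public static class ProtoLike {
        public String getIp() { return "127.0.0.1"; }
        public String getNum() { return "1"; }
    }

    /** Hypothetical stand-in for the AccessLog POJO: num is an Integer. */
    public static class PojoLike {
        private String ip;
        private Integer num;
        public String getIp() { return ip; }
        public void setIp(String ip) { this.ip = ip; }
        public Integer getNum() { return num; }
        public void setNum(Integer num) { this.num = num; }
    }

    /** Copies same-named properties whose types are assignable; all others are skipped silently. */
    public static void copyAssignable(Object source, Object target) {
        try {
            Map<String, Method> getters = new HashMap<>();
            for (PropertyDescriptor pd : Introspector.getBeanInfo(source.getClass()).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null) {
                    getters.put(pd.getName(), pd.getReadMethod());
                }
            }
            for (PropertyDescriptor pd : Introspector.getBeanInfo(target.getClass()).getPropertyDescriptors()) {
                Method setter = pd.getWriteMethod();
                Method getter = getters.get(pd.getName());
                if (setter != null && getter != null
                        && setter.getParameterTypes()[0].isAssignableFrom(getter.getReturnType())) {
                    setter.invoke(target, getter.invoke(source));
                }
            }
        } catch (IntrospectionException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PojoLike target = new PojoLike();
        copyAssignable(new ProtoLike(), target);
        System.out.println(target.getIp() + " / " + target.getNum()); // prints "127.0.0.1 / null"
    }
}
```

A related pitfall in the other direction: generated Protobuf setters throw NullPointerException for a null argument, so copying a POJO with a null String field into the builder makes copyProperties throw.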
  3. Implement the Kafka message producer with Flink

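    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Properties;

    public class KafkaSinkApplication {

        public static void main(String[] args) throws Exception {

            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2. Read the socket data source, one record per line
            DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
            // 3. Transform the stream data
            SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
                @Override
                public AccessLog map(String value) throws Exception {
                    System.out.println(value);
                    // Split the line on the tab delimiter: ip, time, type, api
                    String[] arrValue = value.split("\t");
                    // Assemble the fields into an object
                    AccessLog log = new AccessLog();
                    log.setNum(1);
                    for (int i = 0; i < arrValue.length; i++) {
                        if (i == 0) {
                            log.setIp(arrValue[i]);
                        } else if (i == 1) {
                            log.setTime(arrValue[i]);
                        } else if (i == 2) {
                            log.setType(arrValue[i]);
                        } else if (i == 3) {
                            log.setApi(arrValue[i]);
                        }
                    }
                    return log;
                }
            });

            // 4. Kafka producer configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
                    "flink-serial",            // target topic
                    new CustomSerialSchema(),  // serialization schema
                    properties);               // producer configuration (broker list)

            // 5. Add the Kafka sink
            outputStream.addSink(kafkaProducer);

            socketStr.print().setParallelism(1);

            // 6. Execute the job
            env.execute("job");
        }
    }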
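socketTextStream("localhost", 9911, "\n") connects to port 9911 as a client, so something must already be listening there and writing newline-terminated lines whose tab-separated columns follow the order the map function expects: ip, time, type, api. An interactive tool such as nc -lk 9911 works; the Java program below is a hypothetical generator that does the same automatically, sending one line per second. The IP, HTTP methods and API names are made-up sample values.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical test-data generator for the socket source: listens on port 9911 and writes one
 * tab-separated line (ip, time, type, api) per second. All sample values are made up.
 */
public class AccessLogLineServer {

    /** Formats one record in the column order the map() function expects. */
    public static String formatLine(String ip, long time, String type, String api) {
        return ip + "\t" + time + "\t" + type + "\t" + api;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String[][] samples = {{"GET", "getAccount"}, {"POST", "addOrder"}};
        try (ServerSocket server = new ServerSocket(9911);
             Socket client = server.accept(); // blocks until the Flink job connects
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            // checkError() flushes and returns true once a write has failed, e.g. after the job disconnects
            for (int i = 0; !out.checkError(); i++) {
                String[] sample = samples[i % samples.length];
                // End lines with "\n" explicitly: the source splits on "\n", and println may emit "\r\n"
                out.print(formatLine("192.168.0.1", System.currentTimeMillis(), sample[0], sample[1]) + "\n");
                out.flush();
                Thread.sleep(1000);
            }
        }
    }
}
```

Start the generator before KafkaSinkApplication: with the default retry setting, the socket source does not wait for the port to open.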

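Open a Kafka console consumer in a terminal to verify that the producer works. The console consumer prints the raw Protobuf bytes as text, so field tags and length prefixes show up as stray characters (such as a double quote) and unexpected line breaks: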

[root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server  10.10.20.132:9092  --topic flink-serial    
1601649380422GET"
getAccount
1601649381422POSTaddOrder
1601649382422POST"
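The output looks garbled because the console consumer prints Protobuf binary data. In the Protobuf wire format, each non-empty string field is written as a tag byte (field_number << 3) | 2, a varint length, and the UTF-8 bytes; proto3 omits fields that hold the default (empty) value. Assuming the first record had an empty ip and no num, it encodes as 0x12 0x0d "1601649380422" 0x1a 0x03 "GET" 0x22 0x0a "getAccount": the tag of field 4 (0x22) is the double-quote character and the length 10 (0x0a) is the line break before getAccount. In the second record the length of addOrder is 8, a backspace, which is why no double quote is visible before it. The minimal encoder below is for illustration only and reproduces those bytes; production code should keep using the generated AccessLogProto builder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hand-written encoder for the AccessLog message, for illustration only.
 * Covers just what this schema needs: proto3 string fields (wire type 2) with varint lengths.
 */
public class AccessLogWireDemo {

    /** Writes an unsigned base-128 varint: 7 bits per byte, high bit set on all but the last byte. */
    private static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    /** Appends one string field: tag = (fieldNumber << 3) | 2, then the UTF-8 length, then the bytes. */
    private static void writeString(ByteArrayOutputStream out, int fieldNumber, String value) {
        if (value == null || value.isEmpty()) {
            return; // proto3 does not serialize fields that hold the default value
        }
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (fieldNumber << 3) | 2);
        writeVarint(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    /** Encodes fields 1..5 (ip, time, type, api, num) of AccessLog in field-number order. */
    public static byte[] encode(String ip, String time, String type, String api, String num) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, 1, ip);
        writeString(out, 2, time);
        writeString(out, 3, type);
        writeString(out, 4, api);
        writeString(out, 5, num);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // First record of the console output, assuming an empty ip and no num
        byte[] bytes = encode("", "1601649380422", "GET", "getAccount", "");
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```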
  4. Implement the Kafka message subscriber (consumer) with Flink

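    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class KafkaSourceApplication {

        public static void main(String[] args) throws Exception {

            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. Set the Kafka connection properties
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            properties.setProperty("group.id", "fink_group");

            // 3. Create the Kafka consumer
            FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
                    "flink-serial",            // target topic
                    new CustomSerialSchema(),  // custom deserialization schema
                    properties);

            // 4. Read from the Kafka source
            DataStreamSource<AccessLog> kafkaStream = env.addSource(kafkaConsumer);

            kafkaStream.print().setParallelism(1);

            // 5. Execute the job
            env.execute("job");
        }
    }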
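On the consuming side, AccessLogProto.AccessLog.parseFrom decodes the Protobuf wire format: it reads a varint tag, splits it into the field number (tag >>> 3) and wire type (tag & 7), then reads a length and that many bytes. A stdlib-only sketch of that loop for string-only messages such as AccessLog follows. It can help inspect raw record bytes while debugging, but it is an illustration, not a replacement for the generated parser: it rejects every wire type other than 2 and does not keep unknown fields.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Minimal decoder for string-only Protobuf messages such as AccessLog; illustration/debugging aid only.
 * Real code should call the generated AccessLogProto.AccessLog.parseFrom(byte[]).
 */
public class AccessLogWireDecoder {

    /** Returns field number -> value; throws IllegalArgumentException on truncated or unsupported input. */
    public static Map<Integer, String> decode(byte[] data) {
        Map<Integer, String> fields = new LinkedHashMap<>();
        int[] pos = {0}; // read position, shared with readVarint
        while (pos[0] < data.length) {
            int tag = readVarint(data, pos);
            int fieldNumber = tag >>> 3;
            int wireType = tag & 0x7;
            if (wireType != 2) { // 2 = length-delimited (string, bytes, nested message)
                throw new IllegalArgumentException("unsupported wire type " + wireType + " for field " + fieldNumber);
            }
            int length = readVarint(data, pos);
            if (length < 0 || length > data.length - pos[0]) {
                throw new IllegalArgumentException("truncated field " + fieldNumber);
            }
            // If a field number repeats, the last value wins, as for proto3 singular fields
            fields.put(fieldNumber, new String(data, pos[0], length, StandardCharsets.UTF_8));
            pos[0] += length;
        }
        return fields;
    }

    /** Reads a base-128 varint at pos[0] and advances pos[0] past it. */
    private static int readVarint(byte[] data, int[] pos) {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            if (pos[0] >= data.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            byte b = data[pos[0]++];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }

    public static void main(String[] args) {
        byte[] sample = {0x1a, 3, 'G', 'E', 'T', 0x22, 3, 'a', 'p', 'i'};
        System.out.println(decode(sample)); // prints "{3=GET, 4=api}"
    }
}
```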

Send messages through the Flink Kafka producer above to test and verify the consumer.


This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn
