1. Custom Serialization Integration (Protobuf)
In real-world application scenarios, a variety of complex transfer objects must be exchanged with high serialization and transport performance, which calls for a custom serialization scheme. This section walks through one using Protobuf as the example.

Functionality: produce to and consume from the same Kafka topic, using Protobuf for serialization and deserialization on the wire, and verify that the data is parsed correctly.
Generating the Java classes from the protobuf definition
syntax = "proto3";

option java_package = "com.itcast.flink.connectors.kafka.proto";
option java_outer_classname = "AccessLogProto";

// Message structure definition
message AccessLog {
    string ip = 1;
    string time = 2;
    string type = 3;
    string api = 4;
    string num = 5;
}
Generate the Java files with a Windows batch script:
@echo off
for %%i in (proto/*.proto) do (
    d:/TestCode/protoc.exe --proto_path=./proto --java_out=../java ./proto/%%i
    echo generate %%i to java file successfully!
)
Note: the paths (the protoc.exe location, the proto source directory, and the Java output directory) must be configured correctly.
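On Linux or macOS the same generation step can be scripted as follows (a minimal sketch, assuming protoc is on the PATH and the same proto/ and ../java directory layout):

# generate Java sources for every .proto file under proto/
for f in proto/*.proto; do
    protoc --proto_path=./proto --java_out=../java "$f"
    echo "generated $f"
done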
Custom serialization implementation
Add the POM dependencies (the Flink Kafka connector, the Protobuf runtime for the generated classes, and spring-beans for BeanUtils property copying):
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
</dependencies>
The AccessLog POJO:
import lombok.Data;

import java.io.Serializable;

@Data
public class AccessLog implements Serializable {

    private String ip;
    private String time;
    private String type;
    private String api;
    // Note: the .proto file declares num as a string; Spring's BeanUtils
    // only copies same-named, type-compatible properties, so this Integer
    // field will not survive the Protobuf round trip as written.
    private Integer num;
}
CustomSerialSchema:
import com.itcast.flink.connectors.kafka.proto.AccessLogProto;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.springframework.beans.BeanUtils;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Custom serialization schema (Protobuf), usable both as a
 * DeserializationSchema (consumer side) and a SerializationSchema (producer side).
 */
public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {

    private static final long serialVersionUID = 1L;

    private transient Charset charset;

    public CustomSerialSchema() {
        this(StandardCharsets.UTF_8);
    }

    public CustomSerialSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    public Charset getCharset() {
        return charset;
    }

    /**
     * Deserialization: parse the Protobuf bytes, then copy the
     * same-named, type-compatible properties into the POJO.
     * @param message the raw Kafka record value
     * @return the decoded AccessLog, or null if parsing failed
     */
    @Override
    public AccessLog deserialize(byte[] message) {
        AccessLog accessLog = null;
        try {
            AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
            accessLog = new AccessLog();
            BeanUtils.copyProperties(accessLogProto, accessLog);
            return accessLog;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return accessLog;
    }

    @Override
    public boolean isEndOfStream(AccessLog nextElement) {
        // Unbounded stream: never signals end of stream
        return false;
    }

    /**
     * Serialization: copy the POJO properties into a Protobuf builder
     * and emit the binary message.
     * @param element the AccessLog to encode
     * @return the Protobuf-encoded bytes
     */
    @Override
    public byte[] serialize(AccessLog element) {
        AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
        BeanUtils.copyProperties(element, builder);
        return builder.build().toByteArray();
    }

    /**
     * The element type this schema produces.
     * @return TypeInformation for AccessLog
     */
    @Override
    public TypeInformation<AccessLog> getProducedType() {
        return TypeInformation.of(AccessLog.class);
    }
}
The Flink implementation of the Kafka message producer
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkApplication {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read from the socket data source
        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
        // 3. Transform the stream data
        SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
            @Override
            public AccessLog map(String value) throws Exception {
                System.out.println(value);
                // Split the record on the tab delimiter
                String[] arrValue = value.split("\t");
                // Assemble the fields into an AccessLog object
                AccessLog log = new AccessLog();
                log.setNum(1);
                for (int i = 0; i < arrValue.length; i++) {
                    if (i == 0) {
                        log.setIp(arrValue[i]);
                    } else if (i == 1) {
                        log.setTime(arrValue[i]);
                    } else if (i == 2) {
                        log.setType(arrValue[i]);
                    } else if (i == 3) {
                        log.setApi(arrValue[i]);
                    }
                }
                return log;
            }
        });
        // 4. Configure the Kafka producer
        FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
                "10.10.20.132:9092",       // broker list
                "flink-serial",            // target topic
                new CustomSerialSchema()   // custom serialization schema
        );
        // 5. Attach the Kafka sink
        outputStream.addSink(kafkaProducer);
        socketStr.print().setParallelism(1);
        // 6. Execute the job
        env.execute("job");
    }
}
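The producer job reads from a local socket, so start a listener on port 9911 before launching it and type tab-separated records (a quick sketch, assuming netcat is available; the field order ip, time, type, api matches the map function above, and the sample values are illustrative):

# start a socket server for the Flink job to connect to
nc -lk 9911
# then type records whose fields are separated by literal Tab characters, e.g.
# 10.10.20.101<TAB>1601649380422<TAB>GET<TAB>getAccount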
Open a Kafka console consumer to verify that the producer works (the console consumer prints the raw Protobuf bytes, so the output appears partially garbled):
[root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server 10.10.20.132:9092 --topic flink-serial
1601649380422GET"getAccount
1601649381422POSTaddOrder
1601649382422POST"
The Flink implementation of the Kafka message consumer
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSourceApplication {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Configure the Kafka connection
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
        properties.setProperty("group.id", "fink_group");
        // 3. Create the Kafka consumer
        FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
                "flink-serial",             // source topic
                new CustomSerialSchema(),   // custom deserialization schema
                properties);
        // 4. Read from the Kafka data source
        DataStreamSource<AccessLog> socketStr = env.addSource(kafkaConsumer);
        socketStr.print().setParallelism(1);
        // 5. Execute the job
        env.execute("job");
    }
}
Send messages through the Flink Kafka producer above to test and verify the consumer end to end.
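As an extra check (a sketch using Kafka's bundled CLI; fink_group is the group.id set in the consumer's properties), you can inspect whether the consumer group is attached to the topic and, if offsets are being committed, how far it has read:

# describe the consumer group's partitions, offsets, and lag
bin/kafka-consumer-groups.sh --bootstrap-server 10.10.20.132:9092 \
    --describe --group fink_group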
This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn.