1. Custom Serialization Integration (Protobuf)
In real-world applications there are all kinds of complex transfer objects, often combined with demanding transfer and processing performance requirements, which calls for a custom serialization scheme. This section uses Protobuf as the example.
Goal: produce to and consume from the same Kafka topic, serializing and deserializing the payload with Protobuf, and verify that the data is parsed correctly on both ends.
- Generate the Java classes from the protobuf definition
syntax = "proto3"; option java_package = "com.itcast.flink.connectors.kafka.proto"; option java_outer_classname = "AccessLogProto"; // 音讯构造定义 message AccessLog { string ip = 1; string time = 2; string type = 3; string api = 4; string num = 5; }
Generate the Java classes with a batch script:
@echo off
rem Compile every .proto file under ./proto to Java sources (%%~nxi is the bare file name)
for %%i in (proto\*.proto) do (
    d:/TestCode/protoc.exe --proto_path=./proto --java_out=../java ./proto/%%~nxi
    echo generate %%~nxi to java file successfully!
)
Note: make sure the paths are configured correctly for your environment.
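Once the classes are generated, a quick sanity check is to build an AccessLogProto.AccessLog, serialize it, and parse it back, independent of Flink and Kafka. This is only a sketch; the class name ProtoRoundTripCheck and the sample values are made up for illustration:

import com.itcast.flink.connectors.kafka.proto.AccessLogProto;

public class ProtoRoundTripCheck {
    public static void main(String[] args) throws Exception {
        // Build a message, serialize it to bytes, then parse it back
        byte[] bytes = AccessLogProto.AccessLog.newBuilder()
                .setIp("10.10.20.101")
                .setTime("1601649380422")
                .setType("GET")
                .setApi("getAccount")
                .setNum("1")
                .build()
                .toByteArray();
        AccessLogProto.AccessLog parsed = AccessLogProto.AccessLog.parseFrom(bytes);
        System.out.println(parsed.getApi()); // expected output: getAccount
    }
}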
- Custom serialization implementation
Add the POM dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
</dependencies>
The AccessLog object (the @Data annotation comes from Lombok, which must also be on the classpath):
@Data
public class AccessLog implements Serializable {
private String ip;
private String time;
private String type;
private String api;
private Integer num;
}
CustomSerialSchema:
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.springframework.beans.BeanUtils;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Custom serialization implementation (Protobuf)
 */
public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {

    private static final long serialVersionUID = 1L;

    private transient Charset charset;

    public CustomSerialSchema() {
        this(StandardCharsets.UTF_8);
    }

    public CustomSerialSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    public Charset getCharset() {
        return charset;
    }

    /**
     * Deserialization: Protobuf bytes -> AccessLog
     * @param message the raw Kafka record value
     * @return the decoded AccessLog, or null if parsing failed
     */
    @Override
    public AccessLog deserialize(byte[] message) {
        AccessLog accessLog = null;
        try {
            AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
            accessLog = new AccessLog();
            BeanUtils.copyProperties(accessLogProto, accessLog);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return accessLog;
    }

    @Override
    public boolean isEndOfStream(AccessLog nextElement) {
        return false;
    }

    /**
     * Serialization: AccessLog -> Protobuf bytes
     * @param element the record to encode
     * @return the Protobuf-encoded bytes
     */
    @Override
    public byte[] serialize(AccessLog element) {
        AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
        BeanUtils.copyProperties(element, builder);
        return builder.build().toByteArray();
    }

    /**
     * Type of the produced elements
     */
    @Override
    public TypeInformation<AccessLog> getProducedType() {
        return TypeInformation.of(AccessLog.class);
    }
}
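Before wiring the schema into Kafka, it can be checked with a simple serialize/deserialize round trip. This is only a sketch; the class name SerialSchemaTest and the sample values are made up for illustration:

public class SerialSchemaTest {
    public static void main(String[] args) {
        CustomSerialSchema schema = new CustomSerialSchema();

        // Build a sample record
        AccessLog log = new AccessLog();
        log.setIp("10.10.20.101");
        log.setTime("1601649380422");
        log.setType("GET");
        log.setApi("getAccount");
        log.setNum(1);

        // Serialize to Protobuf bytes, then deserialize back into a POJO
        byte[] bytes = schema.serialize(log);
        AccessLog decoded = schema.deserialize(bytes);
        System.out.println(decoded.getApi()); // expected output: getAccount
    }
}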
- Kafka message producer implemented with Flink
public class KafkaSinkApplication {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read from the socket source (start a local server first, e.g. `nc -lk 9911`,
        //    and send tab-separated lines: ip<TAB>time<TAB>type<TAB>api)
        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
        // 3. Transform each line into an AccessLog object
        SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
            @Override
            public AccessLog map(String value) throws Exception {
                System.out.println(value);
                // Split the line on the tab delimiter
                String[] arrValue = value.split("\t");
                // Assemble the fields into the transfer object
                AccessLog log = new AccessLog();
                log.setNum(1);
                for (int i = 0; i < arrValue.length; i++) {
                    if (i == 0) {
                        log.setIp(arrValue[i]);
                    } else if (i == 1) {
                        log.setTime(arrValue[i]);
                    } else if (i == 2) {
                        log.setType(arrValue[i]);
                    } else if (i == 3) {
                        log.setApi(arrValue[i]);
                    }
                }
                return log;
            }
        });
        // 4. Kafka producer configuration
        //    (this Properties object is not used by the constructor below;
        //    an alternative that does use it is sketched after this code)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
        FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
                "10.10.20.132:9092",       // broker list
                "flink-serial",            // target topic
                new CustomSerialSchema()); // custom serialization schema
        // 5. Add the Kafka sink
        outputStream.addSink(kafkaProducer);
        socketStr.print().setParallelism(1);
        // 6. Execute the job
        env.execute("job");
    }
}
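As noted in the code, the Properties object is never passed to the three-argument constructor; the broker list is given directly. FlinkKafkaProducer also has an overload that takes the producer configuration as Properties. A minimal sketch under that assumption (same topic and schema as above):

// Alternative: pass the Kafka producer configuration explicitly
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "10.10.20.132:9092");

FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
        "flink-serial",            // target topic
        new CustomSerialSchema(),  // custom serialization schema
        producerConfig);           // producer properties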
Open a Kafka console consumer in another terminal to verify that the producer works; since the payload is Protobuf-encoded binary, part of the output looks garbled:
[root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server 10.10.20.132:9092 --topic flink-serial
1601649380422GET"
getAccount
1601649381422POSTaddOrder
1601649382422POST"
- Kafka message consumer implemented with Flink
public class KafkaSourceApplication {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Kafka connection settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
        properties.setProperty("group.id", "fink_group");
        // 3. Create the Kafka consumer
        FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
                "flink-serial",           // source topic
                new CustomSerialSchema(), // custom deserialization schema
                properties);
        // 4. Read from the Kafka source
        DataStreamSource<AccessLog> kafkaStream = env.addSource(kafkaConsumer);
        kafkaStream.print().setParallelism(1);
        // 5. Execute the job
        env.execute("job");
    }
}
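By default the consumer resumes from the consumer group's committed offsets. For repeatable tests it can help to re-read the topic from the beginning; FlinkKafkaConsumer exposes start-position setters for this, sketched below:

// Re-read the topic from the earliest offset (useful for repeatable tests)
kafkaConsumer.setStartFromEarliest();

// Or resume from the group's committed offsets (the default behavior)
// kafkaConsumer.setStartFromGroupOffsets();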
Send messages through the Flink Kafka producer above to test and verify that the consumer works end to end.