Welcome to my GitHub

https://github.com/zq2599/blog_demos

Contents: a categorized index of all my original articles with companion source code, covering Java, Docker, Kubernetes, DevOps, and more.

Overview

This is the second installment of the "Flink DataSource Trilogy" series. The previous installment, "Flink DataSource Trilogy, Part One: Direct API", covered creating DataSources with the StreamExecutionEnvironment API. Today's exercise is Flink's built-in connectors, which are plugged in through StreamExecutionEnvironment's addSource method.


Today's hands-on session uses Kafka as the data source: first we receive and process String messages, then we receive JSON messages and deserialize the JSON into bean instances.

Links to the Flink DataSource Trilogy

  1. "Flink DataSource Trilogy, Part One: Direct API"
  2. "Flink DataSource Trilogy, Part Two: Built-in Connectors"
  3. "Flink DataSource Trilogy, Part Three: Custom DataSource"

Source code download

If you would rather not type the code yourself, the source for the whole series can be downloaded from GitHub (https://github.com/zq2599/blog_demos); the addresses and links are listed in the table below:

| Name | Link | Note |
| --- | --- | --- |
| Project homepage | https://github.com/zq2599/blog_demos | The project's homepage on GitHub |
| Git repository (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project source, https protocol |
| Git repository (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project source, ssh protocol |

This Git project contains multiple folders; the application for this chapter is in the <font color="blue">flinkdatasourcedemo</font> folder.

Environment and versions

The environment and versions used for this session are as follows:

  1. JDK: 1.8.0_211
  2. Flink: 1.9.2
  3. Maven: 3.6.0
  4. OS: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA: 2018.3.5 (Ultimate Edition)
  6. Kafka: 2.4.0
  7. ZooKeeper: 3.5.5

Please make sure all of the above is ready before moving on to the hands-on steps.

Matching Flink and Kafka versions

  1. The official Flink documentation explains in detail which Kafka versions are supported, at: https://ci.apache.org/project...
  2. Pay particular attention to the universal Kafka connector mentioned there: introduced in Flink 1.7, it works with Kafka 1.0.0 and later.

  3. My project depends on the flink-connector-kafka library and connects to Kafka with the FlinkKafkaConsumer class (both appear in the next section); if you run a different Kafka version, you can look up the matching library and consumer class in the official table.

Hands-on: processing String messages

  1. Create a topic named test001 on Kafka; a reference command:
```shell
./kafka-topics.sh \
--create \
--zookeeper 192.168.50.43:2181 \
--replication-factor 1 \
--partitions 2 \
--topic test001
```
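To confirm the topic was created, listing the topics is a quick sanity check (an optional extra step, assuming the same ZooKeeper address as above):
```shell
./kafka-topics.sh --list --zookeeper 192.168.50.43:2181
```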
  2. Continue with the flinkdatasourcedemo project created in the previous installment; open pom.xml and add the following dependency:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
```
  3. Add a new class, Kafka240String.java, which connects to the broker and runs a WordCount over the String messages it receives:
```java
package com.bolingcavalry.connector;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Kafka240String {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(2);

        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );
        // start consuming from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // obtain the DataSource via addSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        // split the strings from Kafka into words and count them in 5-second windows
        dataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka");
    }
}
```
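The Splitter class comes from part one of this series; if you are building the project from scratch, here is a minimal sketch, assuming it splits each line on spaces and emits (word, 1) tuples, matching how it is used above:
```java
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// minimal sketch of the Splitter used above (the real class lives in the
// flinkdatasourcedemo project from part one): split on spaces, emit (word, 1)
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        for (String word : s.split(" ")) {
            collector.collect(new Tuple2<>(word, 1));
        }
    }
}
```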
  4. Make sure the Kafka topic has been created, then run Kafka240String; you can verify that messages are consumed and the word counts come out as expected. A convenient way to feed it test data is shown below.
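Kafka's bundled console producer works well for this (a minimal sketch, assuming the broker address used above); each line you type becomes one message:
```shell
./kafka-console-producer.sh --broker-list 192.168.50.43:9092 --topic test001
>hello world
>hello flink
```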

  5. That completes the hands-on for String messages from Kafka; next, let's try JSON messages.

Hands-on: processing JSON messages

  1. The JSON messages we receive next will be deserialized into bean instances, which calls for a JSON library; I chose Gson.
  2. Add the Gson dependency to pom.xml:
```xml
<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
  <version>2.8.5</version>
</dependency>
```
  3. Add the class Student.java, a plain bean with just two fields, id and name:
```java
package com.bolingcavalry;

public class Student {

    private int id;

    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```
  4. Add the class StudentSchema.java, an implementation of the DeserializationSchema interface, used to deserialize JSON into Student instances:
```java
package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {

    private static final Gson gson = new Gson();

    /**
     * Deserialization: turn a byte array into a Student instance
     */
    @Override
    public Student deserialize(byte[] bytes) throws IOException {
        return gson.fromJson(new String(bytes), Student.class);
    }

    @Override
    public boolean isEndOfStream(Student student) {
        return false;
    }

    /**
     * Serialization: turn a Student instance into a byte array
     * (not exercised in this demo, but implemented for completeness)
     */
    @Override
    public byte[] serialize(Student student) {
        return gson.toJson(student).getBytes();
    }

    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(Student.class);
    }
}
```
  5. Add the class Kafka240Bean.java, which connects to the broker, converts incoming JSON messages into Student instances, and counts the occurrences of each name, again in 5-second windows:
```java
package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Kafka240Bean {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(2);

        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer, using StudentSchema to deserialize messages
        FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new StudentSchema(),
                properties
        );
        // start consuming from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // obtain the DataSource via addSource
        DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer);

        // the JSON from Kafka has been deserialized into Student instances;
        // count the occurrences of each name in 5-second windows
        dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Student student) throws Exception {
                        return new Tuple2<>(student.getName(), 1);
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka bean");
    }
}
```
  6. When testing, send JSON-formatted strings to Kafka, and Flink will report the count for each name; an example follows.
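Using the console producer again (the id/name values below are made-up sample data):
```shell
./kafka-console-producer.sh --broker-list 192.168.50.43:9092 --topic test001
>{"id":1,"name":"Tom"}
>{"id":2,"name":"Jerry"}
>{"id":3,"name":"Tom"}
```
If the two "Tom" messages land in the same 5-second window, the job should print something like (Tom,2) and (Jerry,1).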


With that, the hands-on for built-in connectors is complete; in the next installment, we will build a custom DataSource together.

Welcome to follow my WeChat official account: 程序员欣宸

Search 「程序员欣宸」 on WeChat. I'm Xinchen, and I look forward to exploring the Java world with you...
https://github.com/zq2599/blog_demos