关于云计算:Flink的DataSource三部曲之二内置connector

5次阅读

共计 6382 个字符,预计需要花费 16 分钟才能阅读完成。

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

本篇概览

本文是《Flink 的 DataSource 三部曲》系列的第二篇,上一篇《Flink 的 DataSource 三部曲之一:间接 API》学习了 StreamExecutionEnvironment 的 API 创立 DataSource,明天要练习的是 Flink 内置的 connector,即下图的红框地位,这些 connector 能够通过 StreamExecutionEnvironment 的 addSource 办法应用:


明天的实战抉择 Kafka 作为数据源来操作,先尝试接管和解决 String 型的音讯,再接管 JSON 类型的音讯,将 JSON 反序列化成 bean 实例;

Flink 的 DataSource 三部曲文章链接

  1. 《Flink 的 DataSource 三部曲之一:间接 API》
  2. 《Flink 的 DataSource 三部曲之二: 内置 connector》
  3. 《Flink 的 DataSource 三部曲之三: 自定义》

源码下载

如果您不想写代码,整个系列的源码可在 GitHub 下载到,地址和链接信息如下表所示 (https://github.com/zq2599/blo…:

名称 链接 备注
我的项目主页 https://github.com/zq2599/blo… 该我的项目在 GitHub 上的主页
git 仓库地址 (https) https://github.com/zq2599/blo… 该我的项目源码的仓库地址,https 协定
git 仓库地址 (ssh) git@github.com:zq2599/blog_demos.git 该我的项目源码的仓库地址,ssh 协定

这个 git 我的项目中有多个文件夹,本章的利用在 <font color=”blue”>flinkdatasourcedemo</font> 文件夹下,如下图红框所示:

环境和版本

本次实战的环境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3(MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)
  6. Kafka:2.4.0
  7. Zookeeper:3.5.5

请确保上述内容都曾经准备就绪,能力持续前面的实战;

Flink 与 Kafka 版本匹配

  1. Flink 官网对匹配 Kafka 版本做了具体阐明,地址是:https://ci.apache.org/project…
  2. 要重点关注的是官网提到的通用版 (universal Kafka connector),这是从 Flink1.7 开始推出的,对于 Kafka1.0.0 或者更高版本都能够应用:

  1. 下图红框中是我的工程中要依赖的库,蓝框中是连贯 Kafka 用到的类,读者您能够依据本人的 Kafka 版本在表格中找到适宜的库和类:

实战字符串音讯解决

  1. 在 kafka 上创立名为 test001 的 topic,参考命令:
./kafka-topics.sh \
--create \
--zookeeper 192.168.50.43:2181 \
--replication-factor 1 \
--partitions 2 \
--topic test001
  1. 持续应用上一章创立的 flinkdatasourcedemo 工程,关上 pom.xml 文件减少以下依赖:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
  1. 新增类 Kafka240String.java,作用是连贯 broker,对收到的字符串音讯做 WordCount 操作:
package com.bolingcavalry.connector;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
import static com.sun.tools.doclint.Entity.para;

public class Kafka240String {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(2);

        Properties properties = new Properties();
        //broker 地址
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        //zookeeper 地址
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // 消费者的 groupId
        properties.setProperty("group.id", "flink-connector");
        // 实例化 Consumer 类
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );
        // 指定从最新地位开始生产,相当于放弃历史音讯
        flinkKafkaConsumer.setStartFromLatest();

        // 通过 addSource 办法失去 DataSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        // 从 kafka 获得字符串音讯后,宰割成单词,统计数量,窗口是 5 秒
        dataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka");
    }
}
  1. 确保 kafka 的 topic 曾经创立,将 Kafka240 运行起来,可见生产音讯并进行单词统计的性能是失常的:

  1. 接管 kafka 字符串音讯的实战曾经实现,接下来试试 JSON 格局的音讯;

实战 JSON 音讯解决

  1. 接下来要承受的 JSON 格局音讯,能够被反序列化成 bean 实例,会用到 JSON 库,我抉择的是 gson;
  2. 在 pom.xml 减少 gson 依赖:
<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
  <version>2.8.5</version>
</dependency>
  1. 减少类 Student.java,这是个一般的 Bean,只有 id 和 name 两个字段:
package com.bolingcavalry;

public class Student {

    private int id;

    private String name;

    public int getId() {return id;}

    public void setId(int id) {this.id = id;}

    public String getName() {return name;}

    public void setName(String name) {this.name = name;}
}
  1. 减少类 StudentSchema.java,该类是 DeserializationSchema 接口的实现,将 JSON 反序列化成 Student 实例时用到:
ackage com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;

public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {private static final Gson gson = new Gson();

    /**
     * 反序列化,将 byte 数组转成 Student 实例
     * @param bytes
     * @return
     * @throws IOException
     */
    @Override
    public Student deserialize(byte[] bytes) throws IOException {return gson.fromJson(new String(bytes), Student.class);
    }

    @Override
    public boolean isEndOfStream(Student student) {return false;}

    /**
     * 序列化,将 Student 实例转成 byte 数组
     * @param student
     * @return
     */
    @Override
    public byte[] serialize(Student student) {return new byte[0];
    }

    @Override
    public TypeInformation<Student> getProducedType() {return TypeInformation.of(Student.class);
    }
}
  1. 新增类 Kafka240Bean.java,作用是连贯 broker,对收到的 JSON 音讯转成 Student 实例,统计每个名字呈现的数量,窗口仍旧是 5 秒:
package com.bolingcavalry.connector;

import com.bolingcavalry.Splitter;
import com.bolingcavalry.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class Kafka240Bean {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(2);

        Properties properties = new Properties();
        //broker 地址
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        //zookeeper 地址
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // 消费者的 groupId
        properties.setProperty("group.id", "flink-connector");
        // 实例化 Consumer 类
        FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new StudentSchema(),
                properties
        );
        // 指定从最新地位开始生产,相当于放弃历史音讯
        flinkKafkaConsumer.setStartFromLatest();

        // 通过 addSource 办法失去 DataSource
        DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer);

        // 从 kafka 获得的 JSON 被反序列化成 Student 实例,统计每个 name 的数量,窗口是 5 秒
        dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Student student) throws Exception {return new Tuple2<>(student.getName(), 1);
            }
        })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka bean");
    }
}
  1. 在测试的时候,要向 kafka 发送 JSON 格局字符串,flink 这边就会给统计出每个 name 的数量:


至此,内置 connector 的实战就实现了,接下来的章节,咱们将要一起实战自定义 DataSource;

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

正文完
 0