关于云计算:将CSV的数据发送到kafkajava版

43次阅读

共计 8306 个字符,预计需要花费 21 分钟才能阅读完成。

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

为什么将 CSV 的数据发到 kafka

  1. flink 做流式计算时,选用 kafka 音讯作为数据源是罕用伎俩,因而在学习和开发 flink 过程中,也会将数据集文件中的记录发送到 kafka,来模仿不间断数据;
  2. 整个流程如下:

  1. 您可能会感觉这样做多此一举:flink 间接读取 CSV 不就行了吗?这样做的起因如下:
  2. 首先,这是学习和开发时的做法,数据集是 CSV 文件,而生产环境的实时数据却是 kafka 数据源;
  3. 其次,Java 利用中能够退出一些非凡逻辑,例如数据处理,汇总统计(用来和 flink 后果比照验证);
  4. 另外,如果两条记录理论的间隔时间如果是 1 分钟,那么 Java 利用在发送音讯时也能够距离一分钟再发送,这个逻辑在 flink 社区的 demo 中有具体的实现,此 demo 也是将数据集发送到 kafka,再由 flink 生产 kafka,地址是:https://github.com/ververica/…

如何将 CSV 的数据发送到 kafka

后面的图能够看出,读取 CSV 再发送音讯到 kafka 的操作是 Java 利用所为,因而明天的次要工作就是开发这个 Java 利用,并验证;

版本信息

  1. JDK:1.8.0_181
  2. 开发工具:IntelliJ IDEA 2019.2.1 (Ultimate Edition)
  3. 开发环境:Win10
  4. Zookeeper:3.4.13
  5. Kafka:2.4.0(scala:2.12)

对于数据集

  1. 本次实战用到的数据集是 CSV 文件,外面是一百零四万条淘宝用户行为数据,该数据起源是阿里云天池公开数据集,我对此数据做了大量调整;
  2. 此 CSV 文件能够在 CSDN 下载,地址:https://download.csdn.net/dow…
  3. 也能够在我的 Github 下载,地址:https://raw.githubusercontent…
  4. 该 CSV 文件的内容,一共有六列,每列的含意如下表:
列名称 阐明
用户 ID 整数类型,序列化后的用户 ID
商品 ID 整数类型,序列化后的商品 ID
商品类目 ID 整数类型,序列化后的商品所属类目 ID
行为类型 字符串,枚举类型,包含 (‘pv’, ‘buy’, ‘cart’, ‘fav’)
工夫戳 行为产生的工夫戳
工夫字符串 依据工夫戳字段生成的工夫字符串
  1. 对于该数据集的详情,请参考《筹备数据集用于 flink 学习》

Java 利用简介

编码前,先把具体内容列出来,而后再挨个实现:

  1. 从 CSV 读取记录的工具类:UserBehaviorCsvFileReader
  2. 每条记录对应的 Bean 类:UserBehavior
  3. Java 对象序列化成 JSON 的序列化类:JsonSerializer
  4. 向 kafka 发送音讯的工具类:KafkaProducer
  5. 利用类,程序入口:SendMessageApplication

上述五个类即可实现 Java 利用的工作,接下来开始编码吧;

间接下载源码

  1. 如果您不想写代码,您能够间接从 GitHub 下载这个工程的源码,地址和链接信息如下表所示:
名称 链接 备注
我的项目主页 https://github.com/zq2599/blo… 该我的项目在 GitHub 上的主页
git 仓库地址 (https) https://github.com/zq2599/blo… 该我的项目源码的仓库地址,https 协定
git 仓库地址 (ssh) git@github.com:zq2599/blog_demos.git 该我的项目源码的仓库地址,ssh 协定
  1. 这个 git 我的项目中有多个文件夹,本章源码在 flinksql 这个文件夹下,如下图红框所示:

编码

  1. 创立 maven 工程,pom.xml 如下,比拟重要的 jackson 和 javacsv 的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bolingcavalry</groupId>
    <artifactId>flinksql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <kafka.version>2.2.0</kafka.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.10.1</version>
        </dependency>

        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>net.sourceforge.javacsv</groupId>
            <artifactId>javacsv</artifactId>
            <version>2.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to include all dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. 从 CSV 读取记录的工具类:<font color=”blue”>UserBehaviorCsvFileReader</font>,前面在主程序中会用到 java8 的 Steam API 来解决汇合,所以 UserBehaviorCsvFileReader 实现了 Supplier 接口:
public class UserBehaviorCsvFileReader implements Supplier<UserBehavior> {

    private final String filePath;
    private CsvReader csvReader;

    public UserBehaviorCsvFileReader(String filePath) throws IOException {

        this.filePath = filePath;
        try {csvReader = new CsvReader(filePath);
            csvReader.readHeaders();} catch (IOException e) {throw new IOException("Error reading TaxiRecords from file:" + filePath, e);
        }
    }

    @Override
    public UserBehavior get() {
        UserBehavior userBehavior = null;
        try{if(csvReader.readRecord()) {csvReader.getRawRecord();
                userBehavior = new UserBehavior(Long.valueOf(csvReader.get(0)),
                        Long.valueOf(csvReader.get(1)),
                        Long.valueOf(csvReader.get(2)),
                        csvReader.get(3),
                        new Date(Long.valueOf(csvReader.get(4))*1000L));
            }
        } catch (IOException e) {throw new NoSuchElementException("IOException from" + filePath);
        }

        if (null==userBehavior) {throw new NoSuchElementException("All records read from" + filePath);
        }

        return userBehavior;
    }
}
  1. 每条记录对应的 Bean 类:<font color=”blue”>UserBehavior</font>,和 CSV 记录格局保持一致即可,示意工夫的 <font color=”blue”>ts</font> 字段,应用了 JsonFormat 注解,在序列化的时候以此来管制格局:
public class UserBehavior {

    @JsonFormat
    private long user_id;

    @JsonFormat
    private long item_id;

    @JsonFormat
    private long category_id;

    @JsonFormat
    private String behavior;

    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
    private Date ts;

    public UserBehavior() {}

    public UserBehavior(long user_id, long item_id, long category_id, String behavior, Date ts) {
        this.user_id = user_id;
        this.item_id = item_id;
        this.category_id = category_id;
        this.behavior = behavior;
        this.ts = ts;
    }
}
  1. Java 对象序列化成 JSON 的序列化类:JsonSerializer
public class JsonSerializer<T> {private final ObjectMapper jsonMapper = new ObjectMapper();

    public String toJSONString(T r) {
        try {return jsonMapper.writeValueAsString(r);
        } catch (JsonProcessingException e) {throw new IllegalArgumentException("Could not serialize record:" + r, e);
        }
    }

    public byte[] toJSONBytes(T r) {
        try {return jsonMapper.writeValueAsBytes(r);
        } catch (JsonProcessingException e) {throw new IllegalArgumentException("Could not serialize record:" + r, e);
        }
    }
}
  1. 向 kafka 发送音讯的工具类:<font color=”blue”>KafkaProducer</font>:
public class KafkaProducer implements Consumer<UserBehavior> {

    private final String topic;
    private final org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
    private final JsonSerializer<UserBehavior> serializer;

    public KafkaProducer(String kafkaTopic, String kafkaBrokers) {
        this.topic = kafkaTopic;
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(createKafkaProperties(kafkaBrokers));
        this.serializer = new JsonSerializer<>();}

    @Override
    public void accept(UserBehavior record) {
        // 将对象序列化成 byte 数组
        byte[] data = serializer.toJSONBytes(record);
        // 封装
        ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic, data);
        // 发送
        producer.send(kafkaRecord);

        // 通过 sleep 管制音讯的速度,请根据本身 kafka 配置以及 flink 服务器配置来调整
        try {Thread.sleep(500);
        }catch(InterruptedException e){e.printStackTrace();
        }
    }

    /**
     * kafka 配置
     * @param brokers The brokers to connect to.
     * @return A Kafka producer configuration.
     */
    private static Properties createKafkaProperties(String brokers) {Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
        return kafkaProps;
    }
}
  1. 最初是利用类 SendMessageApplication,CSV 文件门路、kafka 的 topic 和 borker 地址都在此设置,另外借助 java8 的 Stream API,只需大量代码即可实现所有工作:
public class SendMessageApplication {public static void main(String[] args) throws Exception {
        // 文件地址
        String filePath = "D:\\temp\\202005\\02\\UserBehavior.csv";
        // kafka topic
        String topic = "user_behavior";
        // kafka borker 地址
        String broker = "192.168.50.43:9092";

        Stream.generate(new UserBehaviorCsvFileReader(filePath))
                .sequential()
                .forEachOrdered(new KafkaProducer(topic, broker));
    }
}

验证

  1. 请确保 kafka 曾经就绪,并且名为 <font color=”blue”>user_behavior</font> 的 topic 曾经创立;
  2. 请将 CSV 文件筹备好;
  3. 确认 SendMessageApplication.java 中的文件地址、kafka topic、kafka broker 三个参数准确无误;
  4. 运行 SendMessageApplication.java;
  5. 开启一个 控制台音讯 kafka 音讯,参考命令如下:
./kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic user_behavior \
--consumer-property group.id=old-consumer-test \
--consumer-property consumer.id=old-consumer-cl \
--from-beginning
  1. 失常状况下能够立刻见到音讯,如下图:


至此,通过 Java 利用模仿用户行为音讯流的操作就实现了,接下来的 flink 实战就用这个作为数据源;

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

正文完
 0