关于java:Flink消费kafka数据打包

flink介绍

Flink是一个框架和分布式解决引擎,用于对无限度和有限度的数据留进行有状态的计算。Flink被设计为可在所有常见的集群环境中运行,以内存速度和任何规模执行计算。

解决无界和有界数据
任何类型的数据都是作为事件流产生的。信用卡交易,传感器测量,机器日志或网站挪动应用程序上的用户交互,所有这些数据均作为流生成。

Flink 功能强大,反对开发和运行多种不同品种的应用程序。它的次要个性包含:批流一体化、精细的状态治理、事件工夫反对以及准确一次的状态一致性保障等。在启用高可用选项的状况下,它不存在单点生效问题。事实证明,Flink 曾经能够扩大到数千外围,其状态能够达到 TB 级别,且仍能放弃高吞吐、低提早的个性。世界各地有很多要求严苛的流解决利用都运行在 Flink 之上。
外围点:
1、高吞吐,低提早
2、后果的准确性
3、准确一次的状态一致性保障
4、高可用,反对动静扩大

依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Flink生产Kafka数据

package com.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

@Slf4j
public class Flink {
    
    public static void main(String[] args) {
        try {
            //TODO 1、初始化flink流解决的运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //TODO 2、创立数据源
            Properties properties = new Properties();
            //封装kafka的连贯地址
            properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            //指定消费者id
            properties.setProperty("group.id", "consumer-group");
            //设置动静监测分区或者主题的变动
            properties.setProperty("flink.partition-discovery.interval-millis", "30000");
            //配置security.protocol
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            //配置sasl.mechanism
//            properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
            //配置sasl.jaas.config
            properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"账号\" password=\"明码\";");
//            生产主题
//            生产多个主题
            List<String> topicList = new ArrayList<>();
            topicList.add("OGG_TEST.T_OGGO_FB");
//            topicList.add("OGG_TEST.T_OGG_FB");
//            topicList.add("OGG_TEST.T_OGGO_EB");
//            topicList.add("OGG_TEST.T_OGG_EB");
//            topicList.add("OGG_TEST.T_OGGO_DB");
//            topicList.add("OGG_TEST.T_OGG_DB");
            FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), properties);


//            KafkaConsumer.setStartFromEarliest():从topic的最早offset地位开始解决数据,如果kafka中保留有消费者组的生产地位将被疏忽。
//            KafkaConsumer.setStartFromLatest():从topic的最新offset地位开始解决数据,如果kafka中保留有消费者组的生产地位将被疏忽。
//            KafkaConsumer.setStartFromTimestamp(…):从指定的工夫戳(毫秒)开始生产数据,Kafka中每个分区中数据大于等于设置的工夫戳的数据地位将被当做开始生产的地位。如果kafka中保留有消费者组的生产地位将被疏忽。
//            KafkaConsumer.setStartFromGroupOffsets():默认的设置。依据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset地位生产数据。如果没有找到对应的消费者组的地位,那么将依照auto.offset.reset设置的策略读取offset。

            //读取kafka数据的时候须要指定生产策略,如果不指定会应用auto.offset.reset设置
//            kafkaConsumer.setStartFromEarliest();
            kafkaConsumer.setStartFromLatest();

            DataStreamSource<String> dataStreamSource = env.addSource(kafkaConsumer);

            dataStreamSource.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    log.info("kafka数据:" + value);
                    return "Flink生产kafka数据" + value;
                }
            }).print();

            //TODO 5、调用execute办法触发程序执行
            env.execute();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
}

打包放flink中执行

1、配置打包

2、配置打包

3、配置打包

4、配置打包

5、打包

6、打包

7、上传

8、配置启动

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据