共计 3768 个字符,预计需要花费 10 分钟才能阅读完成。
flink 介绍
Flink 是一个框架和分布式解决引擎,用于对无限度和有限度的数据留进行有状态的计算。Flink 被设计为可在所有常见的集群环境中运行,以内存速度和任何规模执行计算。解决无界和有界数据 | |
任何类型的数据都是作为事件流产生的。信用卡交易,传感器测量,机器日志或网站挪动应用程序上的用户交互,所有这些数据均作为流生成。Flink 功能强大,反对开发和运行多种不同品种的应用程序。它的次要个性包含:批流一体化、精细的状态治理、事件工夫反对以及准确一次的状态一致性保障等。在启用高可用选项的状况下,它不存在单点生效问题。事实证明,Flink 曾经能够扩大到数千外围,其状态能够达到 TB 级别,且仍能放弃高吞吐、低提早的个性。世界各地有很多要求严苛的流解决利用都运行在 Flink 之上。外围点:1、高吞吐,低提早 | |
2、后果的准确性 | |
3、准确一次的状态一致性保障 | |
4、高可用,反对动静扩大 |
依赖
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-java</artifactId> | |
<version>1.8.1</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-streaming-java_2.11</artifactId> | |
<version>1.8.1</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-clients_2.11</artifactId> | |
<version>1.8.1</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.flink</groupId> | |
<artifactId>flink-connector-kafka_2.11</artifactId> | |
<version>1.8.1</version> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka</artifactId> | |
</dependency> |
Flink 生产 Kafka 数据
package com.kafka; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.common.serialization.SimpleStringSchema; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; | |
import org.apache.kafka.clients.CommonClientConfigs; | |
import org.apache.kafka.common.config.SaslConfigs; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Properties; | |
@Slf4j | |
public class Flink {public static void main(String[] args) { | |
try { | |
//TODO 1、初始化 flink 流解决的运行环境 | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
//TODO 2、创立数据源 | |
Properties properties = new Properties(); | |
// 封装 kafka 的连贯地址 | |
properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); | |
// 指定消费者 id | |
properties.setProperty("group.id", "consumer-group"); | |
// 设置动静监测分区或者主题的变动 | |
properties.setProperty("flink.partition-discovery.interval-millis", "30000"); | |
// 配置 security.protocol | |
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); | |
// 配置 sasl.mechanism | |
// properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); | |
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256"); | |
// 配置 sasl.jaas.config | |
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\" 账号 \"password=\" 明码 \";"); | |
// 生产主题 | |
// 生产多个主题 | |
List<String> topicList = new ArrayList<>(); | |
topicList.add("OGG_TEST.T_OGGO_FB"); | |
// topicList.add("OGG_TEST.T_OGG_FB"); | |
// topicList.add("OGG_TEST.T_OGGO_EB"); | |
// topicList.add("OGG_TEST.T_OGG_EB"); | |
// topicList.add("OGG_TEST.T_OGGO_DB"); | |
// topicList.add("OGG_TEST.T_OGG_DB"); | |
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), properties); | |
// KafkaConsumer.setStartFromEarliest(): 从 topic 的最早 offset 地位开始解决数据,如果 kafka 中保留有消费者组的生产地位将被疏忽。// KafkaConsumer.setStartFromLatest(): 从 topic 的最新 offset 地位开始解决数据,如果 kafka 中保留有消费者组的生产地位将被疏忽。// KafkaConsumer.setStartFromTimestamp(…): 从指定的工夫戳(毫秒)开始生产数据,Kafka 中每个分区中数据大于等于设置的工夫戳的数据地位将被当做开始生产的地位。如果 kafka 中保留有消费者组的生产地位将被疏忽。// KafkaConsumer.setStartFromGroupOffsets(): 默认的设置。依据代码中设置的 group.id 设置的消费者组,去 kafka 中或者 zookeeper 中找到对应的消费者 offset 地位生产数据。如果没有找到对应的消费者组的地位,那么将依照 auto.offset.reset 设置的策略读取 offset。// 读取 kafka 数据的时候须要指定生产策略,如果不指定会应用 auto.offset.reset 设置 | |
// kafkaConsumer.setStartFromEarliest(); | |
kafkaConsumer.setStartFromLatest(); | |
DataStreamSource<String> dataStreamSource = env.addSource(kafkaConsumer); | |
dataStreamSource.map(new MapFunction<String, String>() { | |
@Override | |
public String map(String value) throws Exception {log.info("kafka 数据:" + value); | |
return "Flink 生产 kafka 数据" + value; | |
} | |
}).print(); | |
//TODO 5、调用 execute 办法触发程序执行 | |
env.execute();} catch (Exception e) {e.printStackTrace(); | |
} | |
} | |
} |
打包放 flink 中执行
1、配置打包
2、配置打包
3、配置打包
4、配置打包
5、打包
6、打包
7、上传
8、配置启动
正文完