大家好,我是后来,我会分享我在学习和工作中遇到的点滴,希望有机会我的某篇文章能够对你有所帮助,所有的文章都会在公众号首发,欢迎大家关注我的公众号 ” 后来 X 大数据 “,感谢你的支持与认可。
又是一周没更文了,上周末回运城看牙去了,一直都在路上,太累了。说回正题,关于 flink 的入门在上一篇已经讲过了。
今天主要说一下关于 流处理的 API,这一篇所有的代码都是 scala。
那么我们还得回到上次的 WordCount 代码,Flink 程序看起来像转换数据集合的常规程序。每个程序都包含相同的基本部分:
- 获得 execution environment
- 加载 / 创建初始数据
- 指定对此数据的转换
- 指定将计算结果放在何处
- 触发程序执行
获取执行环境
所以要想处理数据,还得从获取执行环境来说起。StreamExecutionEnvironment 是所有 Flink 程序的基础,所以我们来获取一个执行环境。有以下 3 种静态方法
- getExecutionEnvironment()
- createLocalEnvironment()
- createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
// 获取上下文环境
val contextEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 获取本地环境
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(1)
// 获取集群环境
val romoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("bigdata101",3456,2,"/ce.jar")
但一般来说,我们只需要使用第一种 getExecutionEnvironment(),因为 它将根据上下文执行正确的操作;也即是说,它会根据查询运行的方式决定返回什么样的运行环境,你是 IDE 执行,它会返回本地执行环境,如果你是集群执行,它会返回集群执行环境。
预定义的数据流源
好了,获取到环境之后,我们就开始获取数据源,flink 自身是支持多数据源的,首先来看几个预定义的数据流源
-
基于文件
- readTextFile(path)- TextInputFormat 逐行读取文本文件,并将其作为字符串返回,只读取一次。
- readFile(fileInputFormat, path) - 根据 指定的文件输入格式 读取文件,只读取一次。
但事实上,上面的 2 个方法内部都是调用的 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
我们来看看源码:
我们选择其中第一个比较简单的方法进入,就看到了下图,发现其实上述的 2 种方法最终都会落到这个 readFile(fileInputFormat, path, watchType, interval, pathFilter)方法上,只不过后面的参数都是默认值了。
所以,这些参数当然也可以自己指定。好了,这个方法大家也不常用,所以就简单介绍下,有需要的小伙伴自己试试这些后面的参数。
- 基于套接字
socketTextStream-从套接字读取 。元素可以由定界符分隔。
这里提到了套接字,这个我在终于懂了 TCP 协议为什么是可靠的,计算机基础(六)之运输层是讲过的,这里再说一下:
套接字 socket = {IP 地址:端口号},示例:192.168.1.99:3456
代码使用如下:
val wordDS: DataStream[String] = contextEnv.socketTextStream("bigdata101",3456)
套接字是抽象的,只是为了表示 TCP 连接而存在。
- 基于集合
- fromCollection(Seq)- 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须具有相同的类型。
- fromCollection(Iterator)- 从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- fromElements(elements: _*)- 从给定的对象序列创建数据流。所有对象必须具有相同的类型。
- fromParallelCollection(SplittableIterator)- 从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。
- generateSequence(from, to) - 并行生成给定间隔中的数字序列。
这些预设的数据源使用的也不是很多,可以说是几乎不用。所以大家可以自己尝试一下。
当然注意,如果使用 fromCollection(Seq),因为是从 Java.util.Collection 创建数据流,所以如果你是用 scala 编程,那么就需要 引入 隐式转换
import org.apache.flink.streaming.api.scala._
获取数据源 Source
大家也能发现,以上的方法几乎都是从一个固定的数据源中获取数据,适合自己测试,但在生产中肯定是不能使用的,所以我们来看看正儿八经的数据源:
官方支持的 source 与 sink 如下:
- Apache Kafka(源 / 接收器)
- Apache Cassandra(接收器)
- Amazon Kinesis Streams(源 / 接收器)
- Elasticsearch(接收器)
- Hadoop 文件系统(接收器)
- RabbitMQ(源 / 接收器)
- Apache NiFi(源 / 接收器)
- Twitter Streaming API(源)
- Google PubSub(源 / 接收器)
加粗的 3 个日常中比较常用的,那么也发现其实数据源只有 kafka,sink 有 ES 和 HDFS,那么我们先来说说 kafka Source,关于 Kafka 的安装部署这里就不讲了,自行 Google。我们来贴代码与分析。
Kafka Source
- 在 pom.xml 中导入 kafka 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
这里就涉及到了版本的问题:大家可以根据自己的版本进行调试。但是注意:目前 flink 1.7 版本开始,通用 Kafka 连接器被视为处于 BETA 状态,并且可能不如 0.11 连接器那么稳定。所以建议大家使用flink-connector-kafka-0.11_2.11
- 贴测试代码
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka. FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
/**
* @description: ${kafka Source 测试}
* @author: Liu Jun Jun
* @create: 2020-06-10 10:56
**/
object kafkaSource {def main(args: Array[String]): Unit = {
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
// 配置 kafka 连接器
properties.setProperty("bootstrap.servers", "bigdata101:9092")
properties.setProperty("group.id", "test")
// 设置序列化方式
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// 设置 offset 消费方式:可设置的参数为:earliest,latest,none
properties.setProperty("auto.offset.reset", "latest")
//earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
//latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
//none:topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常
val kafkaDS: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String](
"test1",
new SimpleStringSchema(),
properties
)
)
kafkaDS.print("测试:")
env.execute("kafkaSource")
}
}
在这个里面,要注意的是消费 offset 的方式,3 个参数的区别:
- 如果存在已经提交的 offest 时,不管设置为 earliest 或者 latest 都会从已经提交的 offest 处开始消费
- 如果不存在已经提交的 offest 时,earliest 表示从头开始消费,latest 表示从最新的数据消费, 也就是新产生的数据.
- none:topic 各分区都存在已提交的 offset 时,从提交的 offest 处开始消费;只要有一个分区不存在已提交的 offset,则抛出异常
关于 kafka 序列化的设置这个需要根据实际的需求配置。
上面只是简单的使用 Kafka 作为了数据源获取到了数据,至于怎么做检查点以及精准一次性消费这类共性话题,我们之后单独拿出来再讲,这次先把基本的 API 过一下
Transform 算子
说完了 Source,接下来就是 Transform,这类的算子可以说是很多了,官网写的非常全,我把链接贴在这里,大家可以直接看官网:flink 官网的转换算子介绍
而我们常用的也就是下面这些,功能能 spark 的算子可以说是几乎一样。所以我们简单看一下:
- Map 映射 —– 以元素为单位进行映射,会生成新的数据流;DataStream → DataStream
// 输入单词转换为(word,1)wordDS.map((_,1))
- FlatMap 压平,DataStream → DataStream
// 输入的一行字符串按照空格切分单词
dataStream.flatMap(_.split(" "))
- Filter 过滤,DataStream → DataStream
// 过滤出对 2 取余等于 0 的数字
dataStream.filter(_ % 2 == 0)
- KeyBy 分组
// 计算 wordCount,按照单词分组, 这里的 0 指的是 tuple 的位数,因为(word,1)这类新的流,按照 word 分组,而 word 就是第 0 位
wordDS.map((_,1)).keyBy(0)
- reduce 聚合
// 对上述 KeyBy 后的(word,count)做聚合,合并当前的元素和上次聚合的结果,实现了 wordCount
wordDS.map((_,1)).keyBy(0).reduce{(s1,s2) =>{(s1._1,s1._2 + s2._2)
}
}
关于双流 join 与窗口的算子我在下一站会着重说,这一篇先了解一些常用的简单的 API 目的就达到了。
函数
在 flink 中,对数据处理,除了上述一些简单的转换算子外,还经常碰到一些无法通过上述算子解决的问题,于是就需要我们 自定义实现 UDF 函数
关于 UDF,UDTF,UDAF
UDF:User Defined Function,用户自定义函数,一进一出
UDAF:User- Defined Aggregation Funcation 用户自定义聚合函数,多进一出
UDTF:User-Defined Table-Generating Functions,用户定义表生成函数,用来解决输入一行输出多行
UDF 函数类
其实在我们常用的算子如 map、filter 等都暴露了对应的接口,可以自定义实现:
举例如 map:
val StuDS: DataStream[Stu] = kafkaDS.map(
// 在内部我们可以自定义实现 MapFunction,从而实现类型的转换
new MapFunction[ObjectNode, Stu]() {override def map(value: ObjectNode): Stu = {JSON.parseObject(value.get("value").toString, classOf[Stu])
}
}
)
富函数 Rich Functions
除了上述的函数外,使用多的就还有富函数 Rich Functions,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法, open,close,getRuntimeContext,和 setRuntimeContext,所以可以实现更复杂的功能,比如累加器和计算器等。
那我们来简单实现一个累加器
累加器是具有加法运算和最终累加结果的简单结构,可在作业结束后使用。最简单的累加器是一个计数器:您可以使用 Accumulator.add(V value)方法将其递增。在工作结束时,Flink 将汇总(合并)所有部分结果并将结果发送给客户端。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
/**
* @description: ${description}
* @author: Liu Jun Jun
* @create: 2020-06-12 17:55
**/
object AccumulatorTest {def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
val dataDS = env
.readTextFile("input/word.txt")
//.socketTextStream("bigdata101", 3456)
val resultDS: DataStream[String] = dataDS.map(new RichMapFunction[String, String] {
// 第一步:定义累加器
private val numLines = new IntCounter
override def open(parameters: Configuration): Unit = {super.open(parameters)
// 注册累加器
getRuntimeContext.addAccumulator("num-lines", this.numLines)
}
override def map(value: String): String = {this.numLines.add(1)
value
}
override def close(): Unit = super.close()
})
resultDS.print("单词输入")
val jobExecutionResult = env.execute("单词统计")
// 输出单词个数
println(jobExecutionResult.getAccumulatorResult("num-lines"))
}
}
注意:这个案例中,我使用的是有限流,原因是这个累加器的值只有在最后程序的结束的时候才能打印出来,或者是可以直接在 Flink UI 中体现。
那么如何实现随时打印打印出累加器的值呢?那就需要我们自定义实现累加器了:
而实现自定义的累加器我还没写完。。。。。
Sink
那么当我们通过 flink 对数据处理结束后,要把结果数据放到相应的数据存放点,也就是 sink 了,方便后续通过接口调用做报表统计。
那么数据放哪里呢?
- ES
- redis
- Hbase
- MYSQL
- kafka
ES sink
关于 ES 的介绍,我也发过一篇文章,只不过是入门级别的,有需要的可以看看,贴链接如下:ES 最新版快速入门详解
来吧,贴代码,注意看其中的注释
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
/**
* @description: ${description}
* @author: Liu Jun Jun
* @create: 2020-06-01 11:44
**/
object flink2ES {def main(args: Array[String]): Unit = {
// 1. 获取执行环境
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
//2. 设置并行度为 2
env.setParallelism(2)
//3. 设置关于 kafka 数据源的配置,主题,节点,消费者组,序列化,消费 offset 形式
val topic = "ctm_student"
val properties = new java.util.Properties()
properties.setProperty("bootstrap.servers", "bigdata101:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// 从 kafka 中获取数据
val kafkaDS: DataStream[String] =
env.addSource(new FlinkKafkaConsumer011[String](
topic,
new SimpleStringSchema(),
properties) )
// 添加 ES 连接
val httpHosts = new java.util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("bigdata101", 9200))
// 创建 ESSink 对象,在其中对数据进行操作
val esSinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, new ElasticsearchSinkFunction[String] {def createIndexRequest(element: String): IndexRequest = {val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("ws")
.`type`("readingData")
.source(json)
}
// 重写 process 方法,对输入的数据进行处理,runtimeContext 为上下文环境,requestIndexer 为操作 index 的对象
override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
// 在 add 方法中,参数可以为:增、删、改、行动请求
requestIndexer.add(createIndexRequest(t))
println("saved successfully")
}
})
// 测试时这句代码一定要写,意思时:每一个请求都进行刷新
// 否则在测试的适合,kafka 生产几条数据,但 ES 中却查不到,默认 ES 是 5000 条消息刷新一次,这就涉及到了 ES 的架构中索引的刷新频率,下面会写相应的配置。esSinkBuilder.setBulkFlushMaxActions(1)
// 真正将数据发送到 ES 中
kafkaDS.addSink(esSinkBuilder.build())
// 触发执行
env.execute()}
}
我们在上面的代码中,初步的通过 kafka 来获取数据,然后直接写到了 ES 中,但模拟的只是执行单个索引请求,我们在日常的生产中,肯定不是说一次请求刷新一次,这对 ES 来说,压力太大了,所以会有批量提交的配置。
- bulk.flush.max.actions:刷新前要缓冲的最大操作数。
-
bulk.flush.max.size.mb:刷新前要缓冲的最大数据大小(以兆字节为单位)。
- bulk.flush.interval.ms:刷新间隔,无论缓冲操作的数量或大小如何。
对于 ES 现在的版本,还支持配置重试临时请求错误的方式:
-
bulk.flush.backoff.enable:如果刷新的一个或多个操作由于临时原因而失败,是否对刷新执行延迟退避重试 EsRejectedExecutionException。
- bulk.flush.backoff.type:退避延迟的类型,可以是 CONSTANT 或 EXPONENTIAL
- bulk.flush.backoff.delay:延迟的延迟量。对于恒定的退避,这只是每次重试之间的延迟。对于指数补偿,这是初始基准延迟。
- bulk.flush.backoff.retries:尝试尝试的退避重试次数
Redis Sink
关于 Redis 大家应该很熟悉了,我们来模拟一下数据处理结束后存入 Redis,我这里模拟的是 redis 单点。
注意:因为我们是在 IDE 中要访问远程的 redis,所以 redis 的 redis.conf 配置文件中,需要修改 2 个地方:
- 注释 bind 127.0.0.1,否则只能安装 redis 的本机连接,其他机器不能访问
- 关闭保护模式
protected-mode no
然后就可以启动 redis 啦,这里再把几个简单的命令贴一下,做到全套服务,哈哈
- 启动 redis 服务:redis-server /usr/local/redis/redis.conf
- 进入 redis:
进入的命令:redis-cli
指定 IP:redis-cli -h master102
redis 中文显示问题:redis-cli -h master -raw(也就是多加一个 -raw)
多个 Redis 同时启动,则需指定端口号访问 redis-cli -p 端口号
- 关闭 redis 服务:
1) 单实例关闭
如果还未通过客户端访问,可直接 redis-cli shutdown
如果已经进入客户端, 直接 shutdown 即可.
2) 多实例关闭
指定端口关闭 redis-cli -p 端口号 shutdown
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
* @description: ${连接单节点 redis 测试}
* @author: Liu Jun Jun
* @create: 2020-06-12 11:23
**/
object flink2Redis {def main(args: Array[String]): Unit = {
// 转换
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val topic = "test1"
val properties = new java.util.Properties()
properties.setProperty("bootstrap.servers", "bigdata101:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// 从 kafka 中获取数据
val kafkaDS: DataStream[String] =
env.addSource(new FlinkKafkaConsumer011[String](
topic,
new SimpleStringSchema(),
properties) )
kafkaDS.print("data:")
val conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata103").setPort(6379).build()
kafkaDS.addSink(new RedisSink[String](conf, new RedisMapper[String] {
override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET,"sensor")
}
override def getKeyFromData(t: String): String = {t.split(",")(0)
}
override def getValueFromData(t: String): String = {t.split(",")(1)
}
}))
env.execute()}
}
flink to Hbase
有的时候,还需要把 Flink 处理过的数据写到 Hbase,那我们也来简单试试。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* @description: ${flink to Hbase}
* @author: Liu Jun Jun
* @create: 2020-05-29 17:53
**/
object flink2Hbase {def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
val stuDS = env
.socketTextStream("bigdata101",3456)
.map(s => {
// 这里我创建了一个 student 样例类,只有 name 和 age
val stu: Array[String] = s.split(",")
student(stu(0),stu(1).toInt)
})
// 这里我们模拟的比较简单,没有对数据进行处理,直接写入到 Hbase,这个 HBaseSink 是自己写的类,往下看
val hBaseSink: HBaseSink = new HBaseSink("WordCount","info1")
stuDS.addSink(hBaseSink)
env.execute("app")
}
}
case class student(name : String,age : Int)
/**
* @description: ${封装 Hbase 连接}
* @author: Liu Jun Jun
* @create: 2020-06-01 14:41
**/
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
class HBaseSink(tableName: String, family: String) extends RichSinkFunction[student] {
var conn: Connection = _
// 创建连接
override def open(parameters: Configuration): Unit = {conn = HbaseFactoryUtil.getConn()
}
// 调用
override def invoke(value: student): Unit = {val t: Table = conn.getTable(TableName.valueOf(tableName))
val put: Put = new Put(Bytes.toBytes(value.age))
put.addColumn(Bytes.toBytes(family), Bytes.toBytes("name"), Bytes.toBytes(value.name))
put.addColumn(Bytes.toBytes(family), Bytes.toBytes("age"), Bytes.toBytes(value.age))
t.put(put)
t.close()}
override def close(): Unit = {}
}
好了,到这里 flink 的一些基本流处理 API 已经差不多说完了,但是关于 flink 特别重要的 窗口、精准一次性、状态编程、时间语义等重点还没说,所以下一篇关于 flink 的文章就再聊聊这些关键点。
扫码关注公众号“后来 X 大数据”,回复【电子书】,领取超多本 pdf【java 及大数据 电子书】