大家好,我是后来,我会分享我在学习和工作中遇到的点滴,希望有机会我的某篇文章能够对你有所帮助,所有的文章都会在公众号首发,欢迎大家关注我的公众号" 后来X大数据 ",感谢你的支持与认可。
又是一周没更文了,上周末回运城看牙去了,一直都在路上,太累了。说回正题,关于flink的入门在上一篇已经讲过了。
今天主要说一下关于流处理的API,这一篇所有的代码都是scala。
那么我们还得回到上次的WordCount代码,Flink程序看起来像转换数据集合的常规程序。每个程序都包含相同的基本部分:
- 获得execution environment
- 加载/创建初始数据
- 指定对此数据的转换
- 指定将计算结果放在何处
- 触发程序执行
获取执行环境
所以要想处理数据,还得从获取执行环境来说起。StreamExecutionEnvironment是所有Flink程序的基础,所以我们来获取一个执行环境。有以下3种静态方法
- getExecutionEnvironment()
- createLocalEnvironment()
- createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
//获取上下文环境val contextEnv = StreamExecutionEnvironment.getExecutionEnvironment//获取本地环境val localEnv = StreamExecutionEnvironment.createLocalEnvironment(1)//获取集群环境val romoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("bigdata101",3456,2,"/ce.jar")
但一般来说,我们只需要使用第一种getExecutionEnvironment(),因为它将根据上下文执行正确的操作;也即是说,它会根据查询运行的方式决定返回什么样的运行环境,你是IDE执行,它会返回本地执行环境,如果你是集群执行,它会返回集群执行环境。
预定义的数据流源
好了,获取到环境之后,我们就开始获取数据源,flink自身是支持多数据源的,首先来看几个预定义的数据流源
基于文件
- readTextFile(path)- TextInputFormat 逐行读取文本文件,并将其作为字符串返回,只读取一次。
- readFile(fileInputFormat, path) -根据指定的文件输入格式读取文件,只读取一次。
但事实上,上面的2个方法内部都是调用的readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
我们来看看源码:
我们选择其中第一个比较简单的方法进入,就看到了下图,发现其实上述的2种方法最终都会落到这个readFile(fileInputFormat, path, watchType, interval, pathFilter)方法上,只不过后面的参数都是默认值了。
所以,这些参数当然也可以自己指定。好了,这个方法大家也不常用,所以就简单介绍下,有需要的小伙伴自己试试这些后面的参数。
- 基于套接字
socketTextStream-从套接字读取。元素可以由定界符分隔。
这里提到了套接字,这个我在终于懂了TCP协议为什么是可靠的,计算机基础(六)之运输层是讲过的,这里再说一下:
套接字 socket = {IP地址 : 端口号},示例:192.168.1.99 :3456
代码使用如下:
val wordDS: DataStream[String] = contextEnv.socketTextStream("bigdata101",3456)
套接字是抽象的,只是为了表示TCP连接而存在。
- 基于集合
- fromCollection(Seq)-从Java Java.util.Collection创建数据流。集合中的所有元素必须具有相同的类型。
- fromCollection(Iterator)-从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- fromElements(elements: _*)-从给定的对象序列创建数据流。所有对象必须具有相同的类型。
- fromParallelCollection(SplittableIterator)-从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。
- generateSequence(from, to) -并行生成给定间隔中的数字序列。
这些预设的数据源使用的也不是很多,可以说是几乎不用。所以大家可以自己尝试一下。
当然注意,如果使用 fromCollection(Seq),因为是从Java.util.Collection创建数据流,所以如果你是用scala编程,那么就需要引入 隐式转换
import org.apache.flink.streaming.api.scala._
获取数据源Source
大家也能发现,以上的方法几乎都是从一个固定的数据源中获取数据,适合自己测试,但在生产中肯定是不能使用的,所以我们来看看正儿八经的数据源:
官方支持的source与sink如下:
- Apache Kafka(源/接收器)
- Apache Cassandra(接收器)
- Amazon Kinesis Streams(源/接收器)
- Elasticsearch(接收器)
- Hadoop文件系统(接收器)
- RabbitMQ(源/接收器)
- Apache NiFi(源/接收器)
- Twitter Streaming API(源)
- Google PubSub(源/接收器)
加粗的3个日常中比较常用的,那么也发现其实数据源只有kafka,sink有ES和HDFS,那么我们先来说说kafka Source,关于Kafka的安装部署这里就不讲了,自行Google。我们来贴代码与分析。
Kafka Source
- 在pom.xml中导入kafka依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.7.2</version></dependency>
这里就涉及到了版本的问题:大家可以根据自己的版本进行调试。但是注意:目前flink 1.7版本开始,通用Kafka连接器被视为处于BETA状态,并且可能不如0.11连接器那么稳定。所以建议大家使用flink-connector-kafka-0.11_2.11
- 贴测试代码
import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka. FlinkKafkaConsumer011import org.apache.flink.streaming.api.scala._/** * @description: ${kafka Source测试} * @author: Liu Jun Jun * @create: 2020-06-10 10:56 **/object kafkaSource { def main(args: Array[String]): Unit = {//获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() //配置kafka连接器 properties.setProperty("bootstrap.servers", "bigdata101:9092") properties.setProperty("group.id", "test") //设置序列化方式 properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") //设置offset消费方式:可设置的参数为:earliest,latest,none properties.setProperty("auto.offset.reset", "latest") //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 //none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 val kafkaDS: DataStream[String] = env.addSource( new FlinkKafkaConsumer011[String]( "test1", new SimpleStringSchema(), properties ) ) kafkaDS.print("测试:") env.execute("kafkaSource") }}
在这个里面,要注意的是消费offset的方式,3个参数的区别:
- 如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费
- 如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据.
- none :topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常
关于kafka序列化的设置这个需要根据实际的需求配置。
上面只是简单的使用Kafka作为了数据源获取到了数据,至于怎么做检查点以及精准一次性消费这类共性话题,我们之后单独拿出来再讲,这次先把基本的API过一下
Transform 算子
说完了Source,接下来就是Transform,这类的算子可以说是很多了,官网写的非常全,我把链接贴在这里,大家可以直接看官网:flink官网的转换算子介绍
而我们常用的也就是下面这些,功能能spark的算子可以说是几乎一样。所以我们简单看一下:
- Map 映射-----以元素为单位进行映射,会生成新的数据流;DataStream → DataStream
//输入单词转换为(word,1)wordDS.map((_,1))
- FlatMap 压平,DataStream → DataStream
//输入的一行字符串按照空格切分单词dataStream.flatMap(_.split(" "))
- Filter 过滤,DataStream → DataStream
//过滤出对2取余等于0的数字dataStream.filter(_ % 2 == 0)
- KeyBy 分组
//计算wordCount,按照单词分组,这里的0指的是tuple的位数,因为(word,1)这类新的流,按照word分组,而word就是第0位wordDS.map((_,1)).keyBy(0)
- reduce 聚合
//对上述KeyBy后的(word,count)做聚合,合并当前的元素和上次聚合的结果,实现了wordCountwordDS.map((_,1)).keyBy(0).reduce{ (s1,s2) =>{ (s1._1,s1._2 + s2._2) } }
关于双流join与窗口的算子我在下一站会着重说,这一篇先了解一些常用的简单的API目的就达到了。
函数
在flink中,对数据处理,除了上述一些简单的转换算子外,还经常碰到一些无法通过上述算子解决的问题,于是就需要我们自定义实现UDF函数
关于UDF,UDTF,UDAF
UDF:User Defined Function,用户自定义函数,一进一出
UDAF:User- Defined Aggregation Funcation 用户自定义聚合函数,多进一出
UDTF: User-Defined Table-Generating Functions,用户定义表生成函数,用来解决输入一行输出多行
UDF函数类
其实在我们常用的算子如map、filter等都暴露了对应的接口,可以自定义实现:
举例如map:
val StuDS: DataStream[Stu] = kafkaDS.map( //在内部我们可以自定义实现MapFunction,从而实现类型的转换 new MapFunction[ObjectNode, Stu]() { override def map(value: ObjectNode): Stu = { JSON.parseObject(value.get("value").toString, classOf[Stu]) } } )
富函数Rich Functions
除了上述的函数外,使用多的就还有富函数Rich Functions,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法, open,close,getRuntimeContext,和 setRuntimeContext,所以可以实现更复杂的功能,比如累加器和计算器等。
那我们来简单实现一个累加器
累加器是具有加法运算和最终累加结果的简单结构,可在作业结束后使用。最简单的累加器是一个计数器:您可以使用Accumulator.add(V value)方法将其递增 。在工作结束时,Flink将汇总(合并)所有部分结果并将结果发送给客户端。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.common.accumulators.IntCounterimport org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.configuration.Configuration/** * @description: ${description} * @author: Liu Jun Jun * @create: 2020-06-12 17:55 **/object AccumulatorTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.streaming.api.scala._ val dataDS = env .readTextFile("input/word.txt") //.socketTextStream("bigdata101", 3456) val resultDS: DataStream[String] = dataDS.map(new RichMapFunction[String, String] { //第一步:定义累加器 private val numLines = new IntCounter override def open(parameters: Configuration): Unit = { super.open(parameters) //注册累加器 getRuntimeContext.addAccumulator("num-lines", this.numLines) } override def map(value: String): String = { this.numLines.add(1) value } override def close(): Unit = super.close() }) resultDS.print("单词输入") val jobExecutionResult = env.execute("单词统计") //输出单词个数 println(jobExecutionResult.getAccumulatorResult("num-lines")) }}
注意:这个案例中,我使用的是有限流,原因是这个累加器的值只有在最后程序的结束的时候才能打印出来,或者是可以直接在Flink UI中体现。
那么如何实现随时打印打印出累加器的值呢?那就需要我们自定义实现累加器了:
而实现自定义的累加器我还没写完。。。。。
Sink
那么当我们通过flink对数据处理结束后,要把结果数据放到相应的数据存放点,也就是sink了,方便后续通过接口调用做报表统计。
那么数据放哪里呢?
- ES
- redis
- Hbase
- MYSQL
- kafka
ES sink
关于ES的介绍,我也发过一篇文章,只不过是入门级别的,有需要的可以看看,贴链接如下:ES最新版快速入门详解
来吧,贴代码,注意看其中的注释
import org.apache.flink.api.common.functions.RuntimeContextimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.datastream.DataStreamimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011import org.apache.http.HttpHostimport org.elasticsearch.action.index.IndexRequestimport org.elasticsearch.client.Requests/** * @description: ${description} * @author: Liu Jun Jun * @create: 2020-06-01 11:44 **/object flink2ES { def main(args: Array[String]): Unit = { // 1.获取执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2. 设置并行度为2 env.setParallelism(2) //3. 设置关于kafka数据源的配置,主题,节点,消费者组,序列化,消费offset形式 val topic = "ctm_student" val properties = new java.util.Properties() properties.setProperty("bootstrap.servers", "bigdata101:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") // 从kafka中获取数据 val kafkaDS: DataStream[String] = env.addSource( new FlinkKafkaConsumer011[String]( topic, new SimpleStringSchema(), properties) ) //添加ES连接 val httpHosts = new java.util.ArrayList[HttpHost]() httpHosts.add(new HttpHost("bigdata101", 9200)) //创建ESSink对象,在其中对数据进行操作 val esSinkBuilder = new ElasticsearchSink.Builder[String]( httpHosts, new ElasticsearchSinkFunction[String] { def createIndexRequest(element: String): IndexRequest = { val json = new java.util.HashMap[String, String] json.put("data", element) return Requests.indexRequest() .index("ws") .`type`("readingData") .source(json) } //重写process方法,对输入的数据进行处理,runtimeContext为上下文环境,requestIndexer为操作index的对象 override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { //在add方法中,参数可以为:增、删、改、行动请求 requestIndexer.add(createIndexRequest(t)) println("saved successfully") } }) //测试时这句代码一定要写,意思时:每一个请求都进行刷新 //否则在测试的适合,kafka生产几条数据,但ES中却查不到,默认ES是5000条消息刷新一次,这就涉及到了ES的架构中索引的刷新频率,下面会写相应的配置。 esSinkBuilder.setBulkFlushMaxActions(1) //真正将数据发送到ES中 kafkaDS.addSink(esSinkBuilder.build()) //触发执行 env.execute() }}
我们在上面的代码中,初步的通过kafka来获取数据,然后直接写到了ES中,但模拟的只是执行单个索引请求,我们在日常的生产中,肯定不是说一次请求刷新一次,这对ES来说,压力太大了,所以会有批量提交的配置。
- bulk.flush.max.actions:刷新前要缓冲的最大操作数。
bulk.flush.max.size.mb:刷新前要缓冲的最大数据大小(以兆字节为单位)。
- bulk.flush.interval.ms:刷新间隔,无论缓冲操作的数量或大小如何。
对于ES现在的版本,还支持配置重试临时请求错误的方式:
bulk.flush.backoff.enable:如果刷新的一个或多个操作由于临时原因而失败,是否对刷新执行延迟退避重试EsRejectedExecutionException。
- bulk.flush.backoff.type:退避延迟的类型,可以是CONSTANT或EXPONENTIAL
- bulk.flush.backoff.delay:延迟的延迟量。对于恒定的退避,这只是每次重试之间的延迟。对于指数补偿,这是初始基准延迟。
- bulk.flush.backoff.retries:尝试尝试的退避重试次数
Redis Sink
关于Redis大家应该很熟悉了,我们来模拟一下数据处理结束后存入Redis,我这里模拟的是redis单点。
注意:因为我们是在IDE中要访问远程的redis,所以redis的redis.conf配置文件中,需要修改2个地方:
- 注释 bind 127.0.0.1,否则只能安装redis的本机连接,其他机器不能访问
- 关闭保护模式
protected-mode no
然后就可以启动redis啦,这里再把几个简单的命令贴一下,做到全套服务,哈哈
- 启动redis服务:redis-server /usr/local/redis/redis.conf
- 进入redis:
进入的命令:redis-cli
指定IP:redis-cli -h master102
redis中文显示问题:redis-cli -h master -raw(也就是多加一个 -raw)
多个Redis同时启动,则需指定端口号访问 redis-cli -p 端口号
- 关闭redis服务:
1) 单实例关闭
如果还未通过客户端访问,可直接 redis-cli shutdown
如果已经进入客户端,直接 shutdown即可.
2) 多实例关闭
指定端口关闭 redis-cli -p 端口号 shutdown
import org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.datastream.DataStreamimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011import org.apache.flink.streaming.connectors.redis.RedisSinkimport org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfigimport org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}/** * @description: ${连接单节点redis测试} * @author: Liu Jun Jun * @create: 2020-06-12 11:23 **/object flink2Redis { def main(args: Array[String]): Unit = { // 转换 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) val topic = "test1" val properties = new java.util.Properties() properties.setProperty("bootstrap.servers", "bigdata101:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") // 从kafka中获取数据 val kafkaDS: DataStream[String] = env.addSource( new FlinkKafkaConsumer011[String]( topic, new SimpleStringSchema(), properties) ) kafkaDS.print("data:") val conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata103").setPort(6379).build() kafkaDS.addSink(new RedisSink[String](conf, new RedisMapper[String] { override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET,"sensor") } override def getKeyFromData(t: String): String = { t.split(",")(0) } override def getValueFromData(t: String): String = { t.split(",")(1) } })) env.execute() }}
flink to Hbase
有的时候,还需要把Flink处理过的数据写到Hbase,那我们也来简单试试。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/** * @description: ${flink to Hbase} * @author: Liu Jun Jun * @create: 2020-05-29 17:53 **/object flink2Hbase { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.streaming.api.scala._ val stuDS = env .socketTextStream("bigdata101",3456) .map(s => { //这里我创建了一个student样例类,只有name和age val stu: Array[String] = s.split(",") student(stu(0),stu(1).toInt) }) //这里我们模拟的比较简单,没有对数据进行处理,直接写入到Hbase,这个HBaseSink是自己写的类,往下看 val hBaseSink: HBaseSink = new HBaseSink("WordCount","info1") stuDS.addSink(hBaseSink) env.execute("app") }}case class student(name : String,age : Int)/** * @description: ${封装Hbase连接} * @author: Liu Jun Jun * @create: 2020-06-01 14:41 **/import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}import org.apache.hadoop.hbase.client._import org.apache.hadoop.hbase.util.Bytesclass HBaseSink(tableName: String, family: String) extends RichSinkFunction[student] { var conn: Connection = _//创建连接 override def open(parameters: Configuration): Unit = { conn = HbaseFactoryUtil.getConn() }//调用 override def invoke(value: student): Unit = { val t: Table = conn.getTable(TableName.valueOf(tableName)) val put: Put = new Put(Bytes.toBytes(value.age)) put.addColumn(Bytes.toBytes(family), Bytes.toBytes("name"), Bytes.toBytes(value.name)) put.addColumn(Bytes.toBytes(family), Bytes.toBytes("age"), Bytes.toBytes(value.age)) t.put(put) t.close() } override def close(): Unit = { }}
好了,到这里flink的一些基本流处理API已经差不多说完了,但是关于flink特别重要的 窗口、精准一次性、状态编程、时间语义等重点还没说,所以下一篇关于flink的文章就再聊聊这些关键点。