Flink流处理API代码详解含多种SourceTransformSink案例Flink学习入门二

6次阅读

共计 13972 个字符,预计需要花费 35 分钟才能阅读完成。

大家好,我是后来,我会分享我在学习和工作中遇到的点滴,希望有机会我的某篇文章能够对你有所帮助,所有的文章都会在公众号首发,欢迎大家关注我的公众号 ” 后来 X 大数据 “,感谢你的支持与认可。

又是一周没更文了,上周末回运城看牙去了,一直都在路上,太累了。说回正题,关于 flink 的入门在上一篇已经讲过了。

今天主要说一下关于 流处理的 API,这一篇所有的代码都是 scala。

那么我们还得回到上次的 WordCount 代码,Flink 程序看起来像转换数据集合的常规程序。每个程序都包含相同的基本部分:

  1. 获得 execution environment
  2. 加载 / 创建初始数据
  3. 指定对此数据的转换
  4. 指定将计算结果放在何处
  5. 触发程序执行

获取执行环境

所以要想处理数据,还得从获取执行环境来说起。StreamExecutionEnvironment 是所有 Flink 程序的基础,所以我们来获取一个执行环境。有以下 3 种静态方法

  1. getExecutionEnvironment()
  2. createLocalEnvironment()
  3. createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
 // 获取上下文环境
val contextEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 获取本地环境
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(1)
// 获取集群环境
val romoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("bigdata101",3456,2,"/ce.jar")

但一般来说,我们只需要使用第一种 getExecutionEnvironment(),因为 它将根据上下文执行正确的操作;也即是说,它会根据查询运行的方式决定返回什么样的运行环境,你是 IDE 执行,它会返回本地执行环境,如果你是集群执行,它会返回集群执行环境。

预定义的数据流源

好了,获取到环境之后,我们就开始获取数据源,flink 自身是支持多数据源的,首先来看几个预定义的数据流源

  • 基于文件

    1. readTextFile(path)- TextInputFormat 逐行读取文本文件,并将其作为字符串返回,只读取一次。
    2. readFile(fileInputFormat, path) - 根据 指定的文件输入格式 读取文件,只读取一次。

但事实上,上面的 2 个方法内部都是调用的 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
我们来看看源码:


我们选择其中第一个比较简单的方法进入,就看到了下图,发现其实上述的 2 种方法最终都会落到这个 readFile(fileInputFormat, path, watchType, interval, pathFilter)方法上,只不过后面的参数都是默认值了。

所以,这些参数当然也可以自己指定。好了,这个方法大家也不常用,所以就简单介绍下,有需要的小伙伴自己试试这些后面的参数。

  • 基于套接字

socketTextStream-从套接字读取 。元素可以由定界符分隔。
这里提到了套接字,这个我在终于懂了 TCP 协议为什么是可靠的,计算机基础(六)之运输层是讲过的,这里再说一下:
套接字 socket = {IP 地址:端口号},示例:192.168.1.99:3456
代码使用如下:

val wordDS: DataStream[String] = contextEnv.socketTextStream("bigdata101",3456)

套接字是抽象的,只是为了表示 TCP 连接而存在。

  • 基于集合
  1. fromCollection(Seq)- 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须具有相同的类型。
  2. fromCollection(Iterator)- 从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
  3. fromElements(elements: _*)- 从给定的对象序列创建数据流。所有对象必须具有相同的类型。
  4. fromParallelCollection(SplittableIterator)- 从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。
  5. generateSequence(from, to) - 并行生成给定间隔中的数字序列。

这些预设的数据源使用的也不是很多,可以说是几乎不用。所以大家可以自己尝试一下。
当然注意,如果使用 fromCollection(Seq),因为是从 Java.util.Collection 创建数据流,所以如果你是用 scala 编程,那么就需要 引入 隐式转换

import org.apache.flink.streaming.api.scala._

获取数据源 Source

大家也能发现,以上的方法几乎都是从一个固定的数据源中获取数据,适合自己测试,但在生产中肯定是不能使用的,所以我们来看看正儿八经的数据源:
官方支持的 source 与 sink 如下:

  1. Apache Kafka(源 / 接收器)
  2. Apache Cassandra(接收器)
  3. Amazon Kinesis Streams(源 / 接收器)
  4. Elasticsearch(接收器)
  5. Hadoop 文件系统(接收器)
  6. RabbitMQ(源 / 接收器)
  7. Apache NiFi(源 / 接收器)
  8. Twitter Streaming API(源)
  9. Google PubSub(源 / 接收器)

加粗的 3 个日常中比较常用的,那么也发现其实数据源只有 kafka,sink 有 ES 和 HDFS,那么我们先来说说 kafka Source,关于 Kafka 的安装部署这里就不讲了,自行 Google。我们来贴代码与分析。

Kafka Source

  1. 在 pom.xml 中导入 kafka 依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

这里就涉及到了版本的问题:大家可以根据自己的版本进行调试。但是注意:目前 flink 1.7 版本开始,通用 Kafka 连接器被视为处于 BETA 状态,并且可能不如 0.11 连接器那么稳定。所以建议大家使用flink-connector-kafka-0.11_2.11

  1. 贴测试代码
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka. FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._

/**
  * @description: ${kafka Source 测试}
  * @author: Liu Jun Jun
  * @create: 2020-06-10 10:56
  **/
object kafkaSource {def main(args: Array[String]): Unit = {
// 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    // 配置 kafka 连接器
    properties.setProperty("bootstrap.servers", "bigdata101:9092")
    properties.setProperty("group.id", "test")
    // 设置序列化方式
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // 设置 offset 消费方式:可设置的参数为:earliest,latest,none
    properties.setProperty("auto.offset.reset", "latest")
    //earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
    //latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
    //none:topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常

    val kafkaDS: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String](
        "test1",
        new SimpleStringSchema(),
        properties
      )
    )
    kafkaDS.print("测试:")
    env.execute("kafkaSource")
  }
}


在这个里面,要注意的是消费 offset 的方式,3 个参数的区别:

  1. 如果存在已经提交的 offest 时,不管设置为 earliest 或者 latest 都会从已经提交的 offest 处开始消费
  2. 如果不存在已经提交的 offest 时,earliest 表示从头开始消费,latest 表示从最新的数据消费, 也就是新产生的数据.
  3. none:topic 各分区都存在已提交的 offset 时,从提交的 offest 处开始消费;只要有一个分区不存在已提交的 offset,则抛出异常

关于 kafka 序列化的设置这个需要根据实际的需求配置。

上面只是简单的使用 Kafka 作为了数据源获取到了数据,至于怎么做检查点以及精准一次性消费这类共性话题,我们之后单独拿出来再讲,这次先把基本的 API 过一下

Transform 算子

说完了 Source,接下来就是 Transform,这类的算子可以说是很多了,官网写的非常全,我把链接贴在这里,大家可以直接看官网:flink 官网的转换算子介绍

而我们常用的也就是下面这些,功能能 spark 的算子可以说是几乎一样。所以我们简单看一下:

  1. Map 映射 —– 以元素为单位进行映射,会生成新的数据流;DataStream → DataStream
// 输入单词转换为(word,1)wordDS.map((_,1))
  1. FlatMap 压平,DataStream → DataStream
// 输入的一行字符串按照空格切分单词
dataStream.flatMap(_.split(" "))
  1. Filter 过滤,DataStream → DataStream
// 过滤出对 2 取余等于 0 的数字
dataStream.filter(_ % 2 == 0)
  1. KeyBy 分组
// 计算 wordCount,按照单词分组, 这里的 0 指的是 tuple 的位数,因为(word,1)这类新的流,按照 word 分组,而 word 就是第 0 位
wordDS.map((_,1)).keyBy(0)
  1. reduce 聚合
// 对上述 KeyBy 后的(word,count)做聚合,合并当前的元素和上次聚合的结果,实现了 wordCount
wordDS.map((_,1)).keyBy(0).reduce{(s1,s2) =>{(s1._1,s1._2 + s2._2)
      }
    }

关于双流 join 与窗口的算子我在下一站会着重说,这一篇先了解一些常用的简单的 API 目的就达到了。

函数

在 flink 中,对数据处理,除了上述一些简单的转换算子外,还经常碰到一些无法通过上述算子解决的问题,于是就需要我们 自定义实现 UDF 函数
关于 UDF,UDTF,UDAF
UDF:User Defined Function,用户自定义函数,一进一出
UDAF:User- Defined Aggregation Funcation 用户自定义聚合函数,多进一出
UDTF:User-Defined Table-Generating Functions,用户定义表生成函数,用来解决输入一行输出多行

UDF 函数类

其实在我们常用的算子如 map、filter 等都暴露了对应的接口,可以自定义实现:
举例如 map:

 val StuDS: DataStream[Stu] = kafkaDS.map(
 // 在内部我们可以自定义实现 MapFunction,从而实现类型的转换
      new MapFunction[ObjectNode, Stu]() {override def map(value: ObjectNode): Stu = {JSON.parseObject(value.get("value").toString, classOf[Stu])
        }
      }
    )

富函数 Rich Functions

除了上述的函数外,使用多的就还有富函数 Rich Functions,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法, open,close,getRuntimeContext,和 setRuntimeContext,所以可以实现更复杂的功能,比如累加器和计算器等。

那我们来简单实现一个累加器

累加器是具有加法运算和最终累加结果的简单结构,可在作业结束后使用。最简单的累加器是一个计数器:您可以使用 Accumulator.add(V value)方法将其递增。在工作结束时,Flink 将汇总(合并)所有部分结果并将结果发送给客户端。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

/**
  * @description: ${description}
  * @author: Liu Jun Jun
  * @create: 2020-06-12 17:55
  **/
object AccumulatorTest {def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    val dataDS = env
        .readTextFile("input/word.txt")
      //.socketTextStream("bigdata101", 3456)


    val resultDS: DataStream[String] = dataDS.map(new RichMapFunction[String, String] {

      // 第一步:定义累加器
      private val numLines = new IntCounter

      override def open(parameters: Configuration): Unit = {super.open(parameters)
        // 注册累加器
        getRuntimeContext.addAccumulator("num-lines", this.numLines)
      }

      override def map(value: String): String = {this.numLines.add(1)
        value
      }

      override def close(): Unit = super.close()
    })
    resultDS.print("单词输入")

    val jobExecutionResult = env.execute("单词统计")
    // 输出单词个数
    println(jobExecutionResult.getAccumulatorResult("num-lines"))
  }
}

注意:这个案例中,我使用的是有限流,原因是这个累加器的值只有在最后程序的结束的时候才能打印出来,或者是可以直接在 Flink UI 中体现。

那么如何实现随时打印打印出累加器的值呢?那就需要我们自定义实现累加器了:

而实现自定义的累加器我还没写完。。。。。

Sink

那么当我们通过 flink 对数据处理结束后,要把结果数据放到相应的数据存放点,也就是 sink 了,方便后续通过接口调用做报表统计。

那么数据放哪里呢?

  1. ES
  2. redis
  3. Hbase
  4. MYSQL
  5. kafka

ES sink

关于 ES 的介绍,我也发过一篇文章,只不过是入门级别的,有需要的可以看看,贴链接如下:ES 最新版快速入门详解

来吧,贴代码,注意看其中的注释

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

/**
  * @description: ${description}
  * @author: Liu Jun Jun
  * @create: 2020-06-01 11:44
  **/
object flink2ES {def main(args: Array[String]): Unit = {
    // 1. 获取执行环境
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
      //2. 设置并行度为 2
    env.setParallelism(2)
    //3. 设置关于 kafka 数据源的配置,主题,节点,消费者组,序列化,消费 offset 形式
    val topic = "ctm_student"
    val properties = new java.util.Properties()
    properties.setProperty("bootstrap.servers", "bigdata101:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // 从 kafka 中获取数据
    val kafkaDS: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer011[String](
          topic,
          new SimpleStringSchema(),
          properties) )

    // 添加 ES 连接
    val httpHosts = new java.util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("bigdata101", 9200))
    
    // 创建 ESSink 对象,在其中对数据进行操作
    val esSinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, new ElasticsearchSinkFunction[String] {def createIndexRequest(element: String): IndexRequest = {val json = new java.util.HashMap[String, String]
          json.put("data", element)

          return Requests.indexRequest()
            .index("ws")
            .`type`("readingData")
            .source(json)
        }
        // 重写 process 方法,对输入的数据进行处理,runtimeContext 为上下文环境,requestIndexer 为操作 index 的对象
     override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {

        // 在 add 方法中,参数可以为:增、删、改、行动请求
       requestIndexer.add(createIndexRequest(t))
       println("saved successfully")
      }
    })
    // 测试时这句代码一定要写,意思时:每一个请求都进行刷新
    // 否则在测试的适合,kafka 生产几条数据,但 ES 中却查不到,默认 ES 是 5000 条消息刷新一次,这就涉及到了 ES 的架构中索引的刷新频率,下面会写相应的配置。esSinkBuilder.setBulkFlushMaxActions(1)
    // 真正将数据发送到 ES 中
    kafkaDS.addSink(esSinkBuilder.build())
    // 触发执行
    env.execute()}
}

我们在上面的代码中,初步的通过 kafka 来获取数据,然后直接写到了 ES 中,但模拟的只是执行单个索引请求,我们在日常的生产中,肯定不是说一次请求刷新一次,这对 ES 来说,压力太大了,所以会有批量提交的配置。

  • bulk.flush.max.actions:刷新前要缓冲的最大操作数。
  • bulk.flush.max.size.mb:刷新前要缓冲的最大数据大小(以兆字节为单位)。

    • bulk.flush.interval.ms:刷新间隔,无论缓冲操作的数量或大小如何。

对于 ES 现在的版本,还支持配置重试临时请求错误的方式:

  • bulk.flush.backoff.enable:如果刷新的一个或多个操作由于临时原因而失败,是否对刷新执行延迟退避重试 EsRejectedExecutionException。

    • bulk.flush.backoff.type:退避延迟的类型,可以是 CONSTANT 或 EXPONENTIAL
    • bulk.flush.backoff.delay:延迟的延迟量。对于恒定的退避,这只是每次重试之间的延迟。对于指数补偿,这是初始基准延迟。
    • bulk.flush.backoff.retries:尝试尝试的退避重试次数

Redis Sink

关于 Redis 大家应该很熟悉了,我们来模拟一下数据处理结束后存入 Redis,我这里模拟的是 redis 单点。

注意:因为我们是在 IDE 中要访问远程的 redis,所以 redis 的 redis.conf 配置文件中,需要修改 2 个地方:

  1. 注释 bind 127.0.0.1,否则只能安装 redis 的本机连接,其他机器不能访问

  1. 关闭保护模式

protected-mode no

然后就可以启动 redis 啦,这里再把几个简单的命令贴一下,做到全套服务,哈哈

  • 启动 redis 服务:redis-server /usr/local/redis/redis.conf
  • 进入 redis:

进入的命令:redis-cli
指定 IP:redis-cli -h master102
redis 中文显示问题:redis-cli -h master -raw(也就是多加一个 -raw)
多个 Redis 同时启动,则需指定端口号访问 redis-cli -p 端口号

  • 关闭 redis 服务:
    1) 单实例关闭

如果还未通过客户端访问,可直接 redis-cli shutdown
如果已经进入客户端, 直接 shutdown 即可.

2) 多实例关闭

指定端口关闭 redis-cli -p 端口号 shutdown

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
  * @description: ${连接单节点 redis 测试}
  * @author: Liu Jun Jun
  * @create: 2020-06-12 11:23
  **/
object flink2Redis {def main(args: Array[String]): Unit = {
    // 转换
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val topic = "test1"
    val properties = new java.util.Properties()
    properties.setProperty("bootstrap.servers", "bigdata101:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // 从 kafka 中获取数据
    val kafkaDS: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer011[String](
          topic,
          new SimpleStringSchema(),
          properties) )

    kafkaDS.print("data:")

    val conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata103").setPort(6379).build()

    kafkaDS.addSink(new RedisSink[String](conf, new RedisMapper[String] {
      override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET,"sensor")
      }

      override def getKeyFromData(t: String): String = {t.split(",")(0)
      }

      override def getValueFromData(t: String): String = {t.split(",")(1)
      }
    }))

    env.execute()}
}

flink to Hbase

有的时候,还需要把 Flink 处理过的数据写到 Hbase,那我们也来简单试试。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * @description: ${flink to Hbase}
  * @author: Liu Jun Jun
  * @create: 2020-05-29 17:53
  **/
object flink2Hbase {def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    val stuDS = env
        .socketTextStream("bigdata101",3456)
      .map(s => {
      // 这里我创建了一个 student 样例类,只有 name 和 age
        val stu: Array[String] = s.split(",")
        student(stu(0),stu(1).toInt)
      })
   // 这里我们模拟的比较简单,没有对数据进行处理,直接写入到 Hbase,这个 HBaseSink 是自己写的类,往下看
    val hBaseSink: HBaseSink = new HBaseSink("WordCount","info1")

    stuDS.addSink(hBaseSink)

    env.execute("app")
  }
}

case class student(name : String,age : Int)

/**
  * @description: ${封装 Hbase 连接}
  * @author: Liu Jun Jun
  * @create: 2020-06-01 14:41
  **/
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

class HBaseSink(tableName: String, family: String) extends RichSinkFunction[student] {

  var conn: Connection = _
// 创建连接
  override def open(parameters: Configuration): Unit = {conn = HbaseFactoryUtil.getConn()
  }

// 调用
  override def invoke(value: student): Unit = {val t: Table = conn.getTable(TableName.valueOf(tableName))

    val put: Put = new Put(Bytes.toBytes(value.age))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("name"), Bytes.toBytes(value.name))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("age"), Bytes.toBytes(value.age))
    t.put(put)
    t.close()}
  override def close(): Unit = {}
}

好了,到这里 flink 的一些基本流处理 API 已经差不多说完了,但是关于 flink 特别重要的 窗口、精准一次性、状态编程、时间语义等重点还没说,所以下一篇关于 flink 的文章就再聊聊这些关键点。

扫码关注公众号“后来 X 大数据”,回复【电子书】,领取超多本 pdf【java 及大数据 电子书】

正文完
 0