关于flink:Flink-常用-API-详解

7次阅读

共计 22719 个字符,预计需要花费 57 分钟才能阅读完成。

@[TOC]

<font color=red size=50> 还有视频解说在我的 B 站 - 宝哥 chbxw, 心愿大家能够反对一下,谢谢。</font>

前言之分层 API

Flink 依据形象水平分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的偏重,并且针对不同的利用场景。

  • ProcessFunction 是 Flink 所提供最底层接口。ProcessFunction 能够解决一或两条输出数据流中的单个事件或者纳入一个特定窗口内的多个事件。它提供了对于工夫和状态的细粒度管制。开发者能够在其中任意地批改状态,也可能注册定时器用以在将来的某一时刻触发回调函数。因而,你能够利用 ProcessFunction 实现许多有状态的事件驱动利用所须要的基于单个事件的简单业务逻辑。
  • DataStream API 为许多通用的流解决操作提供了解决原语。这些操作包含窗口、逐条记录的转换操作,在处理事件时进行内部数据库查问等。DataStream API 反对 Java 和 Scala 语言,事后定义了例如 map()、reduce()、aggregate() 等函数。你能够通过扩大实现预约义接口或应用 Java、Scala 的 lambda 表达式实现自定义的函数。
  • SQL & Table API:Flink 反对两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流解决对立的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以雷同的语义执行查问,并产生雷同的后果。Table API 和 SQL 借助了 Apache Calcite 来进行查问的解析,校验以及优化。它们能够与 DataStream 和 DataSet API 无缝集成,并反对用户自定义的标量函数,聚合函数以及表值函数。
  • 扩大库

    • 简单事件处理(CEP):模式检测是事件流解决中的一个十分常见的用例。Flink 的 CEP 库提供了 API,使用户可能以例如正则表达式或状态机的形式指定事件模式。CEP 库与 Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的利用包含网络入侵检测,业务流程监控和欺诈检测。
    • DataSet API:DataSet API 是 Flink 用于批处理应用程序的外围 API。DataSet API 所提供的根底算子包含 map、reduce、(outer) join、co-group、iterate 等。所有算子都有相应的算法和数据结构反对,对内存中的序列化数据进行操作。如果数据大小超过预留内存,则适量数据将存储到磁盘。Flink 的 DataSet API 的数据处理算法借鉴了传统数据库算法的实现,例如混合散列连贯(hybrid hash-join)和内部归并排序(external merge-sort)。
    • Gelly: Gelly 是一个可扩大的图形处理和剖析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。因而,它可能受害于其可扩大且强壮的操作符。Gelly 提供了内置算法,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一个简化自定义图算法实现的 Graph API。

一、DataStream 的编程模型

DataStream 的编程模型包含四个局部:Environment,DataSource,Transformation,Sink。

二、Flink 的 DataSource 数据源

2.1、基于文件,此处是 HDFS

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSource {def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // 导入隐式转换,倡议写在这里,能够避免 IDEA 代码提醒出错的问题
        import org.apache.flink.streaming.api.scala._


        // 读取数据
        val stream = streamEnv.readTextFile("hdfs://10.0.0.201:9000/README.txt")
        // 转换计算
        val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
            .map((_, 1))
            .keyBy(0)
            .sum(1)
        // 打印后果到控制台
        result.print()
        // 启动流式解决,如果没有该行代码下面的程序不会运行
        streamEnv.execute("wordcount")
    }
}

2.2、基于汇合的源

有点像 Spark 的序列化

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object CollectionSource {def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // 导入隐式转换,倡议写在这里,能够避免 IDEA 代码提醒出错的问题
        import org.apache.flink.streaming.api.scala._


        // 读取数据
        var dataStream = streamEnv.fromCollection(Array(new StationLog("001", "186", "189", "busy", 1577071519462L, 0),
            new StationLog("002", "186", "188", "busy", 1577071520462L, 0),
            new StationLog("003", "183", "188", "busy", 1577071521462L, 0),
            new StationLog("004", "186", "188", "success", 1577071522462L, 32)
        ))
        dataStream.print()
        streamEnv.execute()}
}

/*
    * 通信基站日志数据
    * @param sid 基站 ID
    * @param callOut 主叫号码
    * @param callIn 被叫号码
    * @param callType 通话类型 eg: 呼叫失败(fail),占线(busy), 拒接(barring),接通(success)
    * @param callTime 呼叫工夫戳,准确到毫秒
    * @Param duration 通话时长 单位:秒
*/
class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)

2.3、Kafka

首 先 需 要 配 置 Kafka 连 接 器 的 依 赖,另 外 更 多 的 连 接 器 可 以 查 看 官 网

2.3.1、引入依赖

        <!-- Kafka connector-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.10.1</version>
            <exclusions>
                <exclusion>
                    <!-- 排除对 Jackson 的援用;-->
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

2.3.2、Kafka 第一种 Source

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByString {def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._

        // kafka 配置
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ShServer:9092")
        props.setProperty("group.id", "chb01")
        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("auto.offset.reset", "latest")

        // 设置 kafka 为数据源
        val flinkKafkaConSumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props)
        val stream = streamEnv.addSource(flinkKafkaConSumer)
        stream.print()
        streamEnv.execute()}
}

2.3.3、Kafka 第二种 Source

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByKeyValue {def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._

        val props = new Properties()
        props.setProperty("bootstrap.servers", "ShServer:9092")
        props.setProperty("group.id", "fink02")
        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("auto.offset.reset", "latest")
        // 设置 kafka 为数据源
        val stream = streamEnv.addSource(new
                FlinkKafkaConsumer[(String, String)]("test", new KafkaDeserializationSchema[(String, String)] {

                    // 流是否完结
                    override def isEndOfStream(t: (String, String)) = false

                    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]) = {if (consumerRecord != null) {
                            var key = "null"
                            var value = "null"
                            if (consumerRecord.key() != null)
                                key = new String(consumerRecord.key(), "UTF-8")
                            if (consumerRecord.value() != null)
                                value = new String(consumerRecord.value(), "UTF-8")
                            (key, value)
                        } else { // 如果 kafka 中的数据为空返回一个固定的二元组
                            ("null", "null")
                        }
                    }

                    // 设置返回类型为二元组
                    override def getProducedType =
                        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
                }
                    , props).setStartFromEarliest())
        stream.print()
        streamEnv.execute()}
}

2.3.3.1、Kafka 生产测试

package com.chb.flink.source

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

object MyKafkaProducer {def main(args: Array[String]): Unit = {val props = new Properties()
        props.setProperty("bootstrap.servers", "ShServer:9092")
        // 留神此处是序列化
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)

        val producer = new KafkaProducer[String, String](props)

        val random = new Random()
        while(true) {producer.send(new ProducerRecord[String, String]("test", "key" + random.nextInt(), "value" + random.nextInt()))
            Thread.sleep(1000)
        }

    }

}

2.4、自定义 Source

自定义数据源,有两种形式实现:
 通过实现 SourceFunction 接口来自定义无并行度(也就是并行度只能为 1)的 Source。
 通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来自定义有并行度的数据源。

2.4.1、实现 SourceFunction 的自定义 Source

package com.chb.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.Random

/**
 * 当然也能够自定义数据源,有两种形式实现:*  通过实现 SourceFunction 接口来自定义无并行度(也就是并行度只能为 1)的 Source。*  通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来自
 * 定义有并行度的数据源。* *
 * 写一个实现 SourceFunction 接口
 */
class MyCustomerSource extends SourceFunction[StationLog] {
    // 是否终止数据流的标记
    var flag = true;

    /**
     * 次要的办法
     * 启动一个 Source
     * 大部分状况下,都须要在这个 run 办法中实现一个循环,这样就能够循环产生数据了
     *
     * @param sourceContext * @throws Exception
     */
    override def run(sourceContext: SourceFunction.SourceContext[StationLog]):
    Unit = {val random = new Random()
        var types = Array("fail", "busy", "barring", "success")
        while (flag) { // 如果流没有终止,持续获取数据
            1.to(5).map(i => {var callOut = "1860000%04d".format(random.nextInt(10000))
                var callIn = "1890000%04d".format(random.nextInt(10000))
                new StationLog("station_" + random.nextInt(10), callOut, callIn, types(random.nextInt(4)), System.currentTimeMillis(), 0)
            }).foreach(sourceContext.collect(_)) // 发数据
            Thread.sleep(2000) // 每发送一次数据休眠 2 秒
        }
    }

    // 终止数据流
    override def cancel(): Unit = flag = false}

object CustomerSource {def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        import org.apache.flink.streaming.api.scala._

        val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource)
        stream.print()
        env.execute()}
}

三、Flink 的 Sink 数据指标

Flink 针对 DataStream 提供了大量的曾经实现的数据指标(Sink),包含文件、Kafka、Redis、HDFS、Elasticsearch 等等。

3.1、HDFS Sink

3.1.1、配置反对 Hadoop FileSystem 的连接器依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.10.1</version>
</dependency>

3.1.2、Streaming File Sink

https://ci.apache.org/project…

  Streaming File Sink 能把数据写入 HDFS 中,还能够反对分桶写入,每一个 分桶 就对应 HDFS 中的一个目录。默认依照 小时来分桶 ,在一个桶外部,会进一步将输入基于滚动策略 切分成更小 的文件。这有助于避免桶文件变得过大。滚动策略也是能够配置的,默认策略会依据文件大小和超时工夫来滚动文件,超时工夫是指没有新数据写入局部文件(part file)的工夫。

3.1.2.1、滚动策略

  • DefaultRollingPolicy
  • CheckpointRollingPolicy

    3.1.2.2、分桶策略

    • DateTimeBucketAssigner : Default time based assigner
    • BasePathBucketAssigner : Assigner that stores all part files in the base path (single global bucket)

留神必须开启 checkpoint, 否则生成的文件都是 inprocess 状态

3.1.2.3、代码实现

package com.chb.flink.sink

import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object HDFSFileSink {def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._
        
        // 启动 checkPoint, 否则,生成的文件都是 inprocess 状态的
        streamEnv.enableCheckpointing(1000)

        // 数据源
        val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)

        // 创立一个文件滚动规定
        val rolling: DefaultRollingPolicy[StationLog, String] = DefaultRollingPolicy.create()
            .withInactivityInterval(2000) // 不流动的间隔时间。.withRolloverInterval(2000) // 每隔两秒生成一个文件,重要
            .build()

        // 创立一个 HDFS Sink
        var hdfsSink = StreamingFileSink.forRowFormat[StationLog](
            // 留神此处是 flink 的 Path
            new Path("hdfs://ShServer:9000/sink001/"), new SimpleStringEncoder[StationLog]("UTF-8"))
            .withBucketCheckInterval(1000) // 查看分桶的间隔时间
            //            .withBucketAssigner(new MemberBucketAssigner)
            .withRollingPolicy(rolling)
            .build()

        // 增加 sink
        data.addSink(hdfsSink)


        streamEnv.execute()}

    import org.apache.flink.core.io.SimpleVersionedSerializer
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

    /**
     * 自定义分桶策略
     */
    class MemberBucketAssigner extends BucketAssigner[StationLog, String] {
        // 指定桶名 yyyy-mm-dd
        override def getBucketId(info: StationLog, context: BucketAssigner.Context): String = {val date = new Date(info.callTime)
            new SimpleDateFormat("yyyy-MM-dd/HH").format(date)
        }

        override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
    }

}

3.2、基于 Redis 的 Sink

Flink 除了内置的连接器外,还有一些额定的连接器通过 Apache Bahir 公布,包含:
 Apache ActiveMQ (source/sink)
 Apache Flume (sink)
 Redis (sink)
 Akka (sink)
 Netty (source)

3.2.1、依赖

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

3.2.2、将后果写道 redis

package com.chb.flink.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSink {def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // 导入隐式转换,倡议写在这里,能够避免 IDEA 代码提醒出错的问题
        import org.apache.flink.streaming.api.scala._
        // 读取数据
        val stream = streamEnv.socketTextStream("hadoop01", 8888)
        // 转换计算
        val result = stream.flatMap(_.split(","))
            .map((_, 1))
            .keyBy(0)
            .sum(1)

        // 连贯 redis 的配置
        val config = new FlinkJedisPoolConfig.Builder().setDatabase(1).setHost("hadoop01").setPort(6379).build()
        // 写入 redis
        result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
            override def getCommandDescription = new
                    RedisCommandDescription(RedisCommand.HSET, "t_wc")

            override def getKeyFromData(data: (String, Int)) = {data._1 // 单词}

            override def getValueFromData(data: (String, Int)) = {data._2 + "" // 单词呈现的次数}
        }))
        streamEnv.execute()}
}

3.3、Kafka Sink

3.3.1、第一种

package com.chb.flink.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

/**
 * Kafka Sink
 */
object KafkaSinkByString {def main(args: Array[String]): Unit = {
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1) // 默认状况下每个工作的并行度为 1
        import org.apache.flink.streaming.api.scala._

        // 读取 netcat 流中数据(实时流)val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

        // 转换计算
        val result = stream1.flatMap(_.split(","))

        // 数据写入 Kafka,并且是 KeyValue 格局的数据
        result.addSink(new FlinkKafkaProducer[String]("hadoop01:9092", "t_topic", new SimpleStringSchema()))
        streamEnv.execute()}
}

3.3.2、第二种

package com.chb.flink.sink

import java.lang
import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Kafka Sink
 */
object KafkaSinkByKeyValue {def main(args: Array[String]): Unit = {
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1) // 默认状况下每个工作的并行度为 1
        import org.apache.flink.streaming.api.scala._

        // 读取 netcat 流中数据(实时流)val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

        // 转换计算
        val result = stream1.flatMap(_.split(","))
            .map((_, 1))
            .keyBy(0)
            .sum(1)

        //Kafka 生产者的配置
        val props = new Properties()
        props.setProperty("bootstrap.servers", "hadoop01:9092")
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)

        // 数据写入 Kafka,并且是 KeyValue 格局的数据
        result.addSink(new FlinkKafkaProducer[(String, Int)]("t_topic",
            new KafkaSerializationSchema[(String, Int)] {override def serialize(element: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {new ProducerRecord("t_topic", element._1.getBytes, (element._2 + "").getBytes())
                }
            }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) //EXACTLY_ONCE 准确一次
        streamEnv.execute()}
}

3.4、自定义 Sink

package com.chb.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/*
 * 从自定义的 Source 中读取 StationLog 数据,通过 Flink 写入 Mysql 数据库
 *
 * 当然你能够本人定义 Sink,有两种实现形式:* 1、实现 SinkFunction 接口。* 2、实现 RichSinkFunction 类。后者减少了生命周期的治理性能。* 比方须要在 Sink 初始化的时候创立连贯对象,则最好应用第二种。* 案例需要:把 StationLog 对象写入 Mysql 数据库中。*/
object CustomJdbcSink {

    // 自定义一个 Sink 写入 Mysql
    class MyCustomSink extends RichSinkFunction[StationLog] {
        var conn: Connection = _
        var pst: PreparedStatement = _

        // 生命周期治理,在 Sink 初始化的时候调用
        override def open(parameters: Configuration): Unit = {conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123123")
            pst = conn.prepareStatement("insert into t_station_log(sid, call_out, call_in, call_type, call_time, duration) values(?, ?, ?, ?, ?, ?)")
        }

        // 把 StationLog 写入到表 t_station_log
        override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = {pst.setString(1, value.sid)
            pst.setString(2, value.callOut)
            pst.setString(3, value.callIn)
            pst.setString(4, value.callType)
            pst.setLong(5, value.callTime)
            pst.setLong(6, value.duration)
            pst.executeUpdate()}

        override def close(): Unit = {pst.close()
            conn.close()}
    }

    def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)
        // 导入隐式转换,倡议写在这里,能够避免 IDEA 代码提醒出错的问题
        import org.apache.flink.streaming.api.scala._
        val data: DataStream[StationLog] = streamEnv.addSource(new
                MyCustomerSource)
        // 数据写入 msyql
        data.addSink(new MyCustomSink)
        streamEnv.execute()}
}

四、DataStream 转换算子

这个非常简单,看 api 就晓得

五、函数类和富函数类

上节的所有算子简直都能够自定义一个 函数类、富函数类 作为参数。因为 Flink 裸露了这两种函数类的接口,常见的函数接口有:

  • MapFunction
  • FlatMapFunction
  • ReduceFunction
  • 。。。。。

富函数 接口同其余惯例函数接口的不同在于:能够获取运行环境的上下文,在上下文环境中能够治理状态(State),并领有一些生命周期办法,所以能够实现更简单的性能。富函数的接口有:

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  • 。。。。。

    5.1、一般函数类举例:依照指定的工夫格局输入每个通话的拨号工夫和完结工夫

    5.2、富函数类举例:把呼叫胜利的通话信息转化成实在的用户姓名

    通话用户对应的用户表(在 Mysql 数据中)为:

    因为须要从数据库中查问数据,就须要创立连贯,创立连贯的代码必须写在生命周期的 open 办法中。所以须要应用富函数类。
    Rich Function 有一个生命周期的概念。典型的生命周期办法 有:

    • open()办法是 rich function 的初始化办法,当一个算子例如 map 或者 filter 被调用之前 open()会被调用。
    • close()办法是生命周期中的最初一个调用的办法,做一些清理工作。
  • getRuntimeContext()办法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,工作的名字,以及 state 状态
package com.chb.flink.func

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat

import com.chb.flink.source.StationLog
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * 富函数类举例:把呼叫胜利的通话信息转化成实在的用户姓名
 */
object TestFunction {def main(args: Array[String]): Unit = {
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        // 隐式转换
        import org.apache.flink.streaming.api.scala._

        val data: DataStream[StationLog] = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
            .map(line => {val arr = line.split(",")
                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
            })

        // 定义工夫输入格局
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        // 过滤那些通话胜利的
        data.filter(_.callType.equals("success"))
            .map(new CallMapFunction(format))
            .print()
        streamEnv.execute()}
}


// 自定义的富函数类
class CallRichMapFunction() extends RichMapFunction[StationLog, StationLog] {
    var conn: Connection = _
    var pst: PreparedStatement
    = _

    // 生命周期治理,初始化的时候创立数据连贯
    override def open(parameters: Configuration): Unit = {conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123456")
        pst = conn.prepareStatement("select name from t_phone where phone_number =?")
    }

    override def map(in: StationLog): StationLog = {
        // 查问主叫用户的名字
        pst.setString(1, in.callOut)
        val set1: ResultSet = pst.executeQuery()
        if (set1.next()) {in.callOut = set1.getString(1)
        }
        // 查问被叫用户的名字
        pst.setString(1, in.callIn)
        val set2: ResultSet = pst.executeQuery()
        if (set2.next()) {in.callIn = set2.getString(1)
        }
        in
    }

    // 敞开连贯
    override def close(): Unit = {pst.close()
        conn.close()}

}

六、底层 ProcessFunctionAPI

ProcessFunction 是一个 低层次的流解决操作,容许返回所有 Stream 的根底构建模块:

  • 拜访 Event 自身数据(比方:Event 的工夫,Event 的以后 Key 等)
  • 治理状态 State(仅在 Keyed Stream 中)

    • 治理定时器 Timer(包含:注册定时器,删除定时器等)

总而言之,ProcessFunction 是 Flink 最底层的 API,也是性能最弱小的。

例如:监控每一个手机,如果在 5 秒内呼叫它的通话都是失败的,收回正告信息。

package com.chb.flink.func

import java.text.SimpleDateFormat
import java.util.Date

import com.chb.flink.source.StationLog
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
 * 监控每一个手机号,如果在 5 秒内呼叫它的通话都是失败的,收回正告信息
 * 在 5 秒中内只有有一个呼叫不是 fail 则不必正告
 */
object TestProcessFunction {def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._

        // 读取 socket 数据
        val data = streamEnv.socketTextStream("10.0.0.201", 8888)
            .map(line => {var arr = line.split(",")
                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
            })
        // 解决数据
        data.keyBy(_.callOut)
            .process(new MonitorCallFail())
            .print()


        streamEnv.execute()}

    class MonitorCallFail() extends KeyedProcessFunction[String, StationLog, String] {
        // 定义一个状态记录时间
        lazy val timeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time", classOf[Long]))

        // 解决数据
        override def processElement(value: StationLog,
                                    context: KeyedProcessFunction[String, StationLog, String]#Context,
                                    collector: Collector[String]): Unit = {val time = timeState.value() // 从状态中取出工夫
            if (value.callType.equals("fail") && time == 0) { // 第一次失败
                // 获取以后工夫,注册定时器
                val now = context.timerService().currentProcessingTime()
                var onTime = now + 5000L // 5 秒后触发
                context.timerService().registerProcessingTimeTimer(onTime);
                println("first time:" + new Date())
                timeState.update(onTime)
            }

            // 有呼叫胜利,勾销触发器
            if (!value.callType.equals("fail") && time != 0) {context.timerService().deleteProcessingTimeTimer(time)
                timeState.clear()}


        }

        // 工夫到,执行触发器,收回告警
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext,
                             out: Collector[String]): Unit = {val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            var warnStr = "触发工夫:" + df.format(new Date(timestamp)) + "手机号:" + ctx.getCurrentKey
            out.collect(warnStr)
            timeState.clear()}

    }


}

七、侧输入流 Side Output

  在 flink 解决数据流时,咱们常常会遇到这样的状况:在解决一个数据源时,往往 须要将该源中的不同类型的数据做宰割解决

  • 如果应用 filter 算子对数据源进行筛选宰割的话,势必会造成数据流的屡次复制,造成不必要的性能节约;
  • 侧输入就是将数据流进行宰割,而不对流进行复制的一种分流机制。
  • flink 的侧输入的另一个作用就是对延时早退的数据进行解决,这样就能够不用抛弃早退的数据。

案例:依据基站的日志,请把呼叫胜利的 Stream(支流)和不胜利的 Stream(侧流)别离输入。

package com.chb.flink.func

import com.chb.flink.source.StationLog
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
 * 把呼叫胜利的 Stream(支流)和不胜利的 Stream(侧流)别离输入。*/
object TestSideOutputStream {


    // 侧输入流首先须要定义一个流的标签 , 此处须要将隐式转换放在后面
    import org.apache.flink.streaming.api.scala._
    var notSuccessTag = new OutputTag[StationLog]("not_success")

    def main(args: Array[String]): Unit = {
        // 初始化 Flink 的 Streaming(流计算)上下文执行环境

        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)


        // 读取文件数据
        val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
            .map(line => {var arr = line.split(",")
                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
            })
        val mainStream: DataStream[StationLog] = data.process(new CreateSideOutputStream(notSuccessTag))

        // 失去侧流
        val sideOutput: DataStream[StationLog] = mainStream.getSideOutput(notSuccessTag)
        mainStream.print("main")
        sideOutput.print("sideoutput")
        streamEnv.execute()}

    class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {override def processElement(value: StationLog, ctx: ProcessFunction[StationLog, StationLog]#Context, out: Collector[StationLog]): Unit = {if (value.callType.equals("success")) { // 输入支流
                out.collect(value)
            } else { // 输入侧流
                ctx.output(tag, value)
            }
        }
    }

}

还有视频解说在我的 B 站 - 宝哥 chbxw, 心愿大家能够反对一下,谢谢。

Flink 目录导读

关注我的公众号【宝哥大数据】,更多干货。

正文完
 0