关于flink:小白从零开始学ApacheFlink

45次阅读

共计 17602 个字符,预计需要花费 45 分钟才能阅读完成。

1. Apache Flink 介绍

起源:http://www.54tianzhisheng.cn/…

Apache Flink 是近年来越来越风行的一款开源大数据计算引擎,它同时反对了批处理和流解决,也能用来做一些基于事件的利用。应用官网的一句话来介绍 Flink 就是 “Stateful Computations Over Streams”

首先 Flink 是一个 纯流式的计算引擎 ,它的根本数据模型是数据流。流能够是无边界的有限流,即个别意义上的流解决。也能够是有边界的无限流,这样就是批处理。因而 Flink 用一套架构同时反对了流解决和批处理。 其次 ,Flink 的一个劣势是反对 有状态的计算。如果解决一个事件(或一条数据)的后果只跟事件自身的内容无关,称为无状态解决;反之后果还和之前解决过的事件无关,称为有状态解决。略微简单一点的数据处理,比如说根本的聚合,数据流之间的关联都是有状态解决。

  • 无穷数据集:无穷的继续集成的数据汇合
  • 有界数据集:无限不会扭转的数据汇合

那么那些常见的无穷数据集有哪些呢?

  • 用户与客户端的实时交互数据
  • 利用实时产生的日志
  • 金融市场的实时交易记录

数据运算模型有哪些呢:

  • 流式:只有数据始终在产生,计算就继续地进行
  • 批处理:在事后定义的工夫内运行计算,当实现时开释计算机资源

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-i3IYaQm9-1595768814163)(https://ws3.sinaimg.cn/large/…]

2. What is Flink?

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-1KK2dXk1-1595768814167)(https://ws3.sinaimg.cn/large/…]

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-8uDNlF7v-1595768814169)(https://ws2.sinaimg.cn/large/…]

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-W7z6hQI9-1595768814171)(https://ws4.sinaimg.cn/large/…]

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-4mHt3Iq8-1595768814173)(/Applications/Typora.app/Contents/Resources/TypeMark/Docs/img/006tNbRwly1fw6nu5yishj31kw0w04cm.jpg)]

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-qGWVgGhG-1595768814174)(https://ws2.sinaimg.cn/large/…]

从下至上:

1、部署:Flink 反对本地运行、能在独立集群或者在被 YARN 或 Mesos 治理的集群上运行,也能部署在云上。

2、运行:Flink 的外围是分布式流式数据引擎,意味着数据以一次一个事件的模式被解决。

3、API:DataStream、DataSet、Table、SQL API。

4、扩大库:Flink 还包含用于简单事件处理,机器学习,图形处理和 Apache Storm 兼容性的专用代码库。

3. Flink 数据流编程模型

1. 形象级别

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-yZWEXSka-1595768814175)(https://ws2.sinaimg.cn/large/…]

  • 最底层提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API 中。它容许用户能够自在地解决来自一个或多个流数据的事件,并应用统一、容错的状态。除此之外,用户能够注册事件工夫和处理事件回调,从而使程序能够实现简单的计算。
  • DataStream / DataSet API 是 Flink 提供的外围 API,DataSet 解决有界的数据集,DataStream 解决有界或者无界的数据流。用户能够通过各种办法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。
  • Table API 是以 为核心的申明式 DSL,其中表可能会动态变化(在表白流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,应用起来却更加简洁(代码量更少)。

你能够在表与 DataStream/DataSet 之间无缝切换,也容许程序将 Table APIDataStream 以及 DataSet 混合应用。

  • Flink 提供的最高层级的形象是 SQL。这一层形象在语法与表达能力上与 Table API 相似,然而是以 SQL 查问表达式的模式体现程序。SQL 形象与 Table API 交互亲密,同时 SQL 查问能够间接在 Table API 定义的表上执行。

2. Flink 程序与数据流构造

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-j4U1eMhC-1595768814176)(https://ws1.sinaimg.cn/large/…]

Flink 应用程序构造就是如上图所示:

1、Source: 数据源,Flink 在流解决和批处理上的 source 大略有 4 类:基于本地汇合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也能够定义本人的 source。

2、Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,能够将数据转换计算成你想要的数据。

3、Sink:接收器,Flink 将转换计算后的数据发送的地点,你可能须要存储下来,Flink 常见的 Sink 大略有如下几类:写入文件、打印进去、写入 socket、自定义的 sink。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也能够定义本人的 sink。

4. 为什么抉择 Flink

  1. Flink 在 JVM 中提供了本人的内存治理,使其独立于 Java 的默认垃圾收集器。它通过应用散列,索引,缓存和排序无效地进行内存治理。
  2. Flink 领有丰盛的库来进行机器学习,图形处理,关系数据处理等。因为其架构,很容易执行简单的事件处理和警报
  3. Flink 能满足高并发和低提早(计算大量数据很快)。下图显示了 Apache Flink 与 Apache Storm 在实现流数据荡涤的分布式工作的性能比照。

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-4Tz386WK-1595768814177)(https://ws3.sinaimg.cn/large/…]

  1. Flink 保障状态化计算强一致性。”状态化“意味着利用能够保护随着时间推移曾经产生的数据聚合或者,并且 Filnk 的检查点机制在一次失败的事件中一个利用状态的强一致性。
  2. Flink 反对流式计算和带有事件工夫语义的视窗。事件工夫机制使得那些事件无序达到甚至提早达到的数据流可能计算出准确的后果。
  3. 除了提供数据驱动的视窗外,Flink 还反对基于工夫,计数,session 等的灵便视窗。视窗可能用灵便的触发条件定制化从而达到对简单的流传输模式的反对。Flink 的视窗使得模仿实在的创立数据的环境成为可能
  4. Flink 的容错能力是轻量级的,容许零碎放弃高并发,同时在雷同工夫内提供强一致性保障。Flink 以零数据失落的形式从故障中复原,但没有思考可靠性和提早之间的折衷。
  5. Flink 保留点提供了一个状态化的版本机制,使得能以无失落状态和最短停机工夫的形式更新利用或者回退历史数据。

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-6QPM1ZUj-1595768814178)(https://ws1.sinaimg.cn/large/…]

  1. ……..

5. 搭建 Flink 环境并运行简略程序

1. 装置 Flink

通过 homebrew 来装置。

brew install apache-flink

2. 查看是否装置胜利

flink -v

3. 启动 Flink

进入到:/usr/local/Cellar/apache-flink/1.7.1/libexec/bin

./start-cluster.sh

4. 启动胜利

拜访 http://localhost:8081/#/overview

5. 新建 maven 我的项目

GroupId=org.apache.flink
ArtifactId=flink-quickstart-java
Version=1.6.1

  1. 创立一个 SocketTextStreamWordCount 文件,退出以下代码:
package study;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author wangjun
 * @date 2019/03/25
 */
public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {
        // 参数查看
        if (args.length != 2) {System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {if (token.length() > 0) {collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
  1. 接着进入工程目录,应用以下命令打包

mvn clean package -Dmaven.test.skip=true

  1. 开启监听 9000 端口:

nc -l 9000

  1. 进入 flink 装置目录 bin 下执行以下命令跑程序

flink run -c study.SocketTextStreamWordCount /Users/wangjun/timwang/codeArea/study/target/original-study-1.0-SNAPSHOT.jar 127.0.0.1 9000

  1. 咱们能够在 webUI 中看到正在运行的程序

  1. 能够在 nc 监听端口中输出 text,比方

  1. 而后咱们通过 tail 命令看一下输入的 log 文件,来察看统计后果。进入目录 apache-flink/1.6.0/libexec/log,执行以下命令:

tail -f flink-wangjun-taskexecutor-0-wangjundeMBP.out

6. Data Source 介绍

数据起源,Flink 做为一款流式计算框架,它可用来做批处理,即解决动态的数据集、历史的数据集;也能够用来做流解决,即实时的解决些实时数据流,实时的产生数据流后果,只有数据源源不断的过去,Flink 就可能始终计算上来,这个 Data Sources 就是数据的起源地。

Flink 中你能够应用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序增加数据起源。

Flink 曾经提供了若干实现好了的 source functions,当然你也能够通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩大 RichParallelSourceFunction 来自定义并行的 source,

1. Flink 数据起源

StreamExecutionEnvironment 中能够应用以下几个已实现的 stream sources,

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-9Jk36Myp-1595768814183)(https://ws4.sinaimg.cn/large/…]

a. 基于汇合

1、fromCollection(Collection) – 从 Java 的 Java.util.Collection 创立数据流。汇合中的所有元素类型必须雷同。

2、fromCollection(Iterator, Class) – 从一个迭代器中创立数据流。Class 指定了该迭代器返回元素的类型。

3、fromElements(T …) – 从给定的对象序列中创立数据流。所有对象类型必须雷同。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Person> input = env.fromElements(new Person(1, "name", 12),
        new Person(2, "name2", 13),
        new Person(3, "name3", 14)
);

4、fromParallelCollection(SplittableIterator, Class) – 从一个迭代器中创立并行数据流。Class 指定了该迭代器返回元素的类型。

5、generateSequence(from, to) – 创立一个生成指定区间范畴内的数字序列的并行数据流。

b. 基于文件

1、readTextFile(path) – 读取文本文件,即合乎 TextInputFormat 标准的文件,并将其作为字符串返回。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

2、readFile(fileInputFormat, path) – 依据指定的文件输出格局读取文件(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) – 这是下面两个办法外部调用的办法。它依据给定的 fileInputFormat 和读取门路读取文件。依据提供的 watchType,这个 source 能够定期(每隔 interval 毫秒)监测给定门路的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者解决一次门路对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你能够通过 pathFilter 进一步排除掉须要解决的文件。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

c. 基于 Socket:

socketTextStream(String hostname, int port) – 从 socket 读取。元素能够用分隔符切分。

d. 自定义

addSource – 增加一个新的 source function。例如,你能够 addSource(new FlinkKafkaConsumer011<>(…)) 以从 Apache Kafka 读取数据

package study;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @author wangjun
 * @date 2019/3/25
 */
public class SourceFromMySQL extends RichSourceFunction<Student> {
    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {super.open(parameters);
        connection = getConnection();
        String sql = "select * from student;";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {super.close();
        if (connection != null) {connection.close();
        }
        if (ps != null) {ps.close();
        }
    }

    @Override
    public void run(SourceContext<Student> ctx) throws Exception {ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Student student = new Student(resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age"));
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {}

    private static Connection getConnection() {
        Connection con = null;
        try {Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/cloud?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        } catch (Exception e) {System.out.println("-----------mysql get connection has exception , msg ="+ e.getMessage());
        }
        return con;
    }
}
package study;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author wangjun
 * @date 2019/3/25
 */
public class MySqlSourceMain {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFromMySQL()).print();

        env.execute("Flink add data source");
    }
}

2. 几种数据源特点

1、基于汇合:有界数据集,更偏差于本地测试用

2、基于文件:适宜监听文件批改并读取其内容

3、基于 Socket:监听主机的 host port,从 Socket 中获取数据

4、自定义 addSource:大多数的场景数据都是无界的,会源源不断的过去。比方去生产 Kafka 某个 topic 上的数据,这时候就须要用到这个 addSource,可能因为用的比拟多的起因吧,Flink 间接提供了 FlinkKafkaConsumer011 等类可供你间接应用。你能够去看看 FlinkKafkaConsumerBase 这个根底类,它是 Flink Kafka 生产的最基本的类。

[外链图片转存失败, 源站可能有防盗链机制, 倡议将图片保留下来间接上传(img-SS0BBXqM-1595768814185)(https://ws4.sinaimg.cn/large/…]

7. 实例 - 计算热门商品数据

本案例将实现一个“实时热门商品”的需要,每隔 5 分钟输入最近一小时内点击量最多的前 N 个商品。将这个需要进行合成
咱们大略要做这么几件事件:

• 抽取出业务工夫戳,通知 Flink 框架基于业务工夫做窗口

• 过滤出点击行为数据

• 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)

• 按每个窗口聚合,输入每个窗口中点击量前 N 名的商品

1. 数据筹备

这里咱们筹备了一份淘宝用户行为数据集(来自阿里云天池公开数据集)。本数据集蕴含了淘宝上某一天随机一百万用户的所有行为(包含点击、购买、加购、珍藏)。数据集的组织模式和 MovieLens-20M 相似,即数据集的每一行示意一条用户行为,由用户 ID、商品 ID、商品类目 ID、行为类型和工夫戳组成,并以逗号分隔。对于数据集中每一列的详细描述如下:

curl https://raw.githubusercontent… UserBehavior.csv

2. 开发

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为了打印到控制台的后果不乱序,咱们配置全局的并发为 1,这里扭转并发对后果正确性没有影响
env.setParallelism(1);


// UserBehavior.csv 的本地文件门路
URL fileUrl = HotItems.class.getClassLoader().getResource("UserBehavior.csv");
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>)
        TypeExtractor.createTypeInfo(UserBehavior.class);
// 因为 Java 反射抽取出的字段程序是不确定的,须要显式指定下文件中字段的程序
String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
// 创立 PojoCsvInputFormat
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);

// PojoCsvInputFormat 创立输出源。DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);

3. 创立模仿数据源

CsvInputFormat 创立模仿数据源。咱们先创立一个 UserBehavior 的 POJO 类(所有成员变量申明成 public 便是 POJO 类),强类
型化后能不便后续的解决

package study.goods;

import java.io.Serializable;

/**
 * @author wangjun
 * @date 2019/3/26
 */
public class UserBehavior implements Serializable {
    /**
     * 用户 ID
     */
    public long userId;
    /**
     * 商品 ID
     */
    public long itemId;
    /**
     * 商品类目 ID
     */
    public int categoryId;
    /**
     * 用户行为, 包含("pv", "buy", "cart", "fav")
     */
    public String behavior;
    /**
     * 行为产生的工夫戳,单位秒
     */
    public long timestamp;

    public long getUserId() {return userId;}

    public void setUserId(long userId) {this.userId = userId;}

    public long getItemId() {return itemId;}

    public void setItemId(long itemId) {this.itemId = itemId;}

    public int getCategoryId() {return categoryId;}

    public void setCategoryId(int categoryId) {this.categoryId = categoryId;}

    public String getBehavior() {return behavior;}

    public void setBehavior(String behavior) {this.behavior = behavior;}

    public long getTimestamp() {return timestamp;}

    public void setTimestamp(long timestamp) {this.timestamp = timestamp;}
}

​ 当咱们说“统计过来一小时内点击量”,这里的“一小时”是指什么呢?在 Flink 中它能够是指 ProcessingTime,也能够是 EventTime,由用户决定。
​ • ProcessingTime:事件被解决的工夫。也就是由机器的零碎工夫来决定。
​ • EventTime:事件产生的工夫。个别就是数据自身携带的工夫。
在本案例中,咱们须要统计业务工夫上的每小时的点击量,所以要基于 EventTime 来解决。那么如果让 Flink 依照咱们想要的业务工夫来解决呢?这里次要有两件事件要做。

​ 第一件是通知 Flink 咱们当初依照 EventTime 模式进行解决,Flink 默认应用 ProcessingTime 解决,所以咱们要显式设置下。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

​ 第二件事件是指定如何取得业务工夫,以及生成 Watermark。Watermark 是用来追踪业务事件的概念,能够了解成 EventTime 世界中的时钟,用来批示以后解决到什么时刻的数据了。因为咱们的数据源的数据曾经通过整顿,没有乱序,即事件的工夫戳是枯燥递增的,所以能够将每条数据的业务工夫就当做 Watermark。这里咱们用 AscendingTimestampExtractor 来实现工夫戳的抽取和 Watermark 的生成。

// 这样咱们就失去了一个带有工夫标记的数据流了,前面就能做一些窗口的操作
DataStream<UserBehavior> timedData = dataSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior userBehavior) {
        // 原始数据单位秒,将其转成毫秒
        return userBehavior.timestamp * 1000;
    }
});

在开始窗口操作之前,先回顾下需要“每隔 5 分钟输入过来一小时内点击量最多的前 N 个商品”。因为原始数据中存在点击、加购、购买、珍藏各种行为的数据,然而咱们只须要统计点击量,所以先应用 FilterFunction 将点击行为数据过滤出来

DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() {
    @Override
    public boolean filter(UserBehavior userBehavior) throws Exception {
    // 过滤出只有点击的数据
        return userBehavior.behavior.equals("pv");
    }
});

因为要每隔 5 分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔 5 分钟滑动一次。即别离要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品点击量。是一个常见的滑动窗口需要(Sliding Window)

DataStream<ItemViewCount> windowedData = pvData.keyBy("itemId")
        .timeWindow(Time.minutes(60), Time.minutes(5))
        .aggregate(new CountAgg(), new WindowResultFunction());

咱们应用.keyBy(“itemId”)对商品进行分组,应用.timeWindow(Time size, Time slide)对每个 商 品 做 滑 动 窗 口(1 小 时 窗 口,5 分 钟 滑 动 一 次)。然 后 我 们 应用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能应用 AggregateFunction 提前聚合掉数据,缩小 state 的存储压力。较之.apply(WindowFunction wf)会将窗口中的数据都存储下来,最初一起计算要高效地多。aggregate()办法的第一个参数用于
这里的 CountAgg 实现了 AggregateFunction 接口,性能是统计窗口中的条数,即遇到一条数据就加一。

package study.goods;

import org.apache.flink.api.common.functions.AggregateFunction;

/**
 * COUNT 统计的聚合函数实现,每呈现一条记录加一 
 * @author wangjun
 * @date 2019/3/26
 */
public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() {return 0L;}

    @Override
    public Long add(UserBehavior userBehavior, Long acc) {return acc + 1;}

    @Override
    public Long getResult(Long acc) {return acc;}

    @Override
    public Long merge(Long acc1, Long acc2) {return acc1 + acc2;}
}

aggregate(AggregateFunction af, WindowFunction wf)的第二个参数 WindowFunction 将每个 key 每个窗口聚合后的后果带上其余信息进行输入。咱们这里实现的 WindowResultFunction 将主键商品 ID,窗口,点击量封装成了 ItemViewCount 进行输入。

package study.goods;

/**
 * @author wangjun
 * @date 2019/3/26
 */
public class ItemViewCount {
    public long itemId; // 商品 ID
    public long windowEnd; // 窗口完结工夫戳
    public long viewCount; // 商品的点击量
    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.windowEnd = windowEnd;
        result.viewCount = viewCount;
        return result;
    }
}

当初咱们失去了每个商品在每个窗口的点击量的数据流。

为了统计每个窗口下最热门的商品,咱们须要再次按窗口进行分组,这里依据 ItemViewCount 中的 windowEnd 进行 keyBy()操作。而后应用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前 3 名的商品,并将排名后果格式化成字符串,便于后续输入。

DataStream<String> topItems = windowedData
        .keyBy("windowEnd")
        .process(new TopNHotItems(3)); // 求点击量前 3 名的商品

ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的性能。它次要提供了定时器 timer 的性能(反对 EventTime 或 ProcessingTime)。本案例中咱们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。因为 Watermark 的进度是全局的,

在 processElement 办法中,每当收到一条数据(ItemViewCount),咱们就注册一个 windowEnd+ 1 的定时器(Flink 框架会主动疏忽同一时间的反复注册)。windowEnd+1 的定时器被触发时,意味着收到了 windowEnd+1 的 Watermark,即收齐了该 windowEnd 下的所有商品窗口统计值。我
们在 onTimer()中解决将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输入。

这里咱们还应用了 ListState<ItemViewCount> 来存储收到的每条 ItemViewCount 音讯,保障在产生故障时,状态数据的不失落和一致性。ListState 是 Flink 提供的相似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,主动做到了 exactly-once 的语义保障

package study.goods;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;


/**
 * @author wangjun
 * @date 2019/3/26
 */
public class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
    private final int topSize;
    // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
    private ListState<ItemViewCount> itemState;

    public TopNHotItems(int topSize) {this.topSize = topSize;}

    @Override
    public void open(Configuration parameters) throws Exception {super.open(parameters);
        // 状态的注册
        ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 获取收到的所有商品点击量
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {allItems.add(item);
        }
        // 提前革除状态中的数据,开释空间
        itemState.clear();
        // 依照点击量从大到小排序
        allItems.sort(new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {return (int) (o2.viewCount - o1.viewCount);
            }
        });
        // 将排名信息格式化成 String, 便于打印
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("工夫:").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < topSize; i++) {ItemViewCount currentItem = allItems.get(i);
            // No1: 商品 ID=12224 浏览量 =2413
            result.append("No").append(i).append(":")
                    .append("商品 ID=").append(currentItem.itemId)
                    .append("浏览量 =").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");
        out.collect(result.toString());
    }

    @Override
    public void processElement(ItemViewCount input, Context context, Collector<String> collector) throws Exception {
        // 每条数据都保留到状态中
        itemState.add(input);
        // 注册 windowEnd+1 的 EventTime Timer, 当触发时,阐明收齐了属于 windowEnd 窗口的所有商品数据
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }
}

正文完
 0