乐趣区

flink学习系列基础知识一

前言

最近因公司业务需求,需要使用到大数据分析。选择了 flink,第一次听说 flink 我也是很懵逼的状态,不过一段时间下来有了一点心得,在这里和大家分享分享。有很多描述不准确的,大家多提提意见。

1.flink 是什么,为什么要 flink?

其实大数据框架有很多,比如 Hadoop(批处理),Storm(流处理),Samza(流处理),Spark… 但是我们选择的是 flink,为什么呢?因为 flink 是“流式批处理”,flink 将每一项视作真正的数据流。Flink 提供的 DataStream API 可用于处理无尽的数据流。Flink 可配合使用的基本组件包括:

  • Stream(流)是指在系统中流转的,永恒不变的无边界数据集
  • Operator(操作方)是指针对数据流执行操作以产生其他数据流的功能
  • Source(源)是指数据流进入系统的入口点
  • Sink(槽)是指数据流离开 Flink 系统后进入到的位置,槽可以是数据库或到其他系统的连接器

说了这么多,我们做一个简单的 demo 来体验一下 flink:
假设我们在电商平台,需要近实时 (5min) 统计 (1h 内) 商品点击量的前三名。然后实时展示出来。如果使用 java, 我们需要做一个定时任务,监听商品点击事件,然后每 5min 使用 sql 计算一下 … 如果数据量小, 间隔时间比较长, 还比较好,如果数据量大, 间隔时间比较短 … 那服务器的压力就会贼大 … 但是使用 flink 会怎么样呢?先看下代码(40 几 W 条数据从阿里淘宝获取,github 上):

/**

  • Created by liuliang
  • on 2019/5/24

*/
public class HotItems {

public static void main(String[] args) throws Exception {

    // 创建 execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 告诉系统按照 EventTime 处理
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // 为了打印到控制台的结果不乱序,配置全局的并发为 1,改变并发对结果正确性没有影响
    env.setParallelism(1);

    // URL fileUrl = HotItems.class.getClassLoader().getResource("D:\\mft\\codes\\flink-learnning\\src\\main\\java\\cn\\crawler\\mft\\UserBehavior.csv");
    Path filePath = Path.fromLocalFile(new File("D:\\mft\\codes\\flink-learnning\\src\\main\\java\\cn\\crawler\\mft\\UserBehavior.csv"));

    // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
    PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);


    // 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
    String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
    // 创建 PojoCsvInputFormat
    PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);


    env
            // 创建数据源,得到 UserBehavior 类型的 DataStream
            .createInput(csvInput, pojoType)
            // 抽取出时间和生成 watermark
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                @Override
                public long extractAscendingTimestamp(UserBehavior userBehavior) {
                    // 原始数据单位秒,将其转成毫秒
                    return userBehavior.timestamp * 1000;
                }
            })
            // 过滤出只有点击的数据
            .filter(new FilterFunction<UserBehavior>() {
                @Override
                public boolean filter(UserBehavior userBehavior) throws Exception {
                    // 过滤出只有点击的数据
                    return userBehavior.behavior.equals("pv");
                }
            })
            .keyBy("itemId")
            .timeWindow(Time.minutes(60), Time.minutes(5))
            .aggregate(new CountAgg(), new WindowResultFunction())
            .keyBy("windowEnd")
            .process(new TopNHotItems(3))
            .print();

    env.execute("Hot Items Job");

}



/** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

    private final int topSize;

    public TopNHotItems(int topSize) {this.topSize = topSize;}

    // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
    private ListState<ItemViewCount> itemState;

    @Override
    public void open(Configuration parameters) throws Exception {super.open(parameters);
        ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(
            ItemViewCount input,
            Context context,
            Collector<String> collector) throws Exception {

        // 每条数据都保存到状态中
        itemState.add(input);
        // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于 windowEnd 窗口的所有商品数据
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 获取收到的所有商品点击量
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {allItems.add(item);
        }
        // 提前清除状态中的数据,释放空间
        itemState.clear();
        // 按照点击量从大到小排序
        allItems.sort(new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {return (int) (o2.viewCount - o1.viewCount);
            }
        });
        // 将排名信息格式化成 String, 便于打印
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("时间:").append(new Timestamp(timestamp-1)).append("\n");
        for (int i=0; i<allItems.size() && i < topSize; i++) {ItemViewCount currentItem = allItems.get(i);
            // No1:  商品 ID=12224  浏览量 =2413
            result.append("No").append(i).append(":")
                    .append("商品 ID=").append(currentItem.itemId)
                    .append("浏览量 =").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");

        // 控制输出频率,模拟实时滚动结果
        Thread.sleep(1000);

        out.collect(result.toString());
    }
}




/** 用于输出窗口的结果 */
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

    @Override
    public void apply(
            Tuple key,  // 窗口的主键,即 itemId
            TimeWindow window,  // 窗口
            Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
            Collector<ItemViewCount> collector  // 输出类型为 ItemViewCount
    ) throws Exception {Long itemId = ((Tuple1<Long>) key).f0;
        Long count = aggregateResult.iterator().next();
        collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
    }
}


/** COUNT 统计的聚合函数实现,每出现一条记录加一 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

    @Override
    public Long createAccumulator() {return 0L;}

    @Override
    public Long add(UserBehavior userBehavior, Long acc) {return acc + 1;}

    @Override
    public Long getResult(Long acc) {return acc;}

    @Override
    public Long merge(Long acc1, Long acc2) {return acc1 + acc2;}
}


/** 商品点击量(窗口操作的输出类型) */
public static class ItemViewCount {
    public long itemId;     // 商品 ID
    public long windowEnd;  // 窗口结束时间戳
    public long viewCount;  // 商品的点击量

    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.windowEnd = windowEnd;
        result.viewCount = viewCount;
        return result;
    }
}



/** 用户行为数据结构 **/
public static class UserBehavior {
    public long userId;         // 用户 ID
    public long itemId;         // 商品 ID
    public int categoryId;      // 商品类目 ID
    public String behavior;     // 用户行为, 包括("pv", "buy", "cart", "fav")
    public long timestamp;      // 行为发生的时间戳,单位秒
}

}

实时模拟的结果:

====================================
时间: 2017-11-26 09:05:00.0
No0:  商品 ID=5051027  浏览量 =3
No1:  商品 ID=3493253  浏览量 =3
No2:  商品 ID=4261030  浏览量 =3
====================================


====================================
时间: 2017-11-26 09:10:00.0
No0:  商品 ID=812879  浏览量 =5
No1:  商品 ID=2600165  浏览量 =4
No2:  商品 ID=2828948  浏览量 =4
====================================


====================================
时间: 2017-11-26 09:15:00.0
No0:  商品 ID=812879  浏览量 =7
No1:  商品 ID=138964  浏览量 =5
No2:  商品 ID=4568476  浏览量 =5
====================================


====================================
时间: 2017-11-26 09:20:00.0
No0:  商品 ID=812879  浏览量 =8
No1:  商品 ID=2338453  浏览量 =8
No2:  商品 ID=2563440  浏览量 =7
====================================

可以看到,我们用比较简单的代码,就实现了热点 TOP n 的问题. 可见 flink 使用起来还是很方便的(至少比 java 方便不少)。

2.flink 这么强大?为甚?

从上一个例子里面,我们已经初步体会到了 flink 的方便之处。我想从一下几个方面解释一下:

  1. 支持多种窗口
  2. 支持 table api【第二讲介绍】
  3. exactly-once (正好一次)【第二讲介绍】

1. 支持多种窗口
1.1 关于 flink 窗口我手动画了一个简单的图:

1.2flink 窗口函数
窗口函数就是这四个:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction.(当然也可以自定义 window)

3.flink 工作流程?

dataSource ->  DataTransformation(*)  ->dataSink

3.1 登陆监控 demo 了解 dataSource 和 dataSink

    dataSource:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source
        自定义 source:
        a:flink 提供了很多定义好的 sourceFunction 比如 Kafka,RabbitMq,Mysql...
        b:StreamExecutionEnvironment.addSource(sourceFunction) 自己写 sourceFunction 
          (实现 ParallelSourceFunction / RichParallelSourceFunction)
    dataSink:
        写入文件、打印出来、写入 socket、自定义的 sink 
        自定义的 sink 
        a: 同理,dataSink 提供了很多定义好的 dataSink...
        b: 自定义 dataSink

3.2 DataTransformation(*)

    简单的 Transformation 示意图【图 2】Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce /
    Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。hello-demo
    注【1】

4.flink 在我们测试环境上集成的 demo

    1: 登陆异地监控 (讲清楚架构关系)
    2: 代理树
    

5.flink 怎么发布?web 操作界面简单介绍。

打 jar 包,设置参数(并发度,main 函数等),上传


注:
【1】
map 就是做一些映射,比如我们把两个字符串合并成一个字符串,把一个字符串拆成两个或者三个字符串。
flatMap 类似于把一个记录拆分成两条、三条、甚至是四条记录, 例如把一个字符串分割成一个字符数组。
Filter 就类似于过滤。
keyBy 就等效于 SQL 里的 group by。
aggregate 是一个聚合操作,如计数、求和、求平均等。
reduce 就类似于 MapReduce 里的 reduce。
join 操作就有点类似于我们数据库里面的 join。
connect 实现把两个流连成一个流。
repartition 是一个重新分区操作(还没研究)。
project 操作就类似于 SQL 里面的 snacks(还没研究)

【以上涉及到的代码,我已经上传到 github 上面:https://github.com/iamcrawler…】

退出移动版