关于后端:如何使用-Flink-SQL-探索-GitHub-数据集|FlinkLearning-实战营

44次阅读

共计 7198 个字符,预计需要花费 18 分钟才能阅读完成。

作者|王洪顺(弘舜)

为进一步帮忙开发者学习应用 Flink,Apache Flink 中文社区近期发动 Flink-Learning 实战营我的项目。本次实战营通过实在乏味的实战场景帮忙开发者实操体验 Flink,课程包含实时数据接入、实时数据分析、实时数据利用的场景实。并联合小松鼠助教模式,全方位帮忙入营开发者轻松玩转 Flink,点击下方图片扫码即刻入营

本期将持续具体介绍 Flink- Learning 实战营。


想要理解如何应用 Flink 在 GitHub 中发现最热门的我的项目吗?本试验应用阿里云实时计算 Flink 版内置的 GitHub 公开事件数据集,通过 Flink SQL 实时摸索剖析 Github 公开数据集中暗藏的彩蛋!

实现本试验后,您将把握的常识有:

  • 理解 Flink 和流式计算的劣势
  • 对 Flink SQL 根底能力和 Flink 实时处理个性有初步体验

试验简介

通过 Flink 对 GitHub 的实时事件流进行剖析,并通过报表直观展现,理解 GitHub 的最新热门趋势、特定仓库或者组织的活跃度。

体验此场景后,能够对 Flink SQL 根底能力和 Flink 实时处理个性有直观的初步体验。

■ 为回馈宽广开源开发者对社区的反对,阿里云实时计算 Flink 版提供云原生收费试用资源

试验资源

本场景应用到的试验资源和配置如下:

阿里云实时计算 Flink 版

配置项 规格
Task Manger 个数 4 个
Task Manager CPU 2 外围
Task Manager Memory 8 GiB
Job Manager CPU 1 核
Job Manager Memory 2 GiB

体验指标

对 Flink SQL 根底能力和 Flink 实时处理个性有直观的初步体验。

背景常识

GitHub 公开数据集(GitHub Archive)是 GitHub 提供的一个凋谢数据汇合,它蕴含了每个公共仓库的事件数据,例如提交、拉取申请、问题和评论等。GitHub 公开数据集的数据能够用于进行各种类型的钻研和剖析,例如开源社区的合作状况、开发者的行为特色、编程语言的发展趋势等。使开发者们更好地理解 GitHub 上的流动和趋势,并从中取得有价值的信息和洞察。

本试验将 GitHub 公开数据集实时同步到 SLS 作为数据源,依据 Flink 对数据进行多种维度的剖析并且通过报表直观展现。

前置常识

  • 理解 Flink 相干的基础知识。
  • 理解 Flink SQL 相干的基础知识。

环境搭建

创立 Session 集群。进入阿里云控制台,抉择实时计算 Flink 版。而后抉择曾经购买的工作空间。

在开始阿里云实时计算 Flink 版作业编写前,须要先创立 Session 集群,只有创立了 Flink 集群,能力执行工作。

1. 点击系统管理 -> Session 集群 -> 创立 Session

2. 创立 Session 集群时设置为 SQL Preview 集群,这样无需设置 Sink,即可将 Select 语句的后果输入成图表的格局。

试验 1:Github 关注数排行榜

本试验统计从一周前起的 Github 关注度排行榜。

操作

1. 作业 SQL 代码。其中 startTime 尽量设置为以后此刻的一周前左近 ,如果设置的工夫太早,后面有效计算工夫比拟长,不仅消耗资源,而且很久能力加载出计算结果。 依据不同的地区设置相应的 project 和 endPoint,如实例为上海的服务平台,因而设置 ’project’ = ‘github-events-shanghai’ 和 ’endPoint’ = ‘https://cn-shanghai-intranet.log.aliyuncs.com’,其余地区如北京、杭州、深圳更改为对应值即可。

-- 通过 DDL 语句创立 SLS 源表,SLS 中寄存了 Github 的实时数据。CREATE TEMPORARY TABLE gh_event(
  id STRING,                                        -- 每个事件的惟一 ID。created_at BIGINT,                                -- 事件工夫,单位秒。created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件工夫戳(以后会话时区下的工夫戳,如:Asia/Shanghai)。type STRING,                                      -- Github 事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent 等。actor_id STRING,                                  -- Github 用户 ID。actor_login STRING,                               -- Github 用户名。repo_id STRING,                                   -- Github 仓库 ID。repo_name STRING,                                 -- Github 仓库名,如:apache/flink, apache/spark, alibaba/fastjson 等。org STRING,                                       -- Github 组织 ID。org_login STRING                                 -- Github 组织名,如:apache,google,alibaba 等。) WITH (
  'connector' = 'sls',                              -- 实时采集的 Github 事件寄存在阿里云 SLS 中。'project' = 'github-events-shanghai',                     -- 寄存公开数据的 SLS 我的项目。例如 'github-events-hangzhou'。'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限阿里云实时计算 Flink 版通过私网地址拜访。例如 'https://cn-hangzhou-intranet.log.aliyuncs.com'。'logStore' = 'realtime-github-events',            -- 寄存公开数据的 SLS logStore。'accessId' =  '',         -- 只读账号的 AK。'accessKey'=' ',   -- 只读账号的 SK。'batchGetSize'='500',                           -- 批量读取数据,每批最多拉取 500 条。'startTime'='2023-06-01 14:00:00'              -- 开始工夫,尽量设置到须要计算的工夫左近,否则有效计算的工夫较长。默认值为以后值
);

-- 配置开启 mini-batch, 每 2s 解决一次。SET 'table.exec.mini-batch.enabled'='true'; 
SET 'table.exec.mini-batch.allow-latency'='2s'; 
SET 'table.exec.mini-batch.size'='4096'; 

-- 作业设置 4 个并发,聚合更快。SET 'parallelism.default' = '4';


-- 查看 Github 新增 star 数 Top 5 仓库。SELECT DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as `date`, repo_name, COUNT(*) as num
FROM gh_event WHERE type = 'WatchEvent' 
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), repo_name
ORDER BY num DESC
LIMIT 5;

2. 验证 SQL 是否正确并且执行

3. 配置图表

​ a. 抉择 Y Bar 并且编辑标题栏为 Top 5

​ b. 配置 group by repo_name,order by num,即依据 repo_name 分组比拟数量

​ c. 试验能够始终运行,一直生产最新的数据。然而如果以后集群的 CPU 数配置的较少,不足以执行两个工作,又想执行下一个试验是,能够将本试验进行。点击后果左侧的红色方框即可。

后果

第一名:s0md3v/roop 视频换脸(最近我在 b 站也常常翻到)

第二名:pengzhile/pandora 潘多拉实现了网页版 ChatGPT 的次要操作

第三名:ClassmateLin/dm-ticket 大麦网抢票(疫情放开,预计上周演唱会很多)

第四名:ShishirPatil/gorilla 连贯海量 API 的大型语言模型

第五名:iperov/DeepFaceLive 换脸

由此可见最近一周最风行的 repo 就是 ai 视频换脸和大模型,最风行的畛域就是 ai

试验 2:统计组织活跃度变动

本试验统计 apache 和 alibaba 组织开源在从 24 小时前开始的活跃度趋势变动。

操作

1.SQL 代码如下。其中 startTime 尽量设置为以后此刻的 24 小时前左近,如果设置的工夫太早,后面有效计算工夫比拟长,不仅消耗资源,而且很久能力加载出计算结果。如果想要统计 alibaba, 改成 org_login =’alibaba’ 即可

CREATE TEMPORARY TABLE gh_event(
  id STRING,                                        -- 每个事件的惟一 ID。created_at BIGINT,                                -- 事件工夫,单位秒。created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件工夫戳(以后会话时区下的工夫戳,如:Asia/Shanghai)。type STRING,                                      -- Github 事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent 等。actor_id STRING,                                  -- Github 用户 ID。actor_login STRING,                               -- Github 用户名。repo_id STRING,                                   -- Github 仓库 ID。repo_name STRING,                                 -- Github 仓库名,如:apache/flink, apache/spark, alibaba/fastjson 等。org STRING,                                       -- Github 组织 ID。org_login STRING                                 -- Github 组织名,如:apache,google,alibaba 等。) WITH (
  'connector' = 'sls',                              -- 实时采集的 Github 事件寄存在阿里云 SLS 中。'project' = 'github-events-shanghai',                     -- 寄存公开数据的 SLS 我的项目。例如 'github-events-hangzhou'。'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限阿里云实时计算 Flink 版通过私网地址拜访。例如 'https://cn-hangzhou-intranet.log.aliyuncs.com'。'logStore' = 'realtime-github-events',            -- 寄存公开数据的 SLS logStore。'accessId' =  '',         -- 只读账号的 AK。'accessKey'=' ',   -- 只读账号的 SK。'batchGetSize'='500',                           -- 批量读取数据,每批最多拉取 500 条。'startTime'='2023-06-07 14:00:00'               -- 开始工夫,尽量设置到须要计算的工夫左近,否则有效计算的工夫较长
);

-- 配置开启 mini-batch, 每 2s 解决一次。SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';

-- 作业设置 4 个并发,聚合更快。SET 'parallelism.default' = '4';


-- 从一天前开始统计事件总量
SELECT NOW(), max(created_at_ts) as created_ts, COUNT(*) as event_count
FROM gh_event 
WHERE  org_login ='apache' and
created_at_ts >= NOW() - INTERVAL '1' DAY;

2. 点击执行,并且配置图表

​ a. 点击图表配置

​ b. 编辑题目为 ”apache”,并且抉择 X/Y Line

​ c. 配置 X 轴为 create_ts, y 轴为 event_count

后果

apache 作为全球性的开源组织,一天内的活跃度比拟平均,而阿里巴巴开源根本由国内开发者关注和奉献,夜间减少比拟平缓,在 9 点之后显著晋升。

试验 3: 统计仓库奉献工夫散布状况

本试验统计 flink 和 spark 开源仓库在从一周前前开始的奉献散布状况。奉献包含代码提交、commit 评论、issue 评论、提交 PR 申请、PR 申请的审查评论等与开发者相干的事件。

1. 作业 SQL 代码。其中 startTime 尽量设置为以后此刻的一周前左近,如果设置的工夫太早,后面有效计算工夫比拟长,不仅消耗资源,而且很久能力加载出计算结果。如果想要统计 spark, 改成 repo_name = ‘apache/flink” 即可。

CREATE TEMPORARY TABLE gh_event(
    id STRING,                                        -- 每个事件的惟一 ID。created_at BIGINT,                                -- 事件工夫,单位秒。created_at_ts as TO_TIMESTAMP(created_at*1000),   -- 事件工夫戳(以后会话时区下的工夫戳,如:Asia/Shanghai)。type STRING,                                      -- Github 事件类型,如:。ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent 等。actor_id STRING,                                  -- Github 用户 ID。actor_login STRING,                               -- Github 用户名。repo_id STRING,                                   -- Github 仓库 ID。repo_name STRING,                                 -- Github 仓库名,如:apache/flink, apache/spark, alibaba/fastjson 等。org STRING,                                       -- Github 组织 ID。org_login STRING                                 -- Github 组织名,如:apache,google,alibaba 等。) WITH (
  'connector' = 'sls',                              -- 实时采集的 Github 事件寄存在阿里云 SLS 中。'project' = 'github-events-shanghai',                     -- 寄存公开数据的 SLS 我的项目。例如 'github-events-hangzhou'。'endPoint' = 'https://cn-shanghai-intranet.log.aliyuncs.com',                   -- 公开数据仅限阿里云实时计算 Flink 版通过私网地址拜访。例如 'https://cn-hangzhou-intranet.log.aliyuncs.com'。'logStore' = 'realtime-github-events',            -- 寄存公开数据的 SLS logStore。'accessId' =  '',         -- 只读账号的 AK。'accessKey'=' ',   -- 只读账号的 SK。'batchGetSize'='500',                           -- 批量读取数据,每批最多拉取 500 条。'startTime'='2023-06-01 14:00:00'              -- 开始工夫,尽量设置到须要计算的工夫左近,否则有效计算的工夫较长
);

-- 配置开启 mini-batch, 每 2s 解决一次。SET 'table.exec.mini-batch.enabled'='true';
SET 'table.exec.mini-batch.allow-latency'='2s';
SET 'table.exec.mini-batch.size'='4096';

-- 作业设置 4 个并发,聚合更快。SET 'parallelism.default' = '4';


-- 配置开启 mini-batch, 每 2s 解决一次。SET 'table.exec.mini-batch.enabled'='true'; 
SET 'table.exec.mini-batch.allow-latency'='2s'; 
SET 'table.exec.mini-batch.size'='4096'; 

-- 作业设置 4 个并发,聚合更快。SET 'parallelism.default' = '4';

-- 统计从上周起的奉献量
SELECT  DATE_FORMAT(created_at_ts, 'yyyy-MM-dd') as comment_date, HOUR(created_at_ts) AS comment_hour ,COUNT(*) AS comment_count
FROM gh_event
WHERE created_at_ts >= NOW() - INTERVAL '7' DAY 
        AND repo_name = 'apache/flink'
       AND (type ='CommitCommentEvent' OR 
            type='IssueCommentEvent' or 
            type = 'PullRequestReviewCommentEvent'or 
            type = 'PushEvent' or 
            type = 'PullRequestEvent' or 
            type = 'PullRequestReviewEvent')
GROUP BY DATE_FORMAT(created_at_ts, 'yyyy-MM-dd'), HOUR(created_at_ts) ;

2. 点击执行,并且配置图表。抉择 Heatmap, 设置 Group by comment_date, Spli By comment_hour,Color 为 Sum(comment_count),即 X 轴为天,Y 周为小时,依据总数量显示色彩深浅。

想要理解更多对于如何在 GitHub 中发现最热门的我的项目的常识吗?快来尝试一下吧!

点击即刻入营


更多内容

流动举荐阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)理解流动详情:https://click.aliyun.com/m/1000372333/

正文完
 0