乐趣区

关于flink:Flink-SQL-实战HBase-的结合应用

本文次要介绍 HBase 和 Flink SQL 的联合应用。HBase 作为 Google 发表 Big Table 论文的开源实现版本,是一种分布式列式存储的数据库,构建在 HDFS 之上的 NoSQL 数据库,非常适合大规模实时查问,因而 HBase 在实时计算畛域应用十分宽泛。能够实时写 HBase,也能够利用 buckload 一把把离线 Job 生成 HFile Load 到 HBase 表中。而当下 Flink SQL 的炽热水平不必多说,Flink SQL 也为 HBase 提供了 connector,因而 HBase 与 Flink SQL 的联合十分有必要实际实际。

当然,本文假如用户有肯定的 HBase 常识根底,不会具体去介绍 HBase 的架构和原理,本文着重介绍 HBase 和 Flink 在理论场景中的联合应用。次要分为两种场景,第一种场景:HBase 作为维表与 Flink Kafka table 做 temporal table join 的场景;第二种场景:Flink SQL 做计算之后的后果写到 HBase 表,供其余用户查问的场景。因而,本文介绍的内容如下所示:

· HBase 环境筹备
· 数据筹备
· HBase 作为维度表进行 temporal table join 的场景
· Flink SQL 做计算写 HBase 的场景
· 总结

一、HBase 环境筹备

因为没有测试的 HBase 环境以及为了防止净化线上 Hbase 环境。因而,本人 build 一个 Hbase docker image(大家能够 docker pull guxinglei/myhbase 拉到本地),是基于官网洁净的 ubuntu imgae 之上装置了 Hbase 2.2.0 版本以及 JDK1.8 版本。

启动容器,裸露 Hbase web UI 端口以及内置 zk 端口,不便咱们从 web 页面看信息以及创立 Flink Hbase table 须要 zk 的链接信息。

docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash

· 进入容器,启动 HBase 集群,以及启动 rest server,后续不便咱们用 REST API 来读取 Flink SQL 写进 HBase 的数据。

# 启动 hbase 集群 bin/start-hbase.sh# 后盾启动 restServerbin/hbase-daemon.sh start rest -p 8000

二、数据筹备

因为 HBase 环境是本人长期搞的单机服务,外面没有数据,须要往里面写点数据供后续示例用。在 Flink SQL 实战系列第二篇中介绍了如何注册 Flink Mysql table,咱们能够将广告位表抽取到 HBase 表中,用来做维度表,进行 temporal table join。因而,咱们须要在 HBase 中创立一张表,同时还须要创立 Flink HBase table, 这两张表通过 Flink SQL 的 HBase connector 关联起来。

· 在容器中启动 HBase shell,创立一张名为 dim_hbase 的 HBase 表,建表语句如下所示:

# 在 hbase shell 创立 hbase 表
hbase(main):002:0> create 'dim_hbase','cf'
Created table dim_hbase
Took 1.3120 seconds
=> Hbase::Table - dim_hbase

· 在 Flink 中创立 Flink HBase table,建表语句如下所示:

# 注册 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;
CREATE TABLE flink_rtdw.demo.hbase_dim_table (
  rowkey STRING,
  cf ROW < adspace_name STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dim_hbase',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'localhost:2181'
);

· Flink MySQL table 和 Flink HBase table 曾经创立好了,就能够写抽取数据到 HBase 的 SQL job 了,SQL 语句以及 job 状态如下所示:

# 抽取 Mysql 数据到 Hbase 表中

insert into
  hbase_dim_table
select
CAST (ID as VARCHAR),
ROW(name)
from
  mysql_dim_table;

03 HBase 作为维表与 Kafka 做 temporal join 的场景

在 Flink SQL join 中,维度表的 join 肯定绕不开的,比方订单金额 join 汇率表,点击流 join 广告位的明细表等等,应用场景十分宽泛。那么作为分布式数据库的 HBase 比 MySQL 作为维度表用作维度表 join 更有劣势。在 Flink SQL 实战系列第二篇中,咱们注册了广告的点击流,将 Kafka topic 注册 Flink Kafka Table,同时也介绍了 temporal table join 在 Flink SQL 中的应用;那么本节中将会介绍 HBase 作为维度表来应用,下面大节中曾经将数据抽取到 Hbase 中了,咱们间接写 temporal table join 计算逻辑即可。

· 作为广告点击流的 Flink Kafa table 与 作为广告位的 Flink HBase table 通过广告位 Id 进行 temporal table join,输入广告位 ID 和广告位中文名字,SQL join 逻辑如下所示:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       hbase_dim_table.cf.adspace_name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;

· temporal table join job 提交 Flink 集群上的状态以及 join 后果如下所示:

四、计算结果 sink 到 HBase 作为后果的场景

下面大节中,HBase 作为维度表用作 temporal table join 是十分常见的场景,实际上 HBase 作为存储计算结果也是十分常见的场景,毕竟 Hbase 作为分布式数据库,底层存储是领有多正本机制的 HDFS,保护简略,扩容不便,实时查问快,而且提供各种客户端不便上游应用存储在 HBase 中的数据。那么本大节就介绍 Flink SQL 将计算结果写到 HBase,并且通过 REST API 查问计算结果的场景。

· 进入容器中,在 HBase 中新建一张 HBase 表,一个 column family 就满足需要,建表语句如下所示:

# 注册 hbase sink table
create 'dwa_hbase_click_report','cf'

· 建设好 HBase 表之后,咱们须要在 Flink SQL 创立一张 Flink HBase table,这个时候咱们须要明确 cf 这个 column famaly 上面 column 字段,在 Flink SQL 实战第二篇中,曾经注册好了作为点击流的 Flink Kafka table,因而本节中,将会计算点击流的 uv 和点击数,因而两个 column 别离为 uv 和 click_count,建表语句如下所示:

# 注册 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;
CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (
  rowkey STRING,
  cf ROW < uv BIGINT, click_count BIGINT >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dwa_hbase_click_report',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'hostname:2181'
);

· 后面点击流的 Flink Kafka table 和存储计算结果的 HBase table 和 Flink HBase table 曾经筹备了,咱们将做一个 1 分钟的翻转窗口计算 uv 和点击数,并且将计算结果写到 HBase 中。对 HBase 理解的人应该晓得,rowkey 的设计对 hbase regoin 的散布有着十分重要的影响,基于此咱们的 rowkey 是应用 Flink SQL 内置的 reverse 函数进行广告位 Id 进行反转和窗口启始工夫做 concat,因而,SQL 逻辑语句如下所示:

INSERT INTO dwa_hbase_click_report
SELECT
CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) ,
'_',
CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
  ) as rowkey, 
ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf
FROM
  adsdw_dwd_max_click_mobileapp
WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
GROUP BY
  TUMBLE(ets, INTERVAL '1' MINUTE),
  publisher_adspace_adspaceId;

· SQL job 提交之后的状态以及后果 check 如下所示:

上述 SQL job 曾经胜利的将结算后果写到 HBase 中了。对于线上的 HBase 服务来讲,很多共事不肯定有 HBase 客户端的权限,从而也不能通过 HBase shell 读取数据;另外作为线上报表服务显然不可能通过 HBase shell 来通过查问数据。因而,在实时报表场景中,数据开发工程师将数据写入 HBase, 前端工程师通过 REST API 来读取数据。后面咱们曾经启动了 HBase rest server 过程,咱们能够通 rest 服务提供读取 HBase 外面的数据。

· 咱们先 get 一条刚刚写到 HBase 中的数据看看,如下所示:

· 上面咱们开始通过 REST API 来查问 HBase 中的数据,第一步,执行如下语句拿到 scannerId;首先须要将要查问的 rowkey 进行 base64 编码能力应用,前面须要将后果进行 base64 解码

rowkey base64 编码前:0122612_1606295280000
base64 编码之后:MDEyMjYxMl8xNjA2Mjk1MjgwMDAw

curl -vi -X PUT \
         -H "Accept: text/xml" \
         -H "Content-Type: text/xml" \
         -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \
"http://hostname:8000/dwa_hbase_click_report/scanner"

· 第二步,执行如下语句依据上条语句返回的 scannerID 查问数据,能够看到返回的后果:

curl -vi -X GET \
         -H "Accept: application/json" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

· 第三步,查问结束之后,执行如下语句删除该 scannerId:

curl -vi -X DELETE \
         -H "Accept: text/xml" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

五、总结

在本篇文章中,咱们介绍了 HBase 和 Flink SQL 的联合应用比拟宽泛两种的场景:作为维度表用以及存储计算结果;同时应用 REST API 对 HBase 中的数据进行查问,对于查问用户来说,防止间接裸露 HBase 的 zk,同时将 rest server 和 HBase 集群解耦。

作者简介

余敖,360 数据开发高级工程师,目前专一于基于 Flink 的实时数仓建设与平台化工作。对 Flink、Kafka、Hive、Spark 等进行数据 ETL 和数仓开发有丰盛的教训。

退出移动版