关于后端:消息推送平台的实时数仓flink消费kafka消息入到hive

32次阅读

共计 11347 个字符,预计需要花费 29 分钟才能阅读完成。

大家好,3y 啊。好些天没更新了,并没有偷懒,只不过始终在装置环境,差点都想放弃了。

上一次比拟大的更新是做了 austin 的预览地址,把企业微信的利用和机器人音讯各种的音讯类型和性能给欠缺了。上一篇文章也提到了,austin 惯例的性能曾经更新得差不多了,剩下的就是各种细节的欠缺。

不晓得大家还记不记得我过后布局 austin 时,所画出的架构图:

当初就剩下 austin-datahouse 这个模块没有实现了,也有挺多同学在看代码的时候问过我这个模块在哪 … 其实就是还没实现,先布局,牛逼先吹出去(互联网人必备技能)

音讯推送平台🔥推送下发【邮件】【短信】【微信服务号】【微信小程序】【企业微信】【钉钉】等音讯类型

  • https://gitee.com/zhongfucheng/austin/
  • https://github.com/ZhongFuCheng3y/austin

至于这个模块吧,我料想它的性能就是把 austin 相干的 实时数据 写到 数据仓库 里。一方面是做数据备份,另一方面是大多数的报表很多都得依赖数据仓库去做。实际上,生产环境也会把相干的数据写到数仓中。

而在公司里,要把数据写到数据仓库,这事对开发来说个别很简略。因为无数仓这个货色,那大多数都会有相干的 根底建设 了。对于开发而言,可能就是把 日志数据写到 Kafka,在相干的后盾配置下这个 topic,就能将这个 topic 的数据同步到数据仓库里咯。如果是数据库的话,那应该大数据平台有同步数据的性能,对一般开发来说也就配置下 表名 就能同步到数据仓库里咯。

反正应用起来很简略就是了。不过,我其实不晓得具体是怎么做的。

然而不要紧啊,反正开源我的项目对于工夫这块还是很富余得啊:没有deadline,没有产品在隔壁催我写,没有相干的技术要跟我对接。那我不懂能够学,于是也花了几天看了下数仓这块内容。

在看数仓的同时,我之前在公司常常会听到 数据湖 这个词。我刚毕业的时候是没听过的,但这几年如同这个概念就火起来了。跟大数据那边聊点事的时候,常常会听到:数据入湖

那既然看都看了,顺便理解 数据湖 是个什么货色吧?对着浏览器一轮检索之后,我发现这个词还是 挺形象 的,始终没找到让我耳目一新的答案,这个数据湖也不晓得怎么就火起来了。我浏览了一遍之后,我大略能够总结出什么是数据湖,跟数据仓库有啥区别:

1、数据仓库是存储 结构化的数据 ,而数据湖是 什么数据都能存 (非结构化的数据也能存)。结构化数据能够了解为咱们的 二维表 JSON 数据,非结构化的数据能够了解为 图像文件 之类的。

数据仓库在写入的时候,就要定义好 schema 了,而数据湖在写入的时候不须要定 schema,能够等用到的时候再查出来。强调这点,阐明数据湖对数据的 schema 束缚更加灵便。

2、数据仓库和数据湖 并不是代替关系。数据是先进数据湖,将数据加工(ETL)之后,一部分数据会到数据仓库中。

3、咱们晓得现有的数据仓库个别基于 Hadoop 体系的 HDFS 分布式文件系统去搭建的,而数据湖也得存储数据的嘛,个别也是依赖 HDFS。

4、开源的数据湖技术比拟闻名的有hudiicebergDelta Lake

看完下面的形容,是不是感觉有点空洞。看似学到了很多,然而理论还是不晓得数据湖有啥牛逼之处。嗯,我也是这么想的。总体下来,感觉 数据湖就相当于数据仓库的 ODS,围绕着这些数据定义了对应的meta 信息,做元数据的治理。

说到 ODS 这个词了,就简略聊下数据仓库的分层构造吧。这个行业通用的,个别分为以下:

1、ODS(Operate Data Store),原始数据层,未通过任何加工的。

2、DIM(Dictionary Data Layer),维度数据层,比方存储地区、用户客户端这些维度的数据。

3、DWD(Data Warehouse Detail),数据明细层,把原始数据通过简略的加工(去除脏数据,空数据之后就失去明细数据)。

4、DWS(Data Warehouse Service),数据维度汇总层,比方将数据明细依据用户维度做汇总失去的汇总之后的数据。

5、ADS(Application Data Store),数据应用层,这部分数据能给到后端以接口的形式给到前端做可视化应用了。

至于为什么要分层,跟当初咱们了解 DAO/Service/Controller 的思维差不多,大略就是 复用 便于后续批改变动

扯了那么多吧,聊会 ausitn 我的项目吧,我是打算怎么做的呢?因为我的实时计算 austin-stream 模块是采纳 Flink 去做的,我打算 austin-datahouse 也是采纳 flink 去做。

这几年在大数据畛域 湖仓一体 流批一体 这些概念都十分火,而对于 austin 来说,第一版迭代还不必走得这么急。我目前的想法是利用 flinktableapi去对接 Hive,通过SupersetMetabaseDataEase 其中一个开源的 大数据可视化工具 Hive的数据给读取进去,那第一版就差不多实现了。

现状

自从我决定开始写 austin-data-house 数据仓库模块,曾经过了两周有多了。这两周多我都在被 部署装置环境 折磨,中途有很屡次就想放弃了。

我初学编程,到当初工作了几年,我还是没变,判若两人地厌恶装置环境

花了这么长时间调试装置部署环境,实现的性能其实很简略:生产 Kafka 的音讯,写入 hive。(我在写全链路追踪性能实时引擎用的是flink,为了技术架构对立,我还是心愿通过flink 来实现。)

flink1.9 开始反对 hive。到目前为止,flink 稳固的版本在 1.16.0flink 反对 hive 也就这两年的事。

austin所依赖的组件有很多(失常线上环境都会有这些组件,只是不必咱们本人搭建而已)。各种组件的环境问题被我一一驯服了,但有很大水平上的功绩是在 docker-compose 上。

说到数据仓库,第一工夫必定是想到 hive。尽管我没装过hadoop/hive/hdfs 大数据相干的组件,但略微想想这都是简单的。那装置 hive 天然就会想到有没有 docker 镜像,一键装置可多爽啊。

之前接入的 flink 也是跑在 docker 上的,把 hive 也找个镜像,两者交融交融不就行了嘛?

想法很好,我就开干了。

基础知识

flinkhive 交融,实际上是借助 hive catalog 来买通 hivehive catalog 对接着 hive metastore(hive 存储元数据的中央)。

当咱们应用 flink 创立出的元数据,会经由 hive catalog 最终长久化到hive metastore,同时咱们会利用hive catalog 提供的接口对 hive 进行写入和读取。

装置 hive 环境

那时候简略搜了下,还真被我找到了 hive 的镜像,没想到这么侥幸,还是反对 docker-compose 的,一键装置,美滋滋。

https://github.com/big-data-europe/docker-hive

我就简略复述下过程吧,比较简单:

1、把仓库拉到本人的服务器上

git clone git@github.com:big-data-europe/docker-hive.git

2、进入到我的项目的文件夹里

cd docker-hive

3、启动我的项目

docker-compose up -d

一顿下载之后,能够发现就启动胜利了,通过 docker ps 命令就看到运行几个镜像了。

没错,这就装置好 hive 了,是不是非常简单。具体启动了什么,咱们能够简略看下 docker-compose.yml 文件的内容。

最初,咱们能够连上 hive 的客户端,感触下疾速装置好 hive 的成功感。

# 进入 bash
docker-compose exec hive-server bash

# 应用 beeline 客户端连贯
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000

深陷迷雾

hive装置好了之后,我就快马加鞭地想晓得怎么跟 flink 进行交融了。我就搜了几篇博客看个大略,起初发现大多数博客的内容其实就是 翻译了 flink 官网的内容。

不过,翻博客的过程中让我大抵理解了一点:如果我要应用 flink 连贯 hive,那我要 手动 flink连贯 hivejar包导入到 flink/lib 目录下。

说实话,这还是比拟麻烦的。我还认为只须要在我的工程里导入相干的依赖就好了,没想到还得本人手动把 jar 包下来下来,而后传入到 flink 的装置目录下。

我吭哧吭哧地做了,但把我写好的工程 jar 包传上去提交给 jobmanager 不是缺这就是少那依赖。我置信我能搞掂,反正就是版本依赖的问题嘛,我在行的。

前面又发现在 flink 工程项目里用 maven 引入 hadoop 依赖是不够的,flink新版本里默认打的镜像是没有 hadoop 的,要手动在 flink 环境目录下引入hadoop。这个也是麻烦的,但只有我在镜像里下载些环境,也不是不能干。

1、装置vim

apt-get update

apt-get install vim

2、装置hadoop

2.1、下载hadoop

wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.4/hadoop-2.7.4.tar.gz

2.2、解压hadoop

tar -zxf hadoop-2.7.4.tar.gz

2.3、配置环境变量

vim /etc/profile
export HADOOP_HOME=/opt/hadoop-2.7.4
export PATH=$HADOOP_HOME/bin:$PATH
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile

2.4、在 flink 的 docker 容器里还得把 .bashrc 也得改了才失效

过于乐观的我,搞了 10 天左右吧,终于顶不住了,下定决心:我肯定要 对立版本 ,不能修修补补了,该什么版本就走什么版本, 推倒素来 吧。我就按着 flink 官网来走,一步一步走下来不可能错的吧!

flink最新的版本是 v1.17-SNAPSHOT,那我就挑 上一个稳固 的版本就行了!顺便一看,我之前写全链路追踪 austin 接入 flink 的时候,代码的还是 14.3 版本。但管不了这么多了,就用 1.16.0 版本吧。

首先,我发现我的 flink 镜像拉取的是最新的版本 image: flink:latest。那我得找1.16.0 版本的 docker-compose 来部署,版本就得对立,前面的事才好搞。这个好找,在官网很快就找到了:image: flink:1.16.0-scala_2.12

新的镜像搞下来了当前,我又吭哧地把相干的 jar 都手动地导入到 flink 容器里。另外,我发现官网写的 pom 依赖,压根就下载不下来的,这不对劲啊

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  <version>1.16.0</version>
  <scope>provided</scope>
</dependency>

我开始认为是我的 maven 仓库配置问题,找遍了仓库在那个 artifactId 下,最大的也就只有 1.14.x 的版本。去找了下 flinkissue,发现有人跟我一样的问题。

https://github.com/apache/flink/pull/21553

持续尝试提交我本人写好的flink.jar。毫无意外地,又报错了,有些是之前的报错,我很快地就能解决掉。

我一想,意识到是哪里没做好了:hive的版本,hadoop的版本,flink的版本这三者也要束缚。那我转头一看,发现之前我从镜像里拉下来 hive 版本是 2.3.2,外面装的hadoop 版本是2.7.4。于是,我又对立了这三者的版本。信念很足,感觉肯定能成。

再次提交,还是有问题,疯狂 Google 但就是始终找不到解决方案。能查出来的材料,网上的全都是“原始”装置部署的,就没有通过 flink docker 镜像跟 hive 交融的,而且也不是跨机器的(给进去的案例都是在同一台机器上,我是 hive 部署一台机器上,flink部署在另一台机器上)。

花了几天调试还是解决不掉,怎么搞呢?放弃又不甘心。咋整?持续推倒重来呗

在应用 flink 容器 调试的过程中我曾经发现了:

1、拉下来的 docker 镜像里的内容,跟官网所形容的 jar 包是有出入的,有的是要我手动去下载的。但过后我感觉既然版本曾经限定了,那应该问题也不大。

2、hadoop环境变量在 flink docker 容器下很难调试。每次从新推倒素来的时候,我都得手动配置一次,步骤也繁琐。即使我挂载了相干的jar 包和整个目录

3、flink容器内重启和启动集群环境不可控,老是呈现奇奇怪怪的问题。

那这一次,我就不必 docker-compose 部署 flink 了,间接在 centos 装置部署flink,持续整。

随着我每一次推倒重来,我就感觉我离胜利越来越近越来越近。从环境变量报错缺失 CALSS_PATH 的问题,曾经到了 sql 的语法的问题,从 sql 语法的问题到找不到近程地址 namenode can't found 的问题,从近程地址的问题,到 HDFS 调用不通的问题。最初,终于调试胜利了。

上面就记录我能调试胜利的装置过程,各种坑谬误异样就不记录了(篇幅问题),这里也吐槽够了。

装置 flink 环境

1、下载 flink 压缩包

wget https://dlcdn.apache.org/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz

2、解压flink

tar -zxf flink-1.16.0-bin-scala_2.12.tgz

3、批改该目录下的 conf 下的 flink-conf.yaml 文件中 rest.bind-address 配置,不然 近程拜访不到 8081 端口,将其改为0.0.0.0

rest.bind-address: 0.0.0.0

4、将 flink 官网提到连贯 hive 所须要的 jar 包下载到 flinklib目录下(一共 4 个)

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.0/flink-sql-connector-hive-2.3.9_2.12-1.16.0.jar

wget https://repo.maven.apache.org/maven2/org/apache/hive/hive-exec/2.3.4/hive-exec-2.3.4.jar

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.16.0/flink-connector-hive_2.12-1.16.0.jar 

wget https://repo.maven.apache.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar

5、依照官网批示把 flink-table-planner_2.12-1.16.0.jarflink-table-planner-loader-1.16.0.jar 这俩个 jar 包挪动其目录;

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.16.0.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.16.0.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.16.0.jar $FLINK_HOME/opt/flink-table-planner-loader-1.16.0.jar

6、把后续 kafka 所须要的依赖也下载到 lib 目录下

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.16.0/flink-connector-kafka-1.16.0.jar

wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.1/kafka-clients-3.3.1.jar

装置 hadoop 环境

因为 hive 的镜像曾经锁死了 hadoop 的版本为 2.7.4,所以我这边flink 所以来的 hadoop 也是下载 2.7.4 版本

1、下载 hadoop 压缩包

wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.4/hadoop-2.7.4.tar.gz

2、解压hadoop

tar -zxf hadoop-2.7.4.tar.gz

装置 jdk11

因为高版本的 flink 须要jdk 11,所以这边装置下该版本的jdk

yum install java-11-openjdk.x86_64
yum install java-11-openjdk-devel.x86_64

配置 jdk、hadoop 的环境变量

这一步为了能让 flink 在启动的时候,加载到 jdkhadoop的环境。

1、编辑 /etc/profile 文件

vim /etc/profile

2、文件内容最底下减少以下配置:

JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.17.0.8-2.el7_9.x86_64
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
export HADOOP_HOME=/root/hadoop-2.7.4
export PATH=$HADOOP_HOME/bin:$PATH
export HADOOP_CLASSPATH=`hadoop classpath`

3、让配置文件失效

source /etc/profile

austin 数据仓库工程代码

间接上 austin 仓库地址,文章篇幅就不贴代码了,该写的正文我都写了。

http://gitee.com/zhongfucheng/austin

这个工程代码量非常少,一共就 4 个外围文件pom.xml/hive-site.xml/AustinHiveBootStrap.java,要应用的时候留神该两处中央即可:

1、com.java3y.austin.datahouse.constants.DataHouseConstant#KAFKA_IP_PORT将这里改成本人的 kafkaipport

2、hive-site.xml文件全局替换掉 hive_ip 为本人的 hive 地址,一共两处

部署工程代码到 Flink

咱们把 jar 包上传到服务器,而后应用命令提交 jar 包给 flink 执行。也能够关上 flink 的治理后盾,在页面上提交 jar 包并启动。我这里就抉择应用命令的形式来提交,次要因为在外网透出 flink 的端口,很容器被攻打(我曾经重装系统几次了。。)

(flink命令在 $FLINK_HOME/bin 下)

./start-cluster.sh
./flink run austin-data-house-0.0.1-SNAPSHOT.jar

启动 Kafka 生产者写入测试数据

启动消费者的命令(将 ipport改为本人服务器所部署的 Kafka 信息):

$KAFKA_HOME/bin/kafka-console-producer.sh --topic austinTraceLog  --broker-list ip:port

输出测试数据:

{"state":"1","businessId":"2","ids":[1,2,3],"logTimestamp":"123123"}

行将胜利

到这一步,离胜利就十分近了,但还是有 通信 的问题:flink无奈辨认 namenode/namenodedatanode之间的通信问题等等。于是咱们须要做以下措施:

1、hive在部署的时候,减少 datanode/namenode 的通信端口,部署 hive 应用这个 docker-compose 文件的内容:

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "9000:9000"
      - "8020:8020"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
      - "50010:50010"
      - "50020:50020"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
    ports:
      - "5432:5432"
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"
volumes:
  namenode:
  datanode:

2、在部署 flink 服务器上减少 hosts,有以下(ip 为部署 hive 的地址):

127.0.0.1 namenode
127.0.0.1 datanode
127.0.0.1 b2a0f0310722

其中 b2a0f0310722datanode 的主机名,该主机名会随着 hivedocker而变更,咱们能够登录 namenode 的后盾地址找到其主机名。而办法则是在部署 hive 的地址输出:

http://localhost:50070/dfshealth.html#tab-datanode

3、把工程下的 hive-site.xml 文件拷贝到 $FLINK_HOME/conf

4、hadoop的配置文件 hdfs-site.xml 减少以下内容(我的目录在/root/hadoop-2.7.4/etc/hadoop

<property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
    <description>only cofig in clients</description>
</property>

5、启动 flink-sql 的客户端:

./sql-client.sh

6、在 sql 客户端下执行以下脚本命令,注:hive-conf-dir要放在 $FLINK_HOME/conf

CREATE CATALOG my_hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/root/flink-1.16.0/conf'
);
use catalog my_hive;
create database austin;

7、重启 flink 集群

./stop-cluster.sh
./start-cluster.sh

8、从新提交执行 flink 工作

./flink run austin-data-house-0.0.1-SNAPSHOT.jar

数据可视化

到下面为止,咱们曾经把数据写入到 hive 表了,咱们是不可能每一次都在命令行窗口里查问 hive 的数据。个别在公司里都会有 可视化平台 供咱们开发 / 数仓 / 数据分析师 / 经营 去查问 hive 的数据。

我简略看了几个开源的可视化平台:Superset/Metabase/DataEase。最初抉择了Metabase,无他,看着悦目一些。

部署 Metabase 很简略,也是应用 docker 进行装置部署,就两行命令(后续我会将其退出到 docker-compose 外面)。

docker pull metabase/metabase:latest
docker run -d -p 5001:3000 --name metabase metabase/metabase

完了之后,咱们就能够关上 5001 端口到 Metabase 的后盾了。

咱们能够在 Metabase 的后盾增加 presto 进而连贯 hive 去查问记录。

这个 presto 服务咱们在搭建 hive 的时候曾经一起启动了,所以这里间接应用就好了。

到这一步,咱们就能够通过在页面上写 sql 把音讯推送过程中埋点的 明细数据 查问进去

最初

这数据仓库整个装置环境和调试过程的确折腾人,屡次推倒重来(甚至不惜重装系统重来)。还好最初输出 Kafka 一条音讯,在 hive 表里能看到一条记录,能看到后果之后,折腾或者是值得的。

如果想学 Java 我的项目的,强烈推荐 我的开源我的项目 音讯推送平台 Austin(8K stars),能够用作 毕业设计 ,能够用作 校招 ,能够看看 生产环境是怎么推送音讯 的。开源我的项目音讯推送平台 austin 仓库地址:

音讯推送平台🔥推送下发【邮件】【短信】【微信服务号】【微信小程序】【企业微信】【钉钉】等音讯类型

  • https://gitee.com/zhongfucheng/austin/
  • https://github.com/ZhongFuCheng3y/austin

参考资料:

  • https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/
  • https://blog.51cto.com/u_15105906/5849229
  • https://blog.csdn.net/qq_38403590/article/details/126172610

正文完
 0