大家好,咱们是 BTC.com 团队。2020 年,咱们有幸接触到了 Flink 和 PyFlink 生态,从团队本身需要登程,欠缺了团队内实时计算的工作和需要,搭建了流批一体的计算环境。
在实现实时计算的过程中,咱们在实践中播种了一些教训,在此分享一些这方面的心路历程。次要分享的纲要如下:
- 困惑 • 形容 • 思考 • 口头
- 流批一体的架构
- 架构
- 成果
- Zeppelin、PyFlink on K8S 等实际
- Zeppelin
- PyFlink on K8S
- 区块链畛域实际
- 瞻望 • 总结
01 困惑 • 形容 • 思考 • 口头
作为工程师,咱们每天都在一直地理解需要,研发业务。
有一天,咱们被拉到了一次团队总结会议上,收到了以下的需要:
销售总监 A:
咱们想要晓得销售的历史和实时转化率、销售额,能不能统计一下实时的 TOP5 的商品,还有就是大促时候,用户实时拜访、商品实时浏览量 TOP5 的状况呢,能够依据他历史拜访的记录实时举荐相干的吗?
市场总监 B:
咱们想要晓得市场推广的成果,每次流动的实时数据,不然咱们的市场投放无奈精确评估成果,及时反馈啊。
研发总监 C:
有些用户的 Bug 无奈复现,日志能够再实时一点吗?传统日志剖析,须要肯定的梳理,可不可以间接荡涤 / 解决相干的数据?
洽购总监 D:
这些年是不是风行数字化,洽购这边想预测洽购需要,做一下实时分类和治理收入,预测将来供给起源,欠缺一下老本。这个有方法做吗?还有有些供应商不太稳固啊,能监控到他们的状况吗?
运维总监 E:
网站有时候拜访比较慢,没有中央能够看到实时的机器状况,搞个什么监控大屏,这个有方法解决吗?
部门领导 F:
能够实现下面的人的需要吗。
做以上的理解之后,才发现,大家对于数据需要的渴望水平,应用方不仅须要历史的数据,而且还须要实时性的数据。
在电商、金融、制作等行业,数据有着迅猛的增长,诸多的企业面临着新的挑战,数据分析的实时处理框架,比如说做一些实时数据分析报表、实时数据处理计算等。
和大多数企业相似,在此之前,咱们是没有实时计算这方面的教训和积攒的。这时,就开始困惑了,怎么能够更好地做下面的需要,在老本和成果之间获得均衡,如何设计相干的架构?
穷则思变,在有了困惑当前,咱们就开始筹备梳理已有的条件和咱们到底须要什么。
咱们的业务范围次要在区块链浏览器与数据服务、区块链矿池、多币种钱包等。在区块链浏览器的业务里,BTC.com 目前已是寰球当先的区块链数据服务平台,矿池业务在业内排行第一,区块链浏览器也是寰球前三大浏览器之一。
首先,咱们通过 parser 解析区块链上的数据,失去各方面的数据信息,能够剖析出每个币种的地址活跃度、地址交易状况、交易流向、参加水平等内容。目前,BTC.com 区块链浏览器与行业内各大矿池和交易所等公司都有相干单干,能够更好地实现一些数据的统计、整顿、演绎、输入等。
面向的用户,不仅有业余的区块链开发人员,也有各样的 B 端和 C 端用户,C 端用户能够进行区块链地址的标注,智能合约的运行,查看智能合约相干内容等,以及链上数据的检索和查看。B 端用户则有更业余的反对和领导,提供 API、区块链节点等一些的定制以及交易减速、链上的业务单干、数据定制等。
从数据量级来讲,截至目前,比特币大略有 5 亿笔交易,3000 多万地址,22 亿输入(output:每笔交易的输入),并且还在一直增长中。以太坊的话,则更多。而 BTC.com 的矿池和区块链浏览器都反对多币种,各币种的总数据量级约为几十 T。
矿池是矿工购买矿机设备后连贯到的服务平台,矿工能够通过连贯矿池从而获取更稳固的收益。这是一个须要保障 7 * 24 小时稳固的服务,外面有矿机一直地提交其计算好的矿池下发的工作的解,矿池将达到网络难度的解进行播送。这个过程也能够认为是近乎是实时的,矿机通过提交到服务器,服务器外部再提交到 Kafka 音讯队列,同时有一些组件监听这些音讯进行生产。而这些提交上来的解能够从中剖析出矿机的工作状态、算力、连贯状况等。
在业务上,咱们须要进行历史数据和实时数据的计算。
历史数据要关联一些币价,历史交易信息,而这些交易信息须要始终保留,是一种典型的批处理工作。
每当有新区块的确认,就有一些数据能够失去解决和剖析,比方某个地址在这个区块里产生了一笔交易,那么能够从其交易流向去剖析是什么样的交易,开掘交易相关性。或者是在这个区块里有一些非凡的交易,比方 segwit 的交易、比方闪电网络的交易,就是有一些这个币种特有的货色能够进行解析剖析和统计。并且在新区块确认时的难度预测也有所变动。
还有就是大额交易的监控,通过新区块的确认和未确认交易,锁定一些大额交易,联合地址的一些标注,锁定交易流向,更好地进行数据分析。
还有是一些区块链方面的 OLAP 方面的需要。
总结了在数据统计方面的需要和问题当前,咱们就开始进行思考:什么是最合适的架构,如何让人员参加少、成本低?
解决问题,无非就是提出假如,通过度量,而后刷新认知。
在浏览了一些材料当前,咱们认为,大部分的计算框架都是通过输出,进行解决,而后失去输入。首先,咱们要获取到数据,这里数据能够从 MySQL 也能够从 Kafka,而后进行计算,这里计算能够是聚合,也能够是 TOP 5 类型的,在实时的话,可能还会有窗口类型的。在计算完之后,将后果做下发,下发到音讯渠道和存储,发送到微信或者钉钉,落地到 MySQL 等。
团队一开始尝试了 Spark,搭建了 Yarn,应用了 Airflow 作为调度框架,通过做 MySQL 的集成导入,开发了一些批处理工作,有着离线工作的特点,数据固定、量大、计算周期长,须要做一些简单操作。
在一些批处理工作上,这种架构是稳固的,然而随着业务的倒退,有了越来越多的实时的需要,并且实时的数据并不能保障按程序达到,按工夫戳排序,音讯的工夫字段是容许前后有差距的。在数据模型上,需要驱动式的开发,老本相对来说,Spark 的形式对于过后来说较高,对于状态的解决不是很好,导致影响一部分的效率。
其实在 2019 年的时候,就有在调研一些实时计算的事件,关注到了 Flink 框架,过后还是以 Java 为主,整体框架概念上和 Spark 不同,认为批处理是一种非凡的流,然而因为团队没有 Java 方面的基因和积淀,应用 Flink 作为实时计算的架构,在过后就暂告一个段落。
在 2020 年初的时候,不论是 Flink 社区 还是 InfoQ,还是 B 站,都有在推广 PyFlink,而且过后尤其是程鹤群 [1] 和孙金城 [2] 的视频以及孙金城老师的博客 [3] 的印象粗浅。于是就想尝试 PyFlink,其有着流批一体的劣势,而且还反对 Python 的一些函数,反对 pandas,甚至当前还能够反对 Tensorflow、Keras,这对咱们的吸引力是微小的。在之后,就在构思咱们的在 PyFlink 上的流批一体的架构。
02 流批一体的架构
架构
首先咱们要梳理数据,要分明数据从哪里来。在以 Spark 为主的期间,数据是定期从数据源加载(增量)数据,通过肯定的转换逻辑,而后写入目的地,因为数据量和业务须要,提早通常在小时级别,而实时的话,须要尽可能短的提早,因而将数据源进行了分类,整体分成了几局部,一部分是传统的数据咱们寄存在 MySQL 长久化做保留,这部分之后能够间接作为批处理的计算,也能够导入 Hive,做进一步的计算。实时的局部,实际上是有很多思路,一种形式是通过 MySQL 的 Binlog 做解析,还有就是 MySQL 的 CDC 性能,在多方考量下,最初咱们抉择了 Kafka,不仅是因为其是优良的分布式流式平台,而且团队也有对其的技术积淀。
并且实际上在本地开发的时候,装置 Kafka 也比拟不便,只须要 Brew Install Kafka,而且通过 Conduktor 客户端,也能够不便的看到每个 Topic 的状况。于是就对现有的 Parser 进行革新,使其反对 Kafka,在当收到新的区块时,会立刻向 Kafka 发送一个音讯,而后进行解决。
大略是在 2018 年的时候,团队将整体的业务迁徙到了 Kubernetes 上,在业务一直倒退的过程中,其对开发和运维上来说,加重了很多累赘,所以倡议有肯定规模的业务,最好是迁徙到 Kubernetes,其对老本的优化,DevOps,以及高可用的反对,都是其余平台和传统形式无法比拟的。
在开发作业的过程中,咱们在尽可能的应用 Flink SQL,同时联合一些 Java、Python 的 UDF、UDAF、UDTF。每个作业通过初始化相似于以下的语句,造成肯定的模式:
self.source_ddl = '''CREATE TABLE SourceTable (xxx int) WITH'''
self.sink_ddl = '''CREATE TABLE SinkTable (xxx int) WITH'''
self.transform_ddl = '''
INSERT INTO SinkTable
SELECT udf(xxx)
FROM SourceTable
GROUP BY FROM_UNIXTIME(`timestamp`, 'yyyyMMdd')
'''
在将来的话,会针对性地将数据进行分层,依照业界通用的 ODS、DWD、DWS、ADS,分出原始层,明细层和汇总层,进一步做好数据的治理。
成果
最终咱们团队基于 PyFlink 开发疾速地实现了已有的工作,局部是批处理作业,解决过来几天的数据,局部是实时作业,依据 Kafka 的音讯进行生产,目前还算比较稳定。
部署时抉择了 Kubernetes,具体上面会进行分享。在 K8S 部署了 Jobmanager 和 Taskmanager,并且应用 Kubernetes 的 job 性能作为批处理作业的部署,之后思考接入一些监控平台,比方 Prometheus 之类的。
在老本方面,因为是应用的 Kubernetes 集群,因而在机器上只有扩大主机的老本,在这种形式上,老本要比传统的 Yarn 部署形式要低,并且之后 Kuberntes 会反对原生部署,在扩大 Jobmanager 和 Taskmanager 下面会更加不便。
03 Zeppelin、PyFlink on K8S 等实际
Zeppelin 是咱们用来进行数据摸索和逻辑验证,有些数据在本地不是实在数据,利用 Zeppelin 连贯理论的链上数据,进行计算的逻辑验证,当验证实现后,便可转换成生产须要的代码进行部署。
一、Kubernetes 上搭建 PyFlink 和 Zeppelin
1. 整顿后的部署 Demo 在 github,能够参阅相干链接[4]。
2. 对于配置文件,批改以下配置的作用。
(1). 批改 configmap 的 flink-conf.yaml 文件的 taskmanager 配置。
taskmanager.numberOfTaskSlots: 10
调整 Taskmanager 能够调整运行的 job 的数量。
(2). 在 Zeppelin 的 dockerfile 中批改 zeppelin-site.xml 文件。
cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml;
sed -i 's#<value>127.0.0.1</value>#<value>0.0.0.0</value>#g' conf/zeppelin-site.xml;
sed -i 's#<value>auto</value>#<value>local</value>#g' conf/zeppelin-site.xml
- 批改申请起源为 0.0.0.0,如果是线上环境,倡议开启白名单,加上 auth 认证。
- 批改 interpreter 的启动模式为 local,auto 会导致在集群启动时,以 K8s 的模式启动,目前 K8s 模式只反对 Spark,local 模式能够了解为,Zeppelin 将在本地启动一个连贯 Flink 的 interpreter 过程。
- Zeppelin 和在本地提交 Flink 作业相似,也须要 PyFlink 的根底环境,所以须要将 Flink 对应版本的 jar 包放入镜像内。
3. Zeppelin 的 ingress 中增加 websocket 配置。
nginx.ingress.kubernetes.io/configuration-snippet: |
proxy_set_header Upgrade "websocket";
proxy_set_header Connection "Upgrade";
Zeppelin 在浏览器须要和 server 端建设 socket 连贯,须要在 ingress 增加 websocket 配置。
4.Flink 和 Zeppelin 数据长久化的作用。
volumeMounts:
- mountPath: /zeppelin/notebook/
name: data
volumes:
- name: data
persistentVolumeClaim:
claimName: zeppelin-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: zeppelin-pvc
spec:
storageClassName: efs-sc
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
- 对 Flink 的 /opt/flink/lib 目录做长久化的目标,是当咱们须要新的 jar 包时,能够间接进入 Flink 的 pod 进行下载,并存放到 lib 目录,保障 jobmanager 和 taskmanager 的 jar 版本统一,同时也无需更换镜像。
- Zeppelin 的工作作业代码会寄存在 /zeppelin/notebook/ 目录下,目标是不便保留编写好的代码。
5. Flink 命令提交 job 作业的形式。
(1). 本地装置 PyFlink,Python 须要 3.5 及以上版本。
$ pip3 install apache-flink==1.11.1
(2). 测试 Demo
def word_count():
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
sink_ddl = """create table Results (word VARCHAR, `count` BIGINT) with ('connector'='print')"""
t_env.sql_update(sink_ddl)
elements = [(word, 1) for word in content.split(" ")]
_# 这里也能够通过 Flink SQL_
t_env.from_elements(elements, ["word", "count"])
.group_by("word")
.select("word, count(1) as count")
.insert_into("Results")
t_env.execute("word_count")
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
word_count()
或者是实时处理的 Demo:
def handle_kafka_message():
s_env = StreamExecutionEnvironment.get_execution_environment()
# s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
st_env = StreamTableEnvironment
.create(s_env, environment_settings=EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner().build())
source_ddl = '''
CREATE TABLE SourceTable (word string) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'Topic',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.zookeeper.connect' = 'localhost:2121',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
'''sink_ddl ="""
create table Results (word VARCHAR) with ('connector' = 'print')
"""
st_env.sql_update(sink_ddl)
st_env.sql_update(source_ddl)
st_env.from_path("source").insert_into("sink")
st_env.execute("KafkaTest")
if __name__ == '__main__':
handle_kafka_message()
(3). 本地测试 Flink 命令提交 job 作业。
$ flink run -m localhost:8081 -py word_count.py
python/table/batch/word_count.py
Job has been submitted with JobID 0a31b61c2f974bcc3f344f57829fc5d5
Program execution finished
Job with JobID 0a31b61c2f974bcc3f344f57829fc5d5 has finished.
Job Runtime: 741 ms
(4). 如果存在多个 Python 文件,能够先 zip 打包后再进行提交作业。
$ zip -r flinkdemo.zip ./*
$ flink run -m localhost:8081 -pyfs flinkdemo.zip -pym main
(5). Kubernetes 通过集群的 CronJob 定时调度来提交 Job,之后会做自研一些 UI 后盾界面做作业管理与监控。
04 在区块链畛域实际
随着区块链技术的越来越成熟,利用越来越多,行业标准化、规范化的趋势也开始浮现,也越来越依赖于云计算、大数据,毕竟是数字经济的产物。BTC.com 也在扎根于区块链技术基础设施,为各类公司各类利用提供数据和业务上的反对。
近些年,有个词火遍了 IT 业界,中台,不论是大公司还是守业公司,都喜爱扯上这个概念,号称本人业务中台,数据中台等。咱们的了解中,中台是一种整合各方面资源的能力,从传统的单兵作战,到晋升武器装备后勤保障,晋升作战能力。在数据上突破数据孤岛,在需要疾速变动的前台和日趋稳固的后盾中获得均衡。而中台更重要的是服务,最终还是要回馈到客户,回馈到合作伙伴。
在区块链畛域,BTC.com 有着深厚的行业技术积攒,能够提供各方面数据化的能力。比方在利用机器学习进行链上数据的预估,预估 eth 的 gas price,还有最佳手续费等,利用 keras 深度学习的能力,进行一些回归计算,在之后也会将 Flink、机器学习和区块链联合起来,对外提供更多预测类和规范化分类的数据样本,之前是在用定时工作一直训练模型,与 Flink 联合之后,会更加实时。在这方面,当前也会提供更多的课题,比方币价与 Defi,舆情,市场等的关系,区块链地址与交易的标注和分类。甚至于将机器学习训练的模型,放于 IPFS 网络中,通过去中心化的代币进行训练,提供方便调用样本和模型的能力。
在目前,BTC.com 推出了一些通过数据挖掘实现的能力,包含交易推送、OLAP 链上剖析报表等,改善和晋升相干行业和开发者理论的体验。咱们在各种链上都有监控节点,监控各区块链网络的可用性、去中心化水平,监控智能合约。在接入一些联盟链、隐衷加密货币,能够为联盟链、隐衷加密货币提供这方面的数据能力。
BTC.com 将为区块链产业生态倒退做出更多致力,以科技公司的实质,以技术倒退为第一驱动力,以市场和客户为导向,开发翻新和交融利用,做好基础设施。
05 瞻望与总结
从实时计算的趋势,到流批一体的架构,通过对 PyFlink 和 Flink 的学习,稳固在线上运行了多种作业工作,对接了理论业务需要。并且搭建了 Zeppelin 平台,使得业务开发上更加不便。在计算上尽可能地依赖 SQL,不便各方面的集成与调试。
在社区方面,PyFlink 也是没有令咱们悲观的,较快的响应能力,不断完善的文档。在 Confluence[5]上也能够看到一些 Flink Improvement Proposals,其中也有一些是 PyFlink 相干的,在不远的未来,还会反对 Pandas UDAF,DataStream API,ML API,也冀望在之后能够反对 Joblistener,总之,在这里也非常感谢相干团队。
将来的瞻望,总结起来就是,通过业务实现数据的价值化。而数据中台的终局,是将数据变现。
原文链接
本文为阿里云原创内容,未经容许不得转载。