本文首发于 vivo 互联网技术微信公众号(https://mp.weixin.qq.com/s/EB…) 作者:周建军本文根据周建军在 2019 年 3 月 30 日 vivo 互联网技术沙龙《亿级用户的智能体验交付之路》的演讲内容整理。周建军,vivo 大数据专家。负责 vivo 一站式数据接入平台的架构设计和技术演进。SDCC 2017 深圳峰会演讲嘉宾。加入 vivo 前曾服务于腾讯数据平台部,先后参与及负责万亿级实时数据接入系统 TDBank、实时计算平台 EASYCOUNT 等系统的设计与开发。本次分享主要分为四部分:vivo 大数据平台架构概览数据采集的需求与挑战平台架构演进过程未来规划与展望一、vivo 大数据平台架构概览vivo 大数据平台是做什么的?支撑了哪些业务?以及如何支撑这些业务?我们先来看一下 vivo 大数据平台架构的整体概览。从图中可以看出,vivo 大数据平台的定位是为公司的各个业务线提供最基础的数据服务。目前支撑的业务包括互联网业务、手机业务相关的十几条大的业务线。支撑的业务主要依赖以下四个平台:一是数据生产平台,主要做数据的采集和数据的清洗,经过采集和清洗入库到存储系统,提供给数据开发人员使用。二是存储计算平台,为公司提供统一的存储计算服务。三是数据开发平台,主要给数据开发、分析人员提供数据的查询和分析的入口。四是运营维护平台,是为整个大数据平台提供物理资源管理、基础监控等基础能力的运维服务。我主要负责数据生产平台,接下来详细介绍下 vivo 的数据生产平台。如何定义数据生产呢?就我们自己而言,vivo 数据生产平台是用户与产品之间通过数据进行交互的一个桥梁。我们的业务每天产生大量的数据,数据来源主要有公网手机、App 和内部业务。数据很有价值,后续的数据应用可以基于这些数据实时发现可能存在的风险;通过数据还可以形成运营报表以分析产品运营的情况;同时也可以对手机用户做行为分析,提供画像给产品,让产品更好地了解用户和满足用户需求。既然数据这么重要,如何保证数据能够快速稳定地触达到数据分析系统呢?这里面就有一个很重要的数据生产平台。数据生产平台提供了多种数据接入方式,比如提供消息、埋点、日志等多种方式收集,然后清洗入库以供数据开发使用。那么,现在我们的数据生产平台的数据规模如何呢?目前每天采集的数据量大概是 90 TB ,每天接收的数据条是 2500 亿,Agent 个数是 1.1 万 ,服务的业务线有 250+ 。二、数据采集的需求与挑战既然每天的数据这么大,那在构建数据生产平台的过程中都遇到了什么样的问题呢?接下来分享一下我们在数据采集过程中遇到的需求和挑战。很多人会觉得数据采集很容易,前面有采集的 Agent ,后面有消息中间件。在消息中间件之后有两个路径:一是通过实时数据消费;二是通过数据分拣离线分析。从这个数据采集的链路来看,的确比较简单。当我们数据较小,业务不复杂时采用这种数据采集链路是没有问题的。当时支持的规模达到 80 万/S、5000 个 Agent 。随着业务的发展,我们发现数据采集的链路逐渐不能满足需求。首先数据生产的环境发生了变化,从单个自建的机房变成多个。我们除了在自建机房上部署,还要在混合云环境部署。在环境变化的同时,业务要求数据采集除了具备高吞吐还要高可用。这时数据采集就会出现两个致命的问题:一是数据延时;二是数据丢失。比如采集链路中的网络抖动或者节点故障会导致数据采集延迟,采集延迟又会导致数据采集不及时从而引起丢失的问题。基于这些问题,我们开启了数据采集平台的架构演进之路。三、平台架构演进过程1、采集链路 0.2 版本为了解决上述提到的问题,我们的第一个版本在改造时对应于采集链路 0.2 版本。这个版本主要做了两件事:一是 Agent 通道分离,二是 Kafka 资源分离。之前我们的采集 Agent 是单通道的,后面的 Kafka 节点故障时会阻塞整个通道。同时因为是单通道,我们没有办法区分高低优业务。所以,我们将单通道变成多通道。在做多通道的同时,当某个 topic 采集出现异常,我们就直接丢弃,单通道的顺序发送问题也得到解决。这里面设置了一个 Kafka 资源分组,用以解决单通道的问题。简单来讲 Kafka 的资源分组按照业务 topic 对 broker 进行划分。比如将高优的业务划分到一组,低优业务划分到另外的组。保持高优的处于低负载状态,高优的发送速度比较快,这样我们的高优通道的发送速率就相对比较高。这样既解决了单通道顺序发送问题,数据延迟问题也得到缓解;同时还解决了业务优先级保障的问题。这种状态下,数据采集过程又逐渐暴露出了两个问题:一是数据恢复慢。因为当采集链路出现问题,我们要恢复某一部分数据该怎么做?要么是消费端 Kafka 重置;要么从 Agent 端重新采集。由于整个链路比较长,所以恢复慢。二是 Kafka 资源紧张。所有的数据都需要经过 Kafka。当 Kafka copy 比较多数据的情况下,就会对磁盘存储造成压力,它的磁盘 IO 会成为一个瓶颈。于是,我们对采集链路进行分析,发现采集链路上绝大部分在做离线分析,实时数据分析占其中很少一部分。而离线分析的数据没有必要经过整个采集路径。所以,我们对采集路径做了进一步优化,形成了采集链路 0.3 版本。2、采集链路 0.3 版本采集链路 0.3 版本解决了 实时离线分离和数据快速恢复的问题。过简单的架构图,大家可以发现我们的采集链路当中增加了一个 logpusher 组件。当我们做离线分析时可以从 Agent 端直接上传到 HDFS。logpusher 还可以实现数据快速恢复。我们要恢复 10 分钟的数据,logpusher 可以去定制,这样就形成了快速的恢复能力。大部分的离线数据不需要通过实时链路来采集,从而减少了 Kafka 的成本压力,这样实时采集链路数据量变小了,我们整个采集链路会更加快速。随着公司业务的继续发展,前面的 Agent 数量越来越多。这时,我们遇到了另外的问题,一是 Kafka 的连接数问题,在 Agent 增多的情况下,Kafka 的连接数呈现一个线性增长;二是出现数据丢失的问题,我们的业务日志是滚动删除的,如果采集数据跟不上业务的数据,这部分会被丢弃掉,对后端的数据分析而言是采集数据丢失。为了解决这些问题,采集链路演进到了 1.0 版本。3、采集链路 1.0 版本在 1.0 版本中,我们对采集链路做了大改动 ——在 Agent 与 Kafka 之间增加一个缓冲层,这里我们称之为 Bus。Bus 主要做三件事情:一是连接收敛;二是数据缓冲;三是数据路由转发。(1)连接收敛连接收敛,简单讲就是多个连接变成一个连接。为什么要这样呢?因为 Kafka 的每一个连接都会消耗 Kafka 的资源。当连接较多的情况下,Kafka 的性能会下降,数据采集速度也会下降。随着连接增多,故障连接的个数会更大。如果连接故障,就会触发 Kafka 的 rebalance,rebalance 会进一步影响采集性能。所以我们需要做连接收敛。(2)数据缓冲数据缓冲主要是解决数据丢失的问题。如何实现呢?比如说 Kafka 出现了故障,之前的版本很明显会导致 Agent 的发送速度下降。因为 Agent 发送速度赶不上数据的生产速度,那这部分的数据就滚动删除,这样数据就丢失了。如果我们在 Bus 层做一个数据的缓冲,假如说链路出现故障,那 Bus 可以用一些本地磁盘资源,将数据进行旁路存储,这样 Agent 可以正常发送。当 Kafka 稳定之后,Bus 再异步发送到 Kafka,这样也不会影响正常的实时采集链路,这就解决了数据丢失的问题。(3)数据路由转发在引入了 Bus 之后,我们同时也做了数据转发。有的数据不一定到 Kafka ,比如有的数据需要直接到 ES,用于做检索。我们通过对 Bus 配置修改来决定数据发送的地方。数据从哪里来到哪里去做成可配置的,让整个采集链路变得更加灵活。(4)部署问题引入的 Bus 应该部署在哪个地方呢?这里有两种部署方案:一种是将 Bus 和 Kafka 部署在一起,将 Agnet 跨机房部署;另一种是将 Bus 与 Agent 部署在一起,与 Kafka 跨机房部署。无论如何选择,都存在跨机房的问题。思考之后,我们采取的是第二种部署方案。因为跨机房数据传输无疑会导致 RT 增大,数据传输的吞吐量下降。为了弥补 RT 的问题,我们通常的做法是增大发送临界区 patchSize 或者数据发送 Task 的数量。我们要么在 Agent 端增加,要么在 Bus 端增加。在 Agent 端增加,对业务是有感知的,Agent 是与业务服务部署在一起的,所有我们只能在 Bus 端修改 patchSzie 与发送的任务数。因此,我们就需要选择第二种部署方案。4、小结:从 0.1 到 1.0 版本简单回顾下,采集链路从 0.1 到1.0版本,我们做了三件事情:第一是通道分离,解决了数据顺序发送、高低优通道发送问题。第二是通过 logpusher 将实时与离线采集链路分离,解决了 Kafka 存储资源浪费的问题和数据快速恢复的问题。第三是增加 Bus 层,解决了连接数收敛、数据缓冲、数据转发的问题。通过以上几个版本的演进,我们的吞吐量从最初的 80 w/s提升到 360 w/s,采集链路也算维持在一个比较稳定的状态。5、 采集链路 V2.0 架构我们曾经还遇到过核心交换机故障的问题和机房级掉电故障问题。出现这些问题会导致整个采集链路瘫痪,同时也暴露了采集链路在机房级容灾能力上的不足。基于这两个问题我们开启了采集链路 2.0。从采集链路 2.0 架构图中可以看出,我们在采集链路的各个组件上都增加了 failover 处理机制。Agent 默认将数据发往本机房的 Bus,当本机房 Bus 异常时 Agent 会将数据发送到备用的 Bus集群,后面的 Bus 也是如此。如果 Kafka 故障,Bus 具备将数据发往备用 Kafka 集群的能力,当然这个依赖于具体的配置。采集链路 2.0 除了链路层的修改之外,还增加了一个平台管控的 manager。这个 manger 主要用于数据接入管理、运营操作管控、指标监控预警及权限管控。通过 manger 将数据接入、运营操作平台化,全链路指标对账可视化。6、采集链路 V3.0经过采集链路 2.0 之后,我们的采集链路不管在接入效率还是在链路容灾能力上都显著提升。在采集链路稳定之后,接下来我们要做的就是如何将采集链路的元数据管控起来。这个就是采集链路 3.0 版本的主要工作。采集链路 3.0 主要是做数据运营。所谓数据运营就是让数据管理员知道数据是谁在生产,谁采集、谁负责、谁授权、谁消费。数据运营就是告知数据管理者数据从哪里来,到哪里去。比如这张图可以看出数据是谁负责,以及数据的上下游;接下来这张图是一个采集任务视图,显示了数据是谁在生产。到此为止,我们平台经历了三个大的版本迭代,数据生产平台具备了高吞吐的数据采集能力、机房级链路容灾能力和平台化的数据管理能力。那么接下来我们还有什么规划呢?四、未来规划与展望1、ETL 平台任务配置化和自助测试功能前面的内容主要介绍数据采集,对于数据生产平台,还有一个重要的数据清洗 ETL 平台。ETL 任务负责将 HDFS 数据按照业务需求处理并入库到 HIVE,以供后面的数据分析与数据统计。当前的 ETL 任务存在两个问题,一个是重复性编码工作,一个是 ETL 逻辑测试验证困难。基于以上两个问题 ,ETL 调度平台打算提供两个能力,一个是 ETL 任务配置自动化的能力,第二个是 ETL 任务自助测试的能力。ETL 任务配置生成,是依赖于代码动态注入,这样可以把重复的逻辑抽取,提高 ETL 任务的开发效率。ETL 任务测试能力,是让用户在上线 ETL 任务之前,可以引入少量的数据来验证 ETL 任务的逻辑,进而提高线上ETL 任务的质量。2、大数据平台内部子系统实现数据共享除了 ETL 的规划外,我们还计划将整个大数据平台打通。对于大数据平台而言主要有两种角色的用户,一种是数据分析人员,一种是数据开发人员。当数据分析人员有一个新的需求时,数据分析人员需要先设计埋点,让前端开发者按照埋点上报数据;然后数据分析人员告知数据开发人员配置数据采集任务和数据统计任务,数据开发人员配置完整之后再告知数据分析人员可以开始使用数据。在整个过程中首先是沟通成本比较高,如果数据开发者理解有偏差或者数据分析师表达不到位,都会造成采集的数据和计算的结果不满足要求;另外整个数据分析的链路太长,人为干预太多。为了解决以上问题,我们想将这几个平台打通,让平台直接做数据共享,减少人为沟通。假设有一个新的埋点需求,埋点系统自动推送到采集系统,采集系统自动创建采集任务,同时数据分析人员建立数据分析的任务后,将分析任务推送到数据开发平台生成数据开发任务。当埋点数据变更时,自动通知下游变更。整个大数据平台的变更做到自动传导与传递。通过大数据平台内部子系统直接的数据共享可以提高新的数据分析接入效率和分析任务变更的效率。以上,就是本次主题分享实录。Q & A1、我发现 Bus 基本上是通过 Flume 集群来实现的,那是不是可以直接通过 Flume 开发?答:我们的 Bus 是基于 Flume 的二次开发。我们知道 Flume 里面有几种常用的Channel:File Channel,Kafka Channel,Spillable Memory Channel;但是这几种都不能满足我们数据落盘然后异步发送的需求,落盘数据发送不影响正常采集链路的需求;另外我们需要自定义配置化的数据分发,所以我们需要对原生 flume 进行改造。2、数据分析从模型到落地,如何提供比较高效的方式?答:对于数据分析需要经历四个过程:数据的埋点、数据采集、数据开发和数据分析。这个过程中,阻碍我们整个数据分析的效率比较低的是沟通。如果数据分析人员不懂工程,而数据开发工程师不懂业务,就会导致两边沟通起来有障碍。我们在平台元数据做共享,埋点平台有新的任务时自动通知采集平台生成采集任务,这会减少人力沟通成本,提高接入效率。3、关于 Bus 节点,我想了解下你们如何设计 Bus 节点的高可用?答:这里的 Bus 是允许宕机的,如果你的 Bus 挂掉了几台,Agent 自动重连到其他的 Bus 节点。Agent 可以设置在一个时间段内做重连。比如说我们发现后面的 Bus 连接不均衡了,开启 Agent 重连,当监控连接均衡之后关闭掉重连。当然 Agent 也不能一直做重连,因为重连会对性能有影响。更多内容敬请关注 vivo 互联网技术微信公众号注:转载文章请先与微信号:labs2020 联系。