福利-Flink-Forward-Asia-2019-由你决定填问卷送周边

2018 年 12 月,Apache Flink Community China 成功举办了国内首届 Flink Forward China,并在诸多合作伙伴的帮助下成功将其打造为规模最大、参与人数最多的 Flink Forward 大会。 今年,Apache Flink 年度最高规格的盛会即将再次拉开帷幕!Flink Forward China 已正式升级为 Flink Forward Asia,并计划于 11 月底举办,规模将逾2000 人。 参与调研送周边为进一步提高会议质量,让 Apache Flink 社区与开发者更近距离地接触,我们诚挚地邀请您参与会前调研,想听什么议题,想见哪位大佬,想要什么礼品,由你决定!福利活动奉上: 点击文末链接,推荐您的小伙伴填写问卷,并在问卷底部的「问卷推荐人」一栏中正确填入您的姓名,可获取相应周边奖励:1. 推荐1-4人填写:Apache Flink 社区限量版专刊 S2 实体书籍2. 推荐5-10人填写:Apache Flink 社区定制马克杯3. 推荐10人以上填写:Apache Flink 社区定制T恤本次问卷调研福利我们首次送出最新定制的 Apache Flink 社区 T 恤、人气最高的社区定制马克杯以及限量纸质版 Apache Flink 第二季专刊《重新定义计算:Apache Flink 实践》。 社区周边展示Apache Flink 社区周边大饱眼福时间到,看看你最想要哪一款! Apache Flink 定制版T恤,小姐姐同款,Meetup 来撞衫! 用 Apache Flink 旗舰款马克杯,喝水都会更开心! ...

July 10, 2019 · 1 min · jiezi

回顾-Apache-Flink-X-Apache-RocketMQ-上海站PPT下载

7 月 6 日,Apache Flink Meetup X Apache RocketMQ · 上海站,来自阿里巴巴、网易的 Flink 技术专家与 Apache RocketMQ 社区大咖一起分享关于 Flink、RocketMQ 的应用实践与前沿技术。 ▼ PPT 下载 ▼ Apache Flink Meetup X Apache RocketMQ · 上海站,嘉宾分享的PPT下载请在后台回复关键字“0706PPT”领取。 《网易云音乐消息队列改造之路与 Apache Flink 应用实践》 林德智 | 网易云音乐 消息队列负责人 岳猛 | Apache Flink Contributor,网易云音乐 实时计算平台研发工程师 本次分享主要介绍了网易云音乐消息队列基于 RocketMQ 的应用以及在消息队列的基础上深度融合的 Flink 流式处理引擎为云音乐提供的实时计算解决方案,分享了在直播,广告,曲库,内容等取得的应用效果。 云音乐消息队列的历史基于 RocketMQ 改造消息队列部分高级特性与 bug 修复RocketMQ 与 Flink 结合应用实践《万亿级消息及流处理引擎 - Apache RocketMQ 的现在和未来》 ...

July 9, 2019 · 1 min · jiezi

应用案例-从Storm到Flink有赞五年实时计算效率提升实践

作者 | 贺飞 公司介绍:有赞是一个商家服务公司,提供全行业全场景的电商解决方案。在有赞,大量的业务场景依赖对实时数据的处理,作为一类基础技术组件,服务着有赞内部几十个业务产品,几百个实时计算任务,其中包括交易数据大屏,商品实时统计分析,日志平台,调用链,风控等多个业务场景,本文将介绍有赞实时计算当前的发展历程和当前的实时计算技术架构。 1.实时计算在有赞发展从技术栈的角度,我们的选择和大多数互联网公司一致,从早期的 Storm,到 JStorm, Spark Streaming 和最近兴起的 Flink。从发展阶段来说,主要经历了两个阶段,起步阶段和平台化阶段;下面将按照下图中的时间线,介绍实时计算在有赞的发展历程。 1.1 起步阶段这里的的起步阶段的基本特征是,缺少整体的实时计算规划,缺乏平台化任务管理,监控,报警工具,用户提交任务直接通过登录 AG 服务器使用命令行命令提交任务到线上集群,很难满足用户对可用性的要求。但是,在起步阶段里积累了内部大量的实时计算场景。 1.1.1 Storm 登场2014 年初,第一个 Storm 应用在有赞内部开始使用,最初的场景是把实时事件的统计从业务逻辑中解耦出来,Storm 应用通过监听 MySQL 的 binlog 更新事件做实时计算,然后将结果更新到 MySQL 或者 Redis 缓存上,供在线系统使用。类似的场景得到了业务开发的认可,逐渐开始支撑起大量的业务场景。早期,用户通过登录一组线上环境的 AG 服务器,通过 Storm 的客户端向 Storm 集群做提交任务等操作, 这样在 2 年多的时间里,Storm 组件积累了近百个实时应用。Storm 也同样暴露出很多问题,主要体现在系统吞吐上,对吞吐量巨大,但是对延迟不敏感的场景,显得力不从心。 1.1.2 引入 Spark Streaming2016 年末,随着 Spark 技术栈的日益成熟,又因为 Storm 引擎本身在吞吐 / 性能上跟 Spark Streaming 技术栈相比有明显劣势,所以从那时候开始,部分业务团队开始尝试新的流式计算引擎。因为有赞离线计算有大量 Spark 任务的使用经验,Spark Streaming 很自然的成为了第一选择,随着前期业务日志系统和埋点日志系统的实时应用的接入,大量业务方也开始逐渐接入。同 Storm 一样,业务方完成实时计算应任务开发后,通过一组 AG 服务器,使用 Spark 客户端,向大数据 Yarn 集群提交任务。 ...

July 4, 2019 · 2 min · jiezi

回顾-Apache-Flink-19-版本新特性强势预告内含PPT下载链接

6月29日,Apache Flink Meetup 北京站圆满落幕,Apache Flink 1.9 版本是自 Flink 1.0 之后变化最大的版本,社区对 Flink 进行大量重构并且加入了很多新 Feature。此次 Meetup 重点解读 Flink 1.9 版本新特性。 ▼ PPT下载 ▼关注Apache Flink 社区公众号Ververica,回复关键字“0629PPT”即可下载Apache Flink Meetup 北京站全部嘉宾分享的PPT. 本期 Meetup 由 Apache Flink PMC 与 Committer 开场,对 Flink 1.9 版本新特性进行全面分享;阿里巴巴技术专家从 Table API 和算法层面分享 Flink 的机器学习生态;还有 Flink on Kubernetes 、Flink 1.9 版本与 Hive 的兼容性解读,以及超过千台集群、日处理条目超过 264 亿条,处理峰值超过 3.6 千万条 / s 的 Flink 在快手的应用实践。 《Apache Flink 1.9 特性解读》 ...

July 3, 2019 · 2 min · jiezi

Apache-Flink-零基础入门一基础概念解析

作者:陈守元、戴资力 一、Apache Flink 的定义、架构及原理Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计算。 1. Flink Application了解 Flink 应用开发需要先理解 Flink 的 Streams、State、Time 等基础处理语义以及 Flink 兼顾灵活性和方便性的多层次 API。 Streams:流,分为有限数据流与无限数据流,unbounded stream 是有始无终的数据流,即无限数据流;而 bounded stream 是限定大小的有始有终的数据集合,即有限数据流,二者的区别在于无限数据流的数据会随时间的推演而持续增加,计算持续进行且不存在结束的状态,相对的有限数据流数据大小固定,计算最终会完成并处于结束的状态。State,状态是计算过程中的数据信息,在容错恢复和 Checkpoint 中有重要的作用,流计算在本质上是 Incremental Processing,因此需要不断查询保持状态;另外,为了确保 Exactly- once 语义,需要数据能够写入到状态中;而持久化存储,能够保证在整个分布式系统运行失败或者挂掉的情况下做到 Exactly- once,这是状态的另外一个价值。Time,分为 Event time、Ingestion time、Processing time,Flink 的无限数据流是一个持续的过程,时间是我们判断业务状态是否滞后,数据处理是否及时的重要依据。API,API 通常分为三层,由上而下可分为 SQL / Table API、DataStream API、ProcessFunction 三层,API 的表达能力及业务抽象能力都非常强大,但越接近 SQL 层,表达能力会逐步减弱,抽象能力会增强,反之,ProcessFunction 层 API 的表达能力非常强,可以进行多种灵活方便的操作,但抽象能力也相对越小。2.Flink Architecture在架构部分,主要分为以下四点: 第一, Flink 具备统一的框架处理有界和无界两种数据流的能力 第二, 部署灵活,Flink 底层支持多种资源调度器,包括 Yarn、Kubernetes 等。Flink 自身带的 Standalone 的调度器,在部署上也十分灵活。 第三, 极高的可伸缩性,可伸缩性对于分布式系统十分重要,阿里巴巴双11大屏采用 Flink 处理海量数据,使用过程中测得 Flink 峰值可达 17 亿/秒。 ...

July 2, 2019 · 4 min · jiezi

大数据架构如何做到流批一体

阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值。这也是当下许多企业,在大数据上深耕的原因。大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今天,我们都会一一解读,并介绍如何结合云上存储、计算组件,实现更优的通用大数据架构模式,以及该模式可以涵盖的典型数据处理场景。大数据处理的挑战现在已经有越来越多的行业和技术领域需求大数据分析系统,例如金融行业需要使用大数据系统结合 VaR(value at risk) 或者机器学习方案进行信贷风控,零售、餐饮行业需要大数据系统实现辅助销售决策,各种 IOT 场景需要大数据系统持续聚合和分析时序数据,各大科技公司需要建立大数据分析中台等等。 抽象来看,支撑这些场景需求的分析系统,面临大致相同的技术挑战: 业务分析的数据范围横跨实时数据和历史数据,既需要低延迟的实时数据分析,也需要对 PB 级的历史数据进行探索性的数据分析;可靠性和可扩展性问题,用户可能会存储海量的历史数据,同时数据规模有持续增长的趋势,需要引入分布式存储系统来满足可靠性和可扩展性需求,同时保证成本可控;技术栈深,需要组合流式组件、存储系统、计算组件和;可运维性要求高,复杂的大数据架构难以维护和管控;简述大数据架构发展Lambda 架构 Lambda 架构是目前影响最深刻的大数据处理架构,它的核心思想是将不可变的数据以追加的方式并行写到批和流处理系统内,随后将相同的计算逻辑分别在流和批系统中实现,并且在查询阶段合并流和批的计算视图并展示给用户。Lambda的提出者 Nathan Marz 还假定了批处理相对简单不易出现错误,而流处理相对不太可靠,因此流处理器可以使用近似算法,快速产生对视图的近似更新,而批处理系统会采用较慢的精确算法,产生相同视图的校正版本。 Lambda架构典型数据流程是(http://lambda-architecture.net/)): 所有的数据需要分别写入批处理层和流处理层;批处理层两个职责:(i)管理 master dataset (存储不可变、追加写的全量数据),(ii)预计算batch view;服务层对 batch view 建立索引,以支持低延迟、ad-hoc 方式查询 view;流计算层作为速度层,对实时数据计算近似的 real-time view,作为高延迟batch view 的补偿快速视图;所有的查询需要合并 batch view 和 real-time view;Lambda 架构设计推广了在不可变的事件流上生成视图,并且可以在必要时重新处理事件的原则,该原则保证了系统随需求演进时,始终可以创建相应的新视图出来,切实可行地满足了不断变化的历史数据和实时数据分析需求。 Lambda 架构的四个挑战 Lambda 架构非常复杂,在数据写入、存储、对接计算组件以及展示层都有复杂的子课题需要优化: 写入层上,Lambda 没有对数据写入进行抽象,而是将双写流批系统的一致性问题反推给了写入数据的上层应用;存储上,以 HDFS 为代表的master dataset 不支持数据更新,持续更新的数据源只能以定期拷贝全量 snapshot 到 HDFS 的方式保持数据更新,数据延迟和成本比较大;计算逻辑需要分别在流批框架中实现和运行,而在类似 Storm 的流计算框架和Hadoop MR 的批处理框架做 job 开发、调试、问题调查都是比较复杂的;结果视图需要支持低延迟的查询分析,通常还需要将数据派生到列存分析系统,并保证成本可控。流批融合的 Lambda 架构 针对 Lambda 架构的问题3,计算逻辑需要分别在流批框架中实现和运行的问题,不少计算引擎已经开始往流批统一的方向去发展,例如 Spark 和 Flink,从而简化lambda 架构中的计算部分。实现流批统一通常需要支持: ...

July 2, 2019 · 2 min · jiezi

用Flink取代Spark-Streaming知乎实时数仓架构演进

作者 | 知乎数据工程团队 “数据智能” (Data Intelligence) 有一个必须且基础的环节,就是数据仓库的建设,同时,数据仓库也是公司数据发展到一定规模后必然会提供的一种基础服务。从智能商业的角度来讲,数据的结果代表了用户的反馈,获取结果的及时性就显得尤为重要,快速的获取数据反馈能够帮助公司更快的做出决策,更好的进行产品迭代,实时数仓在这一过程中起到了不可替代的作用。 本文主要讲述知乎的实时数仓实践以及架构的演进,这包括以下几个方面: 实时数仓 1.0 版本,主题:ETL 逻辑实时化,技术方案:Spark Streaming。实时数仓 2.0 版本,主题:数据分层,指标计算实时化,技术方案:Flink Streaming。实时数仓未来展望:Streaming SQL 平台化,元信息管理系统化,结果验收自动化。实时数仓 1.0 版本1.0 版本的实时数仓主要是对流量数据做实时 ETL,并不计算实时指标,也未建立起实时数仓体系,实时场景比较单一,对实时数据流的处理主要是为了提升数据平台的服务能力。实时数据的处理向上依赖数据的收集,向下关系到数据的查询和可视化,下图是实时数仓 1.0 版本的整体数据架构图。 第一部分是数据采集,由三端 SDK 采集数据并通过 Log Collector Server 发送到 Kafka。第二部分是数据 ETL,主要完成对原始数据的清洗和加工并分实时和离线导入 Druid。第三部分是数据可视化,由 Druid 负责计算指标并通过 Web Server 配合前端完成数据可视化。 其中第一、三部分的相关内容请分别参考:知乎客户端埋点流程、模型和平台技术,Druid 与知乎数据分析平台,此处我们详细介绍第二部分。由于实时数据流的稳定性不如离线数据流,当实时流出现问题后需要离线数据重刷历史数据,因此实时处理部分我们采用了 lambda 架构。 Lambda 架构有高容错、低延时和可扩展的特点,为了实现这一设计,我们将 ETL 工作分为两部分:Streaming ETL 和 Batch ETL。 Streaming ETL这一部分我会介绍实时计算框架的选择、数据正确性的保证、以及 Streaming 中一些通用的 ETL 逻辑,最后还会介绍 Spark Streaming 在实时 ETL 中的稳定性实践。 计算框架选择在 2016 年年初,业界用的比较多的实时计算框架有 Storm 和 Spark Streaming。Storm 是纯流式框架,Spark Streaming 用 Micro Batch 模拟流式计算,前者比后者更实时,后者比前者吞吐量大且生态系统更完善,考虑到知乎的日志量以及初期对实时性的要求,我们选择了 Spark Streaming 作为实时数据的处理框架。 ...

June 27, 2019 · 3 min · jiezi

原理解析-深入了解-Apache-Flink-的网络协议栈

作者:Nico Kruber 翻译:曹英杰 Flink 的网络协议栈是组成 flink-runtime 模块的核心组件之一,是每个 Flink 作业的核心。它连接所有 TaskManager 的各个子任务(Subtask),因此,对于 Flink 作业的性能包括吞吐与延迟都至关重要。与 TaskManager 和 JobManager 之间通过基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之间的网络协议栈依赖于更加底层的 Netty API。 本文将首先介绍 Flink 暴露给流算子(Stream operator)的高层抽象,然后详细介绍 Flink 网络协议栈的物理实现和各种优化、优化的效果以及 Flink 在吞吐量和延迟之间的权衡。 1.逻辑视图Flink 的网络协议栈为彼此通信的子任务提供以下逻辑视图,例如在 A 通过 keyBy() 操作进行数据 Shuffle : 这一过程建立在以下三种基本概念的基础上: ▼ 子任务输出类型(ResultPartitionType):Pipelined(有限的或无限的):一旦产生数据就可以持续向下游发送有限数据流或无限数据流。Blocking:仅在生成完整结果后向下游发送数据。 ▼ 调度策略:同时调度所有任务(Eager):同时部署作业的所有子任务(用于流作业)。上游产生第一条记录部署下游(Lazy):一旦任何生产者生成任何输出,就立即部署下游任务。上游产生完整数据部署下游:当任何或所有生产者生成完整数据后,部署下游任务。 ▼ 数据传输:高吞吐:Flink 不是一个一个地发送每条记录,而是将若干记录缓冲到其网络缓冲区中并一次性发送它们。这降低了每条记录的发送成本因此提高了吞吐量。低延迟:当网络缓冲区超过一定的时间未被填满时会触发超时发送,通过减小超时时间,可以通过牺牲一定的吞吐来获取更低的延迟。 我们将在下面深入 Flink 网络协议栈的物理实现时看到关于吞吐延迟的优化。对于这一部分,让我们详细说明输出类型与调度策略。首先,需要知道的是子任务的输出类型和调度策略是紧密关联的,只有两者的一些特定组合才是有效的。 Pipelined 结果是流式输出,需要目标 Subtask 正在运行以便接收数据。因此需要在上游 Task 产生数据之前或者产生第一条数据的时候调度下游目标 Task 运行。批处理作业生成有界结果数据,而流式处理作业产生无限结果数据。 批处理作业也可能以阻塞方式产生结果,具体取决于所使用的算子和连接模式。在这种情况下,必须等待上游 Task 先生成完整的结果,然后才能调度下游的接收 Task 运行。这能够提高批处理作业的效率并且占用更少的资源。 下表总结了 Task 输出类型以及调度策略的有效组合: ...

June 26, 2019 · 3 min · jiezi

原理解析-深入了解-Apache-Flink-的网络协议栈

作者:Nico Kruber 翻译:曹英杰 Flink 的网络协议栈是组成 flink-runtime 模块的核心组件之一,是每个 Flink 作业的核心。它连接所有 TaskManager 的各个子任务(Subtask),因此,对于 Flink 作业的性能包括吞吐与延迟都至关重要。与 TaskManager 和 JobManager 之间通过基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之间的网络协议栈依赖于更加底层的 Netty API。 本文将首先介绍 Flink 暴露给流算子(Stream operator)的高层抽象,然后详细介绍 Flink 网络协议栈的物理实现和各种优化、优化的效果以及 Flink 在吞吐量和延迟之间的权衡。 1.逻辑视图Flink 的网络协议栈为彼此通信的子任务提供以下逻辑视图,例如在 A 通过 keyBy() 操作进行数据 Shuffle : 这一过程建立在以下三种基本概念的基础上: ▼ 子任务输出类型(ResultPartitionType):Pipelined(有限的或无限的):一旦产生数据就可以持续向下游发送有限数据流或无限数据流。Blocking:仅在生成完整结果后向下游发送数据。 ▼ 调度策略:同时调度所有任务(Eager):同时部署作业的所有子任务(用于流作业)。上游产生第一条记录部署下游(Lazy):一旦任何生产者生成任何输出,就立即部署下游任务。上游产生完整数据部署下游:当任何或所有生产者生成完整数据后,部署下游任务。 ▼ 数据传输:高吞吐:Flink 不是一个一个地发送每条记录,而是将若干记录缓冲到其网络缓冲区中并一次性发送它们。这降低了每条记录的发送成本因此提高了吞吐量。低延迟:当网络缓冲区超过一定的时间未被填满时会触发超时发送,通过减小超时时间,可以通过牺牲一定的吞吐来获取更低的延迟。 我们将在下面深入 Flink 网络协议栈的物理实现时看到关于吞吐延迟的优化。对于这一部分,让我们详细说明输出类型与调度策略。首先,需要知道的是子任务的输出类型和调度策略是紧密关联的,只有两者的一些特定组合才是有效的。 Pipelined 结果是流式输出,需要目标 Subtask 正在运行以便接收数据。因此需要在上游 Task 产生数据之前或者产生第一条数据的时候调度下游目标 Task 运行。批处理作业生成有界结果数据,而流式处理作业产生无限结果数据。 批处理作业也可能以阻塞方式产生结果,具体取决于所使用的算子和连接模式。在这种情况下,必须等待上游 Task 先生成完整的结果,然后才能调度下游的接收 Task 运行。这能够提高批处理作业的效率并且占用更少的资源。 下表总结了 Task 输出类型以及调度策略的有效组合: ...

June 25, 2019 · 3 min · jiezi

揭秘每秒千万级的实时数据处理是怎么实现的

1、设计背景闲鱼目前实际生产部署环境越来越复杂,横向依赖各种服务盘宗错节,纵向依赖的运行环境也越来越复杂。当服务出现问题的时候,能否及时在海量的数据中定位到问题根因,成为考验闲鱼服务能力的一个严峻挑战。 线上出现问题时常常需要十多分钟,甚至更长时间才能找到问题原因,因此一个能够快速进行自动诊断的系统需求就应用而生,而快速诊断的基础是一个高性能的实时数据处理系统。 这个实时数据处理系统需要具备如下的能力: 1、数据实时采集、实时分析、复杂计算、分析结果持久化。2、可以处理多种多样的数据。包含应用日志、主机性能监控指标、调用链路图。3、高可靠性。系统不出问题且数据不能丢。4、高性能,底延时。数据处理的延时不超过3秒,支持每秒千万级的数据处理。 本文不涉及问题自动诊断的具体分析模型,只讨论整体实时数据处理链路的设计。 2、输入输出定义为了便于理解系统的运转,我们定义该系统整体输入和输出如下:  输入: 服务请求日志(包含traceid、时间戳、客户端ip、服务端ip、耗时、返回码、服务名、方法名)        环境监控数据(指标名称、ip、时间戳、指标值)。比如cpu、 jvm gc次数、jvm gc耗时、数据库指标。 输出: 一段时间内的某个服务出现错误的根因,每个服务的错误分析结果用一张有向无环图表达。(根节点即是被分析的错误节点,叶子节点即是错误根因节点。叶子节点可能是一个外部依赖的服务错误也可能是jvm异常等等)。 3、架构设计 在实际的系统运行过程中,随着时间的推移,日志数据以及监控数据是源源不断的在产生的。每条产生的数据都有一个自己的时间戳。而实时传输这些带有时间戳的数据就像水在不同的管道中流动一样。 如果把源源不断的实时数据比作流水,那数据处理过程和自来水生产的过程也是类似的: 自然地,我们也将实时数据的处理过程分解成采集、传输、预处理、计算、存储几个阶段。 整体的系统架构设计如下: 采集采用阿里自研的sls日志服务产品(包含logtail+loghub组件),logtail是采集客户端,之所以选择logtail是因为其优秀的性能、高可靠性以及其灵活插件扩展机制,闲鱼可以定制自己的采集插件实现各种各样数据的实时采集。 传输loghub可以理解为一个数据发布订阅组件,和kafka的功能类似,作为一个数据传输通道其更稳定、更安全,详细对比文章参考:https://yq.aliyun.com/articles/35979?spm=5176.10695662.1996646101.searchclickresult.6f2c7fbe6g3xgP 预处理实时数据预处理部分采用blink流计算处理组件(开源版本叫做flink,blink是阿里在flink基础上的内部增强版本)。目前常用的实时流计算开源产品有Jstorm、SparkStream、Flink。Jstorm由于没有中间计算状态的,其计算过程中需要的中间结果必然依赖于外部存储,这样会导致频繁的io影响其性能;SparkStream本质上是用微小的批处理来模拟实时计算,实际上还是有一定延时;Flink由于其出色的状态管理机制保证其计算的性能以及实时性,同时提供了完备SQL表达,使得流计算更容易。  计算与持久化数据经过预处理后最终生成调用链路聚合日志和主机监控数据,其中主机监控数据会独立存储在tsdb时序数据库中,供后续统计分析。tsdb由于其针对时间指标数据的特别存储结构设计,非常适合做时序数据的存储与查询。调用链路日志聚合数据,提供给cep/graph service做诊断模型分析。cep/graph service是闲鱼自研的一个应用,实现模型分析、复杂的数据处理以及外部服务进行交互,同时借助rdb实现图数据的实时聚合。 最后cep/graph service分析的结果作为一个图数据,实时转储在lindorm中提供在线查询。lindorm可以看作是增强版的hbase,在系统中充当持久化存储的角色。 4、设计细节与性能优化采集日志和指标数据采集使用logtail,整个数据采集过程如图: 其提供了非常灵活的插件机制,共有四种类型的插件: inputs: 输入插件,获取数据。processors: 处理插件,对得到的数据进行处理。aggregators: 聚合插件,对数据进行聚合。flushers: 输出插件,将数据输出到指定 sink。由于指标数据(比如cpu、内存、jvm指标)的获取需要调用本地机器上的服务接口获取,因此应尽量减少请求次数,在logtail中,一个input占用一个goroutine。闲鱼通过定制input插件和processors插件,将多个指标数据(比如cpu、内存、jvm指标)在一个input插件中通过一次服务请求获取(指标获取接口由基础监控团队提供),并将其格式化成一个json数组对象,在processors插件中再拆分成多条数据,以减少系统的io次数同时提升性能。 传输数据传输使用LogHub,logtail写入数据后直接由blink消费其中的数据,只需设置合理的分区数量即可。分区数要大于等于bink读取任务的并发数,避免blink中的任务空转。 预处理预处理主要采用bink实现,主要的设计和优化点: 1:编写高效的计算流程blink是一个有状态的流计算框架,非常适合做实时聚合、join等操作。在我们的应用中只需要关注出现错误的的请求上相关服务链路的调用情况,因此整个日志处理流分成两个流:a、服务的请求入口日志作为一个单独的流来处理,筛选出请求出错的数据。b、其他中间链路的调用日志作为另一个独立的流来处理,通过和上面的流join on traceid实现出错服务依赖的请求数据塞选。  如上图所示通过双流join后,输出的就是所有发生请求错误相关链路的完整数据。 2:设置合理的state生存周期blink在做join的时候本质上是通过state缓存中间数据状态,然后做数据的匹配。而如果state的生命周期太长会导致数据膨胀影响性能,如果state的生命周期太短就会无法正常关联出部分延迟到来的数据,所以需要合理的配置state生存周期,对于该应用允许最大数据延迟为1分钟。 使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒state.backend.type=niagarastate.backend.niagara.ttl.ms=600003:开启 MicroBatch/MiniBatchMicroBatch 和 MiniBatch 都是微批处理,只是微批的触发机制上略有不同。原理上都是缓存一定的数据后再触发处理,以减少对 state 的访问从而显著提升吞吐,以及减少输出数据量。 开启joinblink.miniBatch.join.enabled=true使用 microbatch 时需要保留以下两个 minibatch 配置blink.miniBatch.allowLatencyMs=5000防止OOM,每个批次最多缓存多少条数据blink.miniBatch.size=200004:动态负载使用 Dynamic-Rebalance 替代 Rebalanceblink任务在运行是最忌讳的就是存在计算热点,为保证数据均匀使用Dynamic Rebalance,它可以根据当前各subpartition中堆积的buffer的数量,选择负载较轻的subpartition进行写入,从而实现动态的负载均衡。相比于静态的rebalance策略,在下游各任务计算能力不均衡时,可以使各任务相对负载更加均衡,从而提高整个作业的性能。 开启动态负载task.dynamic.rebalance.enabled=true5:自定义输出插件数据关联后需要将统一请求链路上的数据作为一个数据包通知下游图分析节点,传统的方式的是通过消息服务来投递数据。但是通过消息服务有两个缺点:1、其吞吐量和rdb这种内存数据库相比比还是较大差距(大概差一个数量级)。2、在接受端还需要根据traceid做数据关联。 ...

June 21, 2019 · 1 min · jiezi

从-Spark-Streaming-到-Apache-Flink-实时数据流在爱奇艺的演进

本文将为大家介绍Apache Flink在爱奇艺的生产与实践过程。你可以借此了解到爱奇艺引入Apache Flink的背景与挑战,以及平台构建化流程。主要内容如下: 爱奇艺在实时计算方面的的演化和遇到的一些挑战爱奇艺使用Flink的User Case爱奇艺Flink平台化构建流程爱奇艺在Flink上的改进未来工作爱奇艺简介 爱奇艺在2010年正式上线,于2018年3月份在纳斯达克上市。我们拥有规模庞大且高度活跃的用户基础,月活跃用户数5.65亿人,在在线视频领域名列第一。在移动端,爱奇艺月度总有效时长59.08亿小时,稳居中国APP榜第三名。 一、爱奇艺在实时计算方面的演化和遇到的一些挑战1. 实时计算在爱奇艺的演化过程 实时计算是基于一些实时到达、速率不可控、到达次序独立不保证顺序、一经处理无法重放除非特意保存的无序时间序列的数据的在线计算。 因此,在实时计算中,会遇到数据乱序、数据延时、事件时间与处理时间不一致等问题。爱奇艺的峰值事件数达到1100万/秒,在正确性、容错、性能、延迟、吞吐量、扩展性等方面均遇到不小的挑战。 爱奇艺从2013年开始小规模使用storm,部署了3个独立集群。在2015年,开始引入Spark Streaming,部署在YARN上。在2016年,将Spark Streaming平台化,构建流计算平台,降低用户使用成本,之后流计算开始在爱奇艺大规模使用。在2017年,因为Spark Streaming的先天缺陷,引入Flink,部署在独立集群和YARN上。在2018年,构建Streaming SQL与实时分析平台,进一步降低用户使用门槛。 2. 从Spark Streaming到Apache Flink 爱奇艺主要使用的是Spark Streaming和Flink来进行流式计算。Spark Streaming的实现非常简单,通过微批次将实时数据拆成一个个批处理任务,通过批处理的方式完成各个子Batch。Spark Streaming的API也非常简单灵活,既可以用DStream的java/scala API,也可以使用SQL定义处理逻辑。但Spark Streaming受限于微批次处理模型,业务方需要完成一个真正意义上的实时计算会非常困难,比如基于数据事件时间、数据晚到后的处理,都得用户进行大量编程实现。爱奇艺这边大量使用Spark Streaming的场景往往都在于实时数据的采集落盘。 Apache Flink框架的实时计算模型是基于Dataflow Model实现的,完全支持Dataflow Model的四个问题:What,支持定义DAG图;Where:定义各类窗口(固定窗口、滑动窗口和Session窗口);When:支持灵活定义计算触发时间;How:支持丰富的Function定义数据更新模式。和Spark Streaming一样,Flink支持分层API,支持DataStream API,Process Function,SQL。Flink最大特点在于其实时计算的正确性保证:Exactly once,原生支持事件时间,支持延时数据处理。由于Flink本身基于原生数据流计算,可以达到毫秒级低延时。 在爱奇艺实测下来,相比Spark Streaming,Apache Flink在相近的吞吐量上,有更低的延时,更好的实时计算表述能力,原生实时事件时间、延时数据处理等。 二、在爱奇艺使用Flink的一些案例下面通过三个Use Case来介绍一下,爱奇艺具体是怎么使用Flink的,包括海量数据实时ETL,实时风控,分布式调用链分析。 1. 海量数据实时ETL 在爱奇艺这边所有用户在端上的任何行为都会发一条日志到nginx服务器上,总量超过千万QPS。对于具体某个业务来说,他们后续做实时分析,只希望访问到业务自身的数据,于是这中间就涉及一个数据拆分的工作。 在引入Flink之前,最早的数据拆分逻辑是这样子的,在Ngnix机器上通过“tail -f /xxx/ngnix.log | grep "xxx"”的方式,配置了无数条这样的规则,将这些不同的数据按照不同的规则,打到不同的业务kafka中。但这样的规则随着业务线的规模的扩大,这个tail进程越来越多,逐渐遇到了服务器性能瓶颈。 于是,我们就有了这样一个设想,希望通过实时流计算将数据拆分到各个业务kafka。具体来说,就是Nginx上的全量数据,全量采集到一级Kafka,通过实时ETL程序,按需将数据采集到各个业务Kafka中。当时,爱奇艺主的实时流计算基本均是基于Spark Streaming的,但考虑到Spark Streaming延迟相对来说比较高,爱奇艺从这个case展开开始推进Apache Flink的应用。 海量数据实时ETL的具体实现,主要有以下几个步骤: 解码:各个端的投递日志格式不统一,需要首先将各个端的日志按照各种解码方式解析成规范化的格式,这边选用的是JSON风控:实时拆分这边的数据都会过一下风控的规则,过滤掉很大一部分刷量日志。由于量级太高,如果将每条日志都过一下风控规则,延时会非常大。这边做了几个优化,首先,将用户数据通过DeviceID拆分,不同的DeviceID拆分到不同的task manager上,每个task manager用本地内存做一级缓存,将redis和flink部署在一起,用本地redis做二级缓存。最终的效果是,每秒redis访问降到了平均4k,实时拆分的P99延时小于500ms。拆分:按照各个业务进行拆分采样、再过滤:根据每个业务的拆分过程中根据用户的需求不同,有采样、再过滤等过程 2. 实时风控 防机器撞库盗号攻击是安全风控的一个常见需求,主要需求集中于事中和事后。在事中,进行超高频异常检测分析,过滤用户异常行为;在事后,生成IP和设备ID的黑名单,供各业务实时分析时进行防刷使用。 ...

June 21, 2019 · 1 min · jiezi

蚂蚁金服首席架构师何昌华开源SQLFlow是牛刀初试实时大数据系统才是未来基石

开源 SQLFlow,反哺业界,同时小小秀出 AI 肌肉。 这就是蚂蚁金服近日开源首个将 SQL 应用于 AI 引擎项目 SQLFlow 后,业界给出的反应。 SQLFlow,把艰深的 AI 与简单的 SQL 结合起来,大大简化了数据工程师使用 AI 技术的门槛。 而研发出 SQLFlow 的,正是蚂蚁金服计算存储首席架构师何昌华带领下的 AI Infra 团队。 何昌华斯坦福博士毕业,先在 Google 总部工作 7 年,赢得过公司最高技术奖项,其后又在独角兽 Airbnb 工作 2 年,负责后台系统的应用架构。 2017 年 5 月,他正式加盟蚂蚁金服,担任计算存储首席架构师,并在 2018 年入选了第 14 批国家“千人计划”专家。 在蚂蚁金服,何昌华的工作是开发新一代计算引擎,搭建金融型数据智能平台。 而 SQLFlow,就是计算引擎主线上的结晶之一。 不过对何昌华来说,世界正在巨变,他还要带队探索一些没人做成的事情。 比如全实时的大数据智能系统。 未来技术基石大数据的概念,最早来自于搜索引擎行业,因为搜索引擎面对的是人类在互联网上留下的爆炸性增长的庞大数据。 2010 年底,谷歌宣布新一代搜索引擎“咖啡因”正式上线,这项技术的革命性在于,任何时刻,世界上的任何网页发生了变化,都可以实时地添加到索引中,用户也可以实时地搜索到,解决了传统搜索引擎的延时问题。 何昌华当时正是咖啡因开发团队的核心技术负责人之一。 他解释,“咖啡因所实现的最核心的功能,就是实时。” 而现在何昌华在蚂蚁金服工作的目标,同样是搭建一个“完全实时”的大数据处理系统,或称之为大数据智能平台。由于线下生活场景的多样性和复杂性,这是个比构建实时搜索更有挑战性的任务。 他认为,这将成为未来技术的基石。 对于计算机来说,实时就是在发出请求到返回响应之间的延迟尽量小,对于大数据处理系统来说,这还意味着从数据生产到消费的延迟尽可能低,所有这些都意味着计算速度和能力的提升。 此前常用的大数据计算模型 MapReduce,对数据的处理是“分片式”的,数据的片与片之间有边界的概念,这种批处理的模式不可避免地会带来延时问题。 以搜索的场景为例,假如以天为时间单位对数据进行批处理,那就意味着今天更新的网页,用户明天才能搜索到,调高处理的频率可以部分解决问题,一天两次、一天四次、两小时一次…… 虽然能逐步接近“准实时”,但成本也会急剧上升。 要实现真正的实时,就必须打破这种批处理的边界,让数据处理的过程像水流一样,随来随算,随时反馈。 这也催生了后来流式计算引擎的蓬勃发展。 而在何昌华看来,除了快,“实时系统”还有两层重要含义。 第一是 OLTP(联机事务处理)和 OLAP(联机分析处理)的融合。 在以往的观念里,OLTP 对实时性的要求高,OLAP 对时效性的要求不那么高。 ...

June 4, 2019 · 1 min · jiezi

基于大数据的舆情分析系统架构-架构篇

前言互联网的飞速发展促进了很多新媒体的发展,不论是知名的大V,明星还是围观群众都可以通过手机在微博,朋友圈或者点评网站上发表状态,分享自己的所见所想,使得“人人都有了麦克风”。不论是热点新闻还是娱乐八卦,传播速度远超我们的想象。可以在短短数分钟内,有数万计转发,数百万的阅读。如此海量的信息可以得到爆炸式的传播,如何能够实时的把握民情并作出对应的处理对很多企业来说都是至关重要的。大数据时代,除了媒体信息以外,商品在各类电商平台的订单量,用户的购买评论也都对后续的消费者产生很大的影响。商家的产品设计者需要汇总统计和分析各类平台的数据做为依据,决定后续的产品发展,公司的公关和市场部门也需要根据舆情作出相应的及时处理,而这一切也意味着传统的舆情系统升级成为大数据舆情采集和分析系统。分析完舆情场景后,我们再来具体细化看下大数据舆情系统,对我们的数据存储和计算系统提出哪些需求: 海量原始数据的实时入库:为了实现一整套舆情系统,需要有上游原始输出的采集,也就是爬虫系统。爬虫需要采集各类门户,自媒体的网页内容。在抓取前需要去重,抓取后还需要分析提取,例如进行子网页的抓取。原始网页数据的处理:不论是主流门户还是自媒体的网页信息,抓取后我们需要做一定的数据提取,把原始的网页内容转化为结构化数据,例如文章的标题,摘要等,如果是商品点评类消息也需要提取有效的点评。结构化数据的舆情分析:当各类原始输出变成结构化的数据后,我们需要有一个实时的计算产品把各类输出做合理的分类,进一步对分类后的内容进行情感打标。根据业务的需求这里可能会产生不同的输出,例如品牌当下是否有热点话题,舆情影响力分析,转播路径分析,参与用户统计和画像,舆论情感分析或者是否有重大预警。舆情分析系统中间和结果数据的存储,交互分析查询:从网页原始数据清洗到最终的舆情报表这中间会产生很多类型的数据。这些数据有的会提供给数据分析同学进行舆情分析系统的调优,有的数据会提供给业务部门根据舆情结果进行决策。这些查询可能会很灵活,需要我们的存储系统具备全文检索,多字段组合灵活的交互分析能力。重大舆情事件的实时预警:对于舆情的结果除了正常的搜索和展示需求以外,当有重大事件出现我们需要能做到实时的预警。我们计划分两篇介绍完整的舆情新架构,第一篇主要是提供架构设计,会先介绍时下主流的大数据计算架构,并分析一些优缺点,然后引入舆情大数据架构。第二篇会有完整的数据库表设计和部分示例代码。大家敬请期待。 系统设计需求分析 结合文章开头对舆情系统的描述,海量大数据舆情分析系统流程图大体如下: 原始网页存储库,这个库需要能支持海量数据,低成本,低延时写入。网页数据写入后,要做实时结构化提取,提取出来的数据再进行降噪,分词,图片ocr处理等。对分词文本,图片进行情感识别产生舆情数据结果集。传统的离线全量计算很难满足舆情系统的时效性需求。计算引擎在做数据处理时,可能还需要从存储库中获取一些元数据,例如用户信息,情感词元数据信息等。除了实时的计算链路,对存量数据定期要做一些聚类,优化我们的情感词识别库,或者上游根据业务需要触发情感处理规则更新,根据新的情感打标库对存量数据做一次舆情计算。舆情的结果数据集有不同类的使用需求。对于重大舆情,需要做实时的预警。完整的舆情结果数据展示层需要支持全文检索,灵活的属性字段组合查询。业务上可能根据属性字段中的置信度,舆情时间,或者关键词组合进行分析。根据前面的介绍,舆情大数据分析系统需要两类计算,一类是实时计算包括海量网页内容实时抽取,情感词分析并进行网页舆情结果存储。另一类是离线计算,系统需要对历史数据进行回溯,结合人工标注等方式优化情感词库,对一些实时计算的结果进行矫正等。所以在系统设计上,需要选择一套既可以做实时计算又能做批量离线计算的系统。在开源大数据解决方案中,Lambda架构恰好可以满足这些需求,下面我们来介绍下Lambda的架构。 Lambda架构 (wiki) Lambda架构可以说是Hadoop,Spark体系下最火的大数据架构。这套架构的最大优势就是在支持海量数据批量计算处理(也就是离线处理)同时也支持流式的实时处理(即热数据处理)。具体是如何实现的呢,首先上游一般是一个队列服务例如kafka,实时存储数据的写入。kafka队列会有两个订阅者,一个是全量数据即图片中上半部分,全量数据会被存储在类似HDFS这样的存储介质上。当有离线计算任务到来,计算资源(例如Hadoop)会访问存储系统上的全量数据,进行全量批计算的处理逻辑。经过map/reduce环节后全量的结果会被写入一个结构化的存储引擎例如Hbase中,提供给业务方查询。队列的另一个消费订阅方是流计算引擎,流计算引擎往往会实时的消费队列中的数据进行计算处理,例如Spark Streaming实时订阅Kafka的数据,流计算结果也会写入一个结构化数据引擎。批量计算和流计算的结果写入的结构化存储引擎即上图标注3的"Serving Layer",这一层主要提供结果数据的展示和查询。在这套架构中,批量计算的特点是需要支持处理海量的数据,并根据业务的需求,关联一些其他业务指标进行计算。批量计算的好处是计算逻辑可以根据业务需求灵活调整,同时计算结果可以反复重算,同样的计算逻辑多次计算结果不会改变。批量计算的缺点是计算周期相对较长,很难满足实时出结果的需求,所以随着大数据计算的演进,提出了实时计算的需求。实时计算在Lambda架构中是通过实时数据流来实现,相比批处理,数据增量流的处理方式决定了数据往往是最近新产生的数据,也就是热数据。正因为热数据这一特点,流计算可以满足业务对计算的低延时需求,例如在舆情分析系统中,我们往往希望舆情信息可以在网页抓取下来后,分钟级别拿到计算结果,给业务方充足的时间进行舆情反馈。下面我们就来具体看一下,基于Lambda架构的思想如何实现一套完整的舆情大数据架构。 开源舆情大数据方案通过这个流程图,让我们了解了整个舆情系统的建设过程中,需要经过不同的存储和计算系统。对数据的组织和查询有不同的需求。在业界基于开源的大数据系统并结合Lambda架构,整套系统可以设计如下: 系统的最上游是分布式的爬虫引擎,根据抓取任务抓取订阅的网页原文内容。爬虫会把抓取到的网页内容实时写入Kafka队列,进入Kafka队列的数据根据前面描述的计算需求,会实时流入流计算引擎(例如Spark或者Flink),也会持久化存储在Hbase,进行全量数据的存储。全量网页的存储可以满足网页爬取去重,批量离线计算的需求。流计算会对原始网页进行结构化提取,将非结构化网页内容转化为结构数据并进行分词,例如提取出网页的标题,作者,摘要等,对正文和摘要内容进行分词。提取和分词结果会写回Hbase。结构化提取和分词后,流计算引擎会结合情感词库进行网页情感分析,判断是否有舆情产生。流计算引擎分析的舆情结果存储Mysql或者Hbase数据库中,为了方便结果集的搜索查看,需要把数据同步到一个搜索引擎例如Elasticsearch,方便进行属性字段的组合查询。如果是重大的舆情时间,需要写入Kafka队列触发舆情报警。全量的结构化数据会定期通过Spark系统进行离线计算,更新情感词库或者接受新的计算策略重新计算历史数据修正实时计算的结果。开源架构分析上面的舆情大数据架构,通过Kafka对接流计算,Hbase对接批计算来实现Lambda架构中的“batch view”和“real-time view”,整套架构还是比较清晰的,可以很好的满足在线和离线两类计算需求。但是把这一套系统应用在生产并不是一件容易的事情,主要有下面一些原因。 整套架构涉及到非常多的存储和计算系统包括:Kafka,Hbase,Spark,Flink,Elasticsearch。数据会在不同的存储和计算系统中流动,运维好整套架构中的每一个开源产品都是一个很大的挑战。任何一个产品或者是产品间的通道出现故障,对整个舆情分析结果的时效性都会产生影响。为了实现批计算和流计算,原始的网页需要分别存储在Kafka和Hbase中,离线计算是消费hbase中的数据,流计算消费Kafka的数据,这样会带来存储资源的冗余,同时也导致需要维护两套计算逻辑,计算代码开发和维护成本也会上升。舆情的计算结果存储在Mysql或者Hbase,为了丰富组合查询语句,需要把数据同步构建到Elasticsearch中。查询的时候可能需要组合Mysql和Elasticsearch的查询结果。这里没有跳过数据库,直接把结果数据写入Elasticsearch这类搜索系统,是因为搜索系统的数据实时写入能力和数据可靠性不如数据库,业界通常是把数据库和搜索系统整合,整合下的系统兼备了数据库和搜索系统的优势,但是两个引擎之间数据的同步和跨系统查询对运维和开发带来很多额外的成本。新的大数据架构Lambda plus通过前面的分析,相信大家都会有一个疑问,有没有简化的的大数据架构,在可以满足Lambda对计算需求的假设,又能减少存储计算以及模块的个数呢。Linkedin的Jay Kreps提出了Kappa架构,关于Lambda和Kappa的对比可以参考"云上大数据方案"这篇,这里不展开详细对比,简单说下,Kappa为了简化两份存储,取消了全量的数据存储库,通过在Kafka保留更长日志,当有回溯重新计算需求到来时,重新从队列的头部开始订阅数据,再一次用流的方式处理Kafka队列中保存的所有数据。这样设计的好处是解决了需要维护两份存储和两套计算逻辑的痛点,美中不足的地方是队列可以保留的历史数据毕竟有限,难以做到无时间限制的回溯。分析到这里,我们沿着Kappa针对Lambda的改进思路,向前多思考一些:假如有一个存储引擎,既满足数据库可以高效的写入和随机查询,又能像队列服务,满足先进先出,是不是就可以把Lambda和Kappa架构揉合在一起,打造一个Lambda plus架构呢?新架构在Lambda的基础上可以提升以下几点: 在支持流计算和批计算的同时,让计算逻辑可以复用,实现“一套代码两类需求”。统一历史数据全量和在线实时增量数据的存储,实现“一份存储两类计算”。为了方便舆情结果查询需求,“batch view”和“real-time view”存储在既可以支持高吞吐的实时写入,也可以支持多字段组合搜索和全文检索。总结起来就是整套新架构的核心是解决存储的问题,以及如何灵活的对接计算。我们希望整套方案是类似下面的架构: 数据流实时写入一个分布式的数据库,借助于数据库查询能力,全量数据可以轻松的对接批量计算系统进行离线处理。数据库通过数据库日志接口,支持增量读取,实现对接流计算引擎进行实时计算。批计算和流计算的结果写回分布式数据库,分布式数据库提供丰富的查询语意,实现计算结果的交互式查询。整套架构中,存储层面通过结合数据库主表数据和数据库日志来取代大数据架构中的队列服务,计算系统选取天然支持批和流的计算引擎例如Flink或者Spark。这样一来,我们既可以像Lambda进行无限制的历史数据回溯,又可以像Kappa架构一样一套逻辑,存储处理两类计算任务。这样的一套架构我们取名为“Lambda plus”,下面就详细展开如何在阿里云上打造这样的一套大数据架构。 云上舆情系统架构在阿里云众多存储和计算产品中,贴合上述大数据架构的需求,我们选用两款产品来实现整套舆情大数据系统。存储层面使用阿里云自研的分布式多模型数据库Tablestore,计算层选用Blink来实现流批一体计算。 这套架构在存储层面,全部基于Tablestore,一个数据库解决不同存储需求,根据之前舆情系统的介绍,网页爬虫数据在系统流动中会有四个阶段分别是原始网页内容,网页结构化数据,分析规则元数据和舆情结果,舆情结果索引。我们利用Tablestore宽行和schema free的特性,合并原始网页和网页结构化数据成一张网页数据。网页数据表和计算系统通过Tablestore新功能通道服务进行对接。通道服务基于数据库日志,数据的组织结构按照数据的写入顺序进行存储,正是这一特性,赋能数据库具备了队列流式消费能力。使得存储引擎既可以具备数据库的随机访问,也可以具备队列的按照写入顺序访问,这也就满足我们上面提到整合Lambda和kappa架构的需求。分析规则元数据表由分析规则,情感词库组层,对应实时计算中的维表。计算系统这里选用阿里云实时流计算产品Blink,Blink是一款支持流计算和批计算一体的实时计算产品。并且类似Tablestore可以很容易的做到分布式水平扩展,让计算资源随着业务数据增长弹性扩容。使用Tablestore + Blink的优势有以下几点: Tablestore已经深度和Blink进行整合,支持源表,维表和目的表,业务无需为数据流动开发代码。整套架构大幅降低组建个数,从开源产品的6~7个组建减少到2个,Tablestore和Blink都是全托管0运维的产品,并且都能做到很好的水平弹性,业务峰值扩展无压力,使得大数据架构的运维成本大幅降低。业务方只需要关注数据的处理部分逻辑,和Tablestore的交互逻辑都已经集成在Blink中。开源方案中,如果数据库源希望对接实时计算,还需要双写一个队列,让流计算引擎消费队列中的数据。我们的架构中数据库既作为数据表,又是队列通道可以实时增量数据消费。大大简化了架构的开发和使用成本。流批一体,在舆情系统中实时性是至关重要的,所以我们需要一个实时计算引擎,而Blink除了实时计算以外,也支持批处理Tablestore的数据, 在业务低峰期,往往也需要批量处理一些数据并作为反馈结果写回Tablestore,例如情感分析反馈等。那么一套架构既可以支持流处理又可以支持批处理是再好不过。这里我们可以参考之前的一篇文章《实时计算最佳实践:基于表格存储和Blink的大数据实时计算》。一套架构带来的优势是,一套分析代码既可以做实时流计算又可以离线批处理。整个计算流程会产生实时的舆情计算结果。重大舆情事件的预警,通过Tablestore和函数计算触发器对接来实现。Tablestore和函数计算做了增量数据的无缝对接,通过结果表写入事件,可以轻松的通过函数计算触发短信或者邮件通知。完整的舆情分析结果和展示搜索利用了Tablestore的新功能多元索引,彻底解决了开源Hbase+Solr多引擎的痛点: 运维复杂,需要有运维hbase和solr两套系统的能力,同时还需要维护数据同步的链路。Solr数据一致性不如Hbase,在Hbase和Solr数据语意并不是完全一致,加上Solr/Elasticsearch在数据一致性很难做到像数据库那么严格。在一些极端情况下会出现数据不一致的问题,开源方案也很难做到跨系统的一致性比对。查询接口需要维护两套API,需要同时使用Hbase client和Solr client,索引中没有的字段需要主动反查Hbase,易用性较差。参考文献Lambda大数据架构Kappa大数据架构Lambda和Kappa架构对比总结本文基于《百亿级全网舆情分析系统存储设计》并结合Tablestore的新功能做了现代大数据舆情系统的架构升级,实现了海量信息下的实时舆情分析存储系统。也介绍了开源方案,并和我们的方案做了详细的对比。 本文作者:宇珩阅读原文 本文为云栖社区原创内容,未经允许不得转载。

June 3, 2019 · 1 min · jiezi

入门教程-5分钟从零构建第一个-Flink-应用

本文转载自 Jark’s Blog ,作者伍翀(云邪),Apache Flink Committer,阿里巴巴高级开发工程师。本文将从开发环境准备、创建 Maven 项目,编写 Flink 程序、运行程序等方面讲述如何迅速搭建第一个 Flink 应用。在本文中,我们将从零开始,教您如何构建第一个 Flink 应用程序。开发环境准备Flink 可以运行在 Linux, Max OS X, 或者是 Windows 上。为了开发 Flink 应用程序,在本地机器上需要有 Java 8.x 和 maven 环境。 如果有 Java 8 环境,运行下面的命令会输出如下版本信息: $ java -versionjava version "1.8.0_65"Java(TM) SE Runtime Environment (build 1.8.0_65-b17)Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)如果有 maven 环境,运行下面的命令会输出如下版本信息:$ mvn -versionApache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)Maven home: /Users/wuchong/dev/mavenJava version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jreDefault locale: zh_CN, platform encoding: UTF-8OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"另外我们推荐使用 ItelliJ IDEA (社区免费版已够用)作为 Flink 应用程序的开发 IDE。Eclipse 虽然也可以,但是 Eclipse 在 Scala 和 Java 混合型项目下会有些已知问题,所以不太推荐 Eclipse。下一章节,我们会介绍如何创建一个 Flink 工程并将其导入 ItelliJ IDEA。创建 Maven 项目我们将使用 Flink Maven Archetype 来创建我们的项目结构和一些初始的默认依赖。在你的工作目录下,运行如下命令来创建项目: ...

May 22, 2019 · 2 min · jiezi

OPPO数据中台之基石基于Flink-SQL构建实数据仓库

作者 | 张俊本文整理自 2019 年 4 月 13 日在深圳举行的 Flink Meetup 会议,分享嘉宾张俊,目前担任 OPPO 大数据平台研发负责人,也是 Apache Flink contributor。本文主要内容如下: OPPO 实时数仓的演进思路;基于 Flink SQL 的扩展工作;构建实时数仓的应用案例;未来工作的思考和展望。一.OPPO 实时数仓的演进思路1.1.OPPO 业务与数据规模 大家都知道 OPPO 是做智能手机的,但并不知道 OPPO 与互联网以及大数据有什么关系,下图概要介绍了 OPPO 的业务与数据情况: OPPO 作为手机厂商,基于 Android 定制了自己的 ColorOS 系统,当前日活跃用户超过 2 亿。围绕 ColorOS,OPPO 构建了很多互联网应用,比如应用商店、浏览器、信息流等。在运营这些互联网应用的过程中,OPPO 积累了大量的数据,上图右边是整体数据规模的演进:从 2012 年开始每年都是 2~3 倍的增长速度,截至目前总数据量已经超过 100PB,日增数据量超过 200TB。要支撑这么大的一个数据量,OPPO 研发出一整套的数据系统与服务,并逐渐形成了自己的数据中台体系。 1.2.OPPO 数据中台 今年大家都在谈数据中台,OPPO 是如何理解数据中台的呢?我们把它分成了 4 个层次: 最下层是统一工具体系,涵盖了"接入 - 治理 - 开发 - 消费"全数据链路;基于工具体系之上构建了数据仓库,划分成"原始层 - 明细层 - 汇总层 - 应用层",这也是经典的数仓架构;再往上是全域的数据体系,什么是全域呢?就是把公司所有的业务数据都打通,形成统一的数据资产,比如 ID-Mapping、用户标签等;最终,数据要能被业务用起来,需要场景驱动的数据产品与服务。以上就是 OPPO 数据中台的整个体系,而数据仓库在其中处于非常基础与核心的位置。 ...

May 15, 2019 · 4 min · jiezi

为什么说流处理即未来?

作者|Stephan Ewen整理|秦江杰本文整理自 Flink 创始公司 Ververica 联合创始人兼 CTO - Stephan Ewen 在 Flink Forward China 2018 上的演讲《Stream Processing takes on Everything》。这个演讲主题看似比较激进:流处理解决所有问题。很多人对于 Flink 可能还停留在最初的认知,觉得 Flink 是一个流处理引擎,实际上 Flink 可以做很多其他的工作,比如批处理、应用程序。在这个演讲中,Stephan 首先会简单说明他对 Flink 功能的观点,然后深入介绍一个特定领域的应用和事件处理场景。这个场景乍看起来不是一个流处理的使用场景,但是在 Stephan 看来,它实际上就是一个很有趣的流处理使用场景。Flink社区专刊下载地址第一期:不仅仅是流计算第二期:重新定义计算上图对为什么流处理可以处理一切作出诠释,将数据看做流是一个自然而又十分强大的想法。大部分数据的产生过程都是随时间生成的流,比如一个 Petabyte 的数据不会凭空产生。这些数据通常都是一些事件的积累,比如支付、将商品放入购物车,网页浏览,传感器采样输出。基于数据是流的想法,我们对数据处理可以有相应的理解。比如将过去的历史数据看做是一个截止到某一时刻的有限的流,或是将一个实时处理应用看成是从某一个时刻开始处理未来到达的数据。可能在未来某个时刻它会停止,那么它就变成了处理从开始时刻到停止时刻的有限数据的批处理。当然,它也有可能一直运行下去,不断处理新到达的数据。这个对数据的重要理解方式非常强大,基于这一理解,Flink 可以支持整个数据处理范畴内的所有场景。最广为人知的 Flink 使用场景是流分析、连续处理(或者说渐进式处理),这些场景中 Flink 实时或者近实时的处理数据,或者采集之前提到的历史数据并且连续的对这些事件进行计算。晓伟在之前的演讲中提到一个非常好的例子来说明怎么样通过对 Flink 进行一些优化,进而可以针对有限数据集做一些特别的处理,这使得 Flink 能够很好的支持批处理的场景,从性能上来说能够与最先进的批处理引擎相媲美。而在这根轴的另一头,是我今天的演讲将要说明的场景 – 事件驱动的应用。这类应用普遍存在于任何服务或者微服务的架构中。这类应用接收各类事件(可能是 RPC 调用、HTTP 请求),并且对这些事件作出一些响应,比如把商品放进购物车,或者加入社交网络中的某个群组。在我进一步展开今天的演讲之前,我想先对社区在 Flink 的传统领域(实时分析、连续处理)近期所做的工作做一个介绍。Flink 1.7 在 2018 年 11 月 30 日已经发布。在 Flink 1.7 中为典型的流处理场景加入了一些非常有趣的功能。比如我个人非常感兴趣的在流式 SQL 中带时间版本的 Join。一个基本想法是有两个不同的流,其中一个流被定义为随时间变化的参照表,另一个是与参照表进行 Join 的事件流。比如事件流是一个订单流,参照表是不断被更新的汇率,而每个订单需要使用最新的汇率来进行换算,并将换算的结果输出到结果表。这个例子在标准的 SQL 当中实际上并不容易表达,但在我们对 Streaming SQL 做了一点小的扩展以后,这个逻辑表达变得非常简单,我们发现这样的表达有非常多的应用场景。另一个在流处理领域十分强大的新功能是将复杂事件处理(CEP)和 SQL 相结合。CEP 应用观察事件模式。比如某个 CEP 应用观察股市,当有两个上涨后紧跟一个下跌时,这个应用可能做些交易。再比如一个观察温度计的应用,当它发现有温度计在两个超过 90 摄氏度的读数之后的两分钟里没有任何操作,可能会进行一些操作。与 SQL 的结合使这类逻辑的表达也变得非常简单。第三个 Flink 1.7 中做了很多工作的功能是 Schema 升级。这个功能和基于流的应用紧密相关。就像你可以对数据库进行数据 Schema 升级一样,你可以修改 Flink 表中列的类型或者重新写一个列。另外我想简单介绍的是流处理技术不仅仅是简单对数据进行计算,这还包括了很多与外部系统进行事务交互。流处理引擎需要在采用不同协议的系统之间以事务的方式移动数据,并保证计算过程和数据的一致性。这一部分功能也是在 Flink 1.7 中得到了增强。以上我对 Flink 1.7 的新功能向大家做了简单总结。下面让我们来看看今天我演讲的主要部分,也就是利用 Flink 来搭建应用和服务。我将说明为什么流处理是一个搭建应用和服务或者微服务的有趣技术。我将从左边这个高度简化的图说起,我们一会儿将聊一些其中的细节。首先我们来看一个理解应用简单的视角。如左图所示,一个应用可以是一个 Container,一个 Spring 应用,或者 Java 应用、Ruby 应用,等等。这个应用从诸如 RPC,HTTP 等渠道接收请求,然后依据请求进行数据库变更。这个应用也可能调用另一个微服务并进行下一步的处理。我们可以非常自然的想到进入到应用的这些请求可以看做是个事件组成的序列,所以我们可以把它们看做是事件流。可能这些事件被缓存在消息队列中,而应用会从消息队列中消费这些事件进行处理,当应用需要响应一个请求时,它将结果输出到另一个消息队列,而请求发送方可以从这个消息队列中消费得到所发送请求的响应。在这张图中我们已经可以看到一些有趣的不同。第一个不同是在这张图中应用和数据库不再是分开的两个实体,而是被一个有状态的流处理应用所代替。所以在流处理应用的架构中,不再有应用和数据库的连接了,它们被放到了一起。这个做法有利有弊,但其中有些好处是非常重要的。首先是性能上的好处是明显的,因为应用不再需要和数据库进行交互,处理可以基于内存中的变量进行。其次这种做法有很好并且很简单的一致性。这张图被简化了很多,实际上我们通常会有很多个应用,而不是一个被隔离的应用,很多情况下你的应用会更符合这张图。系统中有个接收请求的接口,然后请求被发送到第一个应用,可能会再被发到另一个应用,然后得到相应。在图中有些应用会消费中间结果的流。这张图已经展示了为什么流处理是更适合比较复杂的微服务场景的技术。因为很多时候系统中不会有一个直接接收用户请求并直接响应的服务,通常来说一个微服务需要跟其他微服务通信。这正如在流处理的架构中不同应用在创建输出流,同时基于衍生出的流再创建并输出新的流。到目前为止,我们看到的内容多少还比较直观。而对基于流处理技术的微服务架构而言,人们最常问的一个问题是如何保证事务性?如果系统中使用的是数据库,通常来说都会有非常成熟复杂的数据校验和事务模型。这也是数据库在过去许多年中十分成功的原因。开始一个事务,对数据做一些操作,提交或者撤销一个事务。这个机制使得数据完整性得到了保证(一致性,持久性等等)。那么在流处理中我们怎么做到同样的事情呢?作为一个优秀的流处理引擎,Flink 支持了恰好一次语义,保证了每个事件只会被处理一遍。但是这依然对某些操作有限制,这也成为了使用流处理应用的一个障碍。我们通过一个非常简单流处理应用例子来看我们可以做一些什么扩展来解决这个问题。我们会看到,解决办法其实出奇的简单。让我们以这个教科书式的事务为例子来看一下事务性应用的过程。这个系统维护了账户和其中存款余额的信息。这样的信息可能是银行或者在线支付系统的场景中用到的。假设我们想要处理类似下面的事务:如果账户 A 中的余额大于 100,那么从账户 A 中转账 50 元到账户 B。这是个非常简单的两个账户之间进行转账的例子。数据库对于这样的事务已经有了一个核心的范式,也就是原子性,一致性,隔离性和持久性(ACID)。这是能够让用户放心使用事务的几个基本保证。有了他们,用户不用担心钱在转账过程中会丢失或者其他问题。让我们用这个例子来放到流处理应用中,来让流处理应用也能提供和数据相同的 ACID 支持:原子性要求一个转账要不就完全完成,也就是说转账金额从一个账户减少,并增加到另一个账户,要不就两个账户的余额都没有变化。而不会只有一个账户余额改变。否则的话钱就会凭空减少或者凭空增加。一致性和隔离性是说如果有很多用户同时想要进行转账,那么这些转账行为之间应该互不干扰,每个转账行为应该被独立的完成,并且完成后每个账户的余额应该是正确的。也就是说如果两个用户同时操作同一个账户,系统不应该出错。持久性指的是如果一个操作已经完成,那么这个操作的结果会被妥善的保存而不会丢失。我们假设持久性已经被满足。一个流处理器有状态,这个状态会被 checkpoint,所以流处理器的状态是可恢复的。也就是说只要我们完成了一个修改,并且这个修改被 checkpoint 了,那么这个修改就是持久化的。让我们来看看另外三个例子。设想一下,如果我们用流处理应用来实现这样一个转账系统会发生什么。我们先把问题简化一些,假设转账不需要有条件,仅仅是将 50 元从账户 A 转到账户,也就是说账户 A 的余额减少 50 元而账户 B 的余额增加 50 元。我们的系统是一个分布式的并行系统,而不是一个单机系统。简单起见我们假设系统中只有两台机器,这两台机器可以是不同的物理机或者是在 YARN 或者 Kubernetes 上不同的容器。总之它们是两个不同的流处理器实例,数据分布在这两个流处理器上。我们假设账户 A 的数据由其中一台机器维护,而账户 B 的数据有另一台机器维护。现在我们要做个转账,将 50 元从账户 A 转移到账户 B,我们把这个请求放进队列中,然后这个转账请求被分解为对账户 A 和 B 分别进行操作,并且根据键将这两个操作路由到维护账户 A 和维护账户 B 的这两台机器上,这两台机器分别根据要求对账户 A 和账户 B 的余额进行改动。这并不是事务操作,而只是两个独立无意义的改动。一旦我们将转账的请求改的稍微复杂一些就会发现问题。下面我们假设转账是有条件的,我们只想在账户 A 的余额足够的情况下才进行转账,这样就已经有些不太对了。如果我们还是像之前那样操作,将这个转账请求分别发送给维护账户 A 和 B 的两台机器,如果 A 没有足够的余额,那么 A 的余额不会发生变化,而 B 的余额可能已经被改动了。我们就违反了一致性的要求。我们看到我们需要首先以某种方式统一做出是否需要更改余额的决定,如果这个统一的决定中余额需要被修改,我们再进行修改余额的操作。所以我们先给维护 A 的余额的机器发送一个请求,让它查看 A 的余额。我们也可以对 B 做同样的事情,但是这个例子里面我们不关心 B 的余额。然后我们把所有这样的条件检查的请求汇总起来去检验条件是否满足。因为 Flink 这样的流处理器支持迭代,如果满足转账条件,我们可以把这个余额改动的操作放进迭代的反馈流当中来告诉对应的节点来进行余额修改。反之如果条件不满足,那么余额改动的操作将不会被放进反馈流。这个例子里面,通过这种方式我们可以正确的进行转账操作。从某种角度上来说我们实现了原子性,基于一个条件我们可以进行全部的余额修改,或者不进行任何余额修改。这部分依然还是比较直观的,更大的困难是在于如何做到并发请求的隔离性。假设我们的系统没有变,但是系统中有多个并发的请求。我们在之前的演讲中已经知道,这样的并发可能达到每秒钟几十亿条。如图,我们的系统可能从两个流中同时接受请求。如果这两个请求同时到达,我们像之前那样将每个请求拆分成多个请求,首先检查余额条件,然后进行余额操作。然而我们发现这会带来问题。管理账户 A 的机器会首先检查 A 的余额是否大于 50,然后又会检查 A 的余额是否大于 100,因为两个条件都满足,所以两笔转账操作都会进行,但实际上账户 A 上的余额可能无法同时完成两笔转账,而只能完成 50 元或者 100 元的转账中的一笔。这里我们需要进一步思考怎么样来处理并发的请求,我们不能只是简单地并发处理请求,这会违反事务的保证。从某种角度来说,这是整个数据库事务的核心。数据库的专家们花了一些时间提供了不同解决方案,有的方案比较简单,有的则很复杂。但所有的方案都不是那么容易,尤其是在分布式系统当中。在流处理中怎么解决这个问题呢?直觉上讲,如果我们能够让所有的事务都按照顺序依次发生,那么问题就解决了,这也被成为可序列化的特性。但是我们当然不希望所有的请求都被依次顺序处理,这与我们使用分布式系统的初衷相违背。所以我们需要保证这些请求最后的产生的影响看起来是按照顺序发生的,也就是一个请求产生的影响是基于前一个请求产生影响的基础之上的。换句话说也就是一个事务的修改需要在前一个事务的所有修改都完成后才能进行。这种希望一件事在另一件事之后发生的要求看起来很熟悉,这似乎是我们以前在流处理中曾经遇到过的问题。是的,这听上去像是事件时间。用高度简化的方式来解释,如果所有的请求都在不同的事件时间产生,即使由于种种原因他们到达处理器的时间是乱序的,流处理器依然会根据他们的事件时间来对他们进行处理。流处理器会使得所有的事件的影响看上去都是按顺序发生的。按事件时间处理是 Flink 已经支持的功能。那么详细说来,我们到底怎么解决这个一致性问题呢?假设我们有并行的请求输入并行的事务请求,这些请求读取某些表中的记录,然后修改某些表中的记录。我们首先需要做的是把这些事务请求根据事件时间顺序摆放。这些请求的事务时间不能够相同,但是他们之间的时间也需要足够接近,这是因为在事件时间的处理过程中会引入一定的延迟,我们需要保证所处理的事件时间在向前推进。因此第一步是定义事务执行的顺序,也就是说需要有一个聪明的算法来为每个事务制定事件时间。在图上,假设这三个事务的事件时间分别是 T+2, T 和 T+1。那么第二个事务的影响需要在第一和第三个事务之前。不同的事务所做的修改是不同的,每个事务都会产生不同的操作请求来修改状态。我们现在需要将对访问每个行和状态的事件进行排序,保证他们的访问是符合事件时间顺序的。这也意味着那些相互之间没有关系的事务之间自然也没有了任何影响。比如这里的第三个事务请求,它与前两个事务之间没有访问共同的状态,所以它的事件时间排序与前两个事务也相互独立。而当前两个事务之间的操作的到达顺序与事件时间不符时,Flink 则会依据它们的事件时间进行排序后再处理。必须承认,这样说还是进行了一些简化,我们还需要做一些事情来保证高效执行,但是总体原则上来说,这就是全部的设计。除此之外我们并不需要更多其他东西。为了实现这个设计,我们引入了一种聪明的分布式事件时间分配机制。这里的事件时间是逻辑时间,它并不需要有什么现实意义,比如它不需要是真实的时钟。使用 Flink 的乱序处理能力,并且使用 Flink 迭代计算的功能来进行某些前提条件的检查。这些就是我们构建一个支持事务的流处理器的要素。我们实际上已经完成了这个工作,称之为流式账簿(Streaming Ledger),这是个在 Apache Flink 上很小的库。它基于流处理器做到了满足 ACID 的多键事务性操作。我相信这是个非常有趣的进化。流处理器一开始基本上没有任何保障,然后类似 Storm 的系统增加了至少一次的保证。但显然至少一次依然不够好。然后我们看到了恰好一次的语义,这是一个大的进步,但这只是对于单行操作的恰好一次语义,这与键值库很类似。而支持多行恰好一次或者多行事务操作将流处理器提升到了一个可以解决传统意义上关系型数据库所应用场景的阶段。Streaming Ledger 的实现方式是允许用户定义一些表和对这些表进行修改的函数。Streaming Ledger 会运行这些函数和表,所有的这些一起编译成一个 Apache Flink 的有向无环图(DAG)。Streaming Ledger 会注入所有事务时间分配的逻辑,以此来保证所有事务的一致性。搭建这样一个库并不难,难的是让它高性能的运行。让我们来看看它的性能。这些性能测试是几个月之前的,我们并没有做什么特别的优化,我们只是想看看一些最简单的方法能够有什么样的性能表现。而实际性能表现看起来相当不错。如果你看这些性能条形成的阶梯跨度,随着流处理器数量的增长,性能的增长相当线性。在事务设计中,没有任何协同或者锁参与其中。这只是流处理,将事件流推入系统,缓存一小段时间来做一些乱序处理,然后做一些本地状态更新。在这个方案中,没有什么特别代价高昂的操作。在图中性能增长似乎超过了线性,我想这主要是因为 JAVA 的 JVM 当中 GC 的工作原因导致的。在 32 个节点的情况下我们每秒可以处理大约两百万个事务。为了与数据库性能测试进行对比,通常当你看数据库的性能测试时,你会看到类似读写操作比的说明,比如 10% 的更新操作。而我们的测试使用的是 100% 的更新操作,而每个写操作至少更新在不同分区上的 4 行数据,我们的表的大小大约是两亿行。即便没有任何优化,这个方案的性能也非常不错。另一个在事务性能中有趣的问题是当更新的操作对象是一个比较小的集合时的性能。如果事务之间没有冲突,并发的事务处理是一个容易的事情。如果所有的事务都独立进行而互不干扰,那这个不是什么难题,任何系统应该都能很好的解决这样的问题。当所有的事务都开始操作同一些行时,事情开始变得更有趣了,你需要隔离不同的修改来保证一致性。所以我们开始比较一个只读的程序、一个又读又写但是没有写冲突的程序和一个又读又写并有中等程度写冲突的程序这三者之间的性能。你可以看到性能表现相当稳定。这就像是一个乐观的并发冲突控制,表现很不错。那如果我们真的想要针对这类系统的阿喀琉斯之踵进行考验,也就是反复的更新同一个小集合中的键。在传统数据库中,这种情况下可能会出现反复重试,反复失败再重试,这是一种我们总想避免的糟糕情况。是的,我们的确需要付出性能代价,这很自然,因为如果你的表中有几行数据每个人都想更新,那么你的系统就失去了并发性,这本身就是个问题。但是这种情况下,系统并没崩溃,它仍然在稳定的处理请求,虽然失去了一些并发性,但是请求依然能够被处理。这是因为我们没有冲突重试的机制,你可以认为我们有一个基于乱序处理天然的冲突避免的机制,这是一种非常稳定和强大的技术。我们还尝试了在跨地域分布的情况下的性能表现。比如我们在美国、巴西,欧洲,日本和澳大利亚各设置了一个 Flink 集群。也就是说我们有个全球分布的系统。如果你在使用一个关系型数据库,那么你会付出相当高昂的性能代价,因为通信的延迟变得相当高。跨大洲的信息交互比在同一个数据中心甚至同一个机架上的信息交互要产生大得多的延迟。但是有趣的是,流处理的方式对延迟并不是十分敏感,延迟对性能有所影响,但是相比其它很多方案,延迟对流处理的影响要小得多。所以,在这样的全球分布式环境中执行分布式程序,的确会有更差的性能,部分原因也是因为跨大洲的通信带宽不如统一数据中心里的带宽,但是性能表现依然不差。实际上,你可以拿它当做一个跨地域的数据库,同时仍然能够在一个大概 10 个节点的集群上获得每秒几十万条事务的处理能力。在这个测试中我们只用了 10 个节点,每个大洲两个节点。所以 10 个节点可以带来全球分布的每秒 20 万事务的处理能力。我认为这是很有趣的结果,这是因为这个方案对延迟并不敏感。我已经说了很多利用流处理来实现事务性的应用。可能听起来这是个很自然的想法,从某种角度上来说的确是这样。但是它的确需要一些很复杂的机制来作为支撑。它需要一个连续处理而非微批处理的能力,需要能够做迭代,需要复杂的基于事件时间处理乱序处理。为了更好地性能,它需要灵活的状态抽象和异步 checkpoint 机制。这些是真正困难的事情。这些不是由 Ledger Streaming 库实现的,而是 Apache Flink 实现的,所以即使对这类事务性的应用而言,Apache Flink 也是真正的中流砥柱。至此,我们可以说流处理不仅仅支持连续处理、流式分析、批处理或者事件驱动的处理,你也可以用它做事务性的处理。当然,前提是你有一个足够强大的流处理引擎。这就是我演讲的全部内容。本文作者:apache_flink阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

April 18, 2019 · 2 min · jiezi

Pick!闲鱼亿级商品库中的秒级实时选品

一、业务背景在电商运营工作中,营销活动是非常重要的部分,对用户增长和GMV都有很大帮助。对电商运营来说,如何从庞大的商品库中筛选出卖家优质商品并推送给有需要的买家购买是每时每刻都要思索的问题,而且这个过程需要尽可能快和实时。保证快和实时就可以提升买卖双方的用户体验,提高用户粘性。二、实时选品为了解决上面提到的问题,闲鱼研发了马赫系统。马赫是一个实时高性能的商品选品系统,解决在亿级别商品中通过规则筛选优质商品并进行投放的场景。有了马赫系统之后,闲鱼的运营同学可以在马赫系统上创建筛选规则,比如商品标题包含“小猪佩奇”、类目为“玩具”、价格不超过100元且商品状态为未卖出。在运营创建规则后,马赫系统会同时进行两步操作,第一步是从存量商品数据筛选符合条件的商品进行打标;第二步是对商品实时变更进行规则计算,实时同步规则命中结果。马赫系统最大的特点是快而实时,体现在命中规模为100w的规则可以在10分钟之内完成打标;商品本身变更导致的规则命中结果同步时间为1秒钟。运营可以通过马赫系统快速筛选商品向用户投放,闲鱼的流量也可以精准投给符合条件的商品并且将流量利用到最大化。那么马赫系统是如何解决这一典型的电商问题的呢,马赫系统和流计算有什么关系呢,这是下面要详细说明的部分。三、流计算流计算是持续、低延迟、事件触发的数据处理模型。流计算模型是使用实时数据集成工具,将数据实时变化传输到流式数据存储,此时数据的传输变成实时化,将长时间累积大量的数据平摊到每个时间点不停地小批量实时传输;流计算会将计算逻辑封装为常驻计算服务,一旦启动就一直处于等待事件触发状态,当有数据流入后会触发计算迅速得到结果;当流计算得到计算结果后可以立刻将数据输出,无需等待整体数据的计算结果。闲鱼实时选品系统使用的流计算框架是Blink,Blink是阿里巴巴基于开源流计算框架Flink定制研发的企业级流计算框架,可以认为是Flink的加强版,现在已经开源。Flink是一个高吞吐、低延迟的计算引擎,同时还提供很多高级功能。比如它提供有状态的计算,支持状态管理,支持强一致性的数据语义以及支持Event Time,WaterMark对消息乱序的处理等特性,为闲鱼实时选品系统的超低延时选品提供了有力支持。3.1、Blink之StateState是指流计算过程中计算节点的中间计算结果或元数据属性,比如在aggregation过程中要在state中记录中间聚合结果,比如Apache Kafka作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)。所以Blink中的State就是与时间相关的,Blink任务的内部数据(计算数据和元数据属性)的快照。马赫系统会在State中保存商品合并之后的全部数据和规则运行结果数据。当商品发生变更后,马赫系统会将商品变更信息与State保存的商品信息进行合并,并将合并的信息作为入参运行所有规则,最后将规则运行结果与State保存的规则运行结果进行Diff后得到最终有效的运行结果。所以Blink的State特性是马赫系统依赖的关键特性。3.2、Blink之WindowBlink的Window特性特指流计算系统特有的数据分组方式,Window的创建是数据驱动的,也就是说,窗口是在属于此窗口的第一个元素到达时创建。当窗口结束时候删除窗口及状态数据。Blink的Window主要包括两种,分别为滚动窗口(Tumble)和滑动窗口(Hop)。滚动窗口有固定大小,在每个窗口结束时进行一次数据计算,也就是说滚动窗口任务每经过一次固定周期就会进行一次数据计算,例如每分钟计算一次总量。滑动窗口与滚动窗口类似,窗口有固定的size,与滚动窗口不同的是滑动窗口可以通过slide参数控制滑动窗口的新建频率。因此当slide值小于窗口size的值的时候多个滑动窗口会重叠,此时数据会被分配给多个窗口,如下图所示:Blink的Window特性在数据计算统计方面有很多使用场景,马赫系统主要使用窗口计算系统处理数据的实时速度和延时,用来进行数据统计和监控告警。3.3、Blink之UDXUDX是Blink中用户自定义函数,可以在任务中调用以实现一些定制逻辑。Blink的UDX包括三种,分别为:UDF - User-Defined Scalar FunctionUDF是最简单的自定义函数,输入是一行数据的任意字段,输出是一个字段,可以实现数据比较、数据转换等操作。UDTF - User-Defined Table-Valued FunctionUDTF 是表值函数,每个输入(单column或多column)返回N(N>=0)Row数据,Blink框架提供了少量的UDTF,比如:STRING_SPLIT,JSON_TUPLE和GENERATE_SERIES3个built-in的UDTF。UDAF - User-Defined Aggregate FunctionUDAF是聚合函数,输入是多行数据,输出是一个字段。Blink框架Built-in的UDAF包括MAX,MIN,AVG,SUM,COUNT等,基本满足了80%常用的集合场景,但仍有一定比例的复杂业务场景,需要定制自己的聚合函数。马赫系统中使用了大量的UDX进行逻辑定制,包括消息解析、数据处理等。而马赫系统最核心的商品数据合并、规则运行和结果Diff等流程就是通过UDAF实现的。四、秒级选品方案选品系统在项目立项后也设计有多套技术方案。经过多轮讨论后,最终决定对两套方案实施验证后决定最终实现方案。第一套方案是基于PostgreSQL的方案,PostgreSQL可以很便捷的定义Function进行数据合并操作,在PostgreSQL的trigger上定义执行规则逻辑。基于PostgreSQL的技术实现较复杂,但能满足功能需求。不过性能测试结果显示PostgreSQL处理小数据量(百万级)性能较好;当trigger数量多、trigger逻辑复杂或处理亿级别数据时,PostgreSQL的性能会有较大下滑,不能满足秒级选品的性能指标。因此基于PostgreSQL的方案被否决(在闲鱼小商品池场景中仍在使用)。第二套方案是基于Blink流计算方案,通过验证发现Blink SQL很适合用来表达数据处理逻辑而且Blink性能很好,综合对比之后最终选择Blink流计算方案作为实际实施的技术方案。为了配合使用流计算方案,马赫系统经过设计和解耦,无缝对接Blink计算引擎。其中数据处理模块是马赫系统核心功能模块,负责接入商品相关各类数据、校验数据、合并数据、执行规则和处理执行结果并输出等步骤,所以数据处理模块的处理速度和延时在很大程度上能代表马赫系统数据处理速度和延时。接下来我们看下数据处理模块如何与Blink深度结合将数据处理延迟降到秒级。数据处理模块结构如上图,包含数据接入层、数据合并层、规则运行层和规则运行结果处理层。每层都针对流计算处理模式进行了单独设计。4.1、数据接入层数据接入层是数据处理模块前置,负责对接多渠道各种类型的业务数据,主要逻辑如下:数据接入层对接多个渠道多种类型的业务数据;解析业务数据并做简单校验;统计各渠道业务数据量级并进行监控,包括总量和同比变化量;通过元数据中心获取字段级别的Metadata配置。元数据中心是用来保存和管理所有字段的MetaData配置信息组件。Metadata配置代表字段元数据配置,包括字段值类型,值范围和值格式等基础信息;根据Metadata配置进行字段级别数据校验;按照马赫定义的标准数据范式组装数据。这样设计的考虑是因为业务数据是多种多样的,比如商品信息包括数据库的商品表记录、商品变更的MQ消息和算法产生的离线数据,如果直接通过Blink对接这些业务数据源的话,需要创建多个Blink任务来对接不同类型业务数据源,这种处理方式太重,而且数据接入逻辑与Blink紧耦合,不够灵活。数据接入层可以很好的解决上述问题,数据接入层可以灵活接入多种业务数据,并且将数据接入与Blink解耦,最终通过同一个Topic发出消息。而Blink任务只要监听对应的Topic就可以连续不断的收到业务数据流,触发接下来的数据处理流程。4.2、数据合并层数据合并是数据处理流程的重要步骤,数据合并的主要作用是将商品的最新信息与内存中保存的商品信息合并供后续规则运行使用。数据合并主要逻辑是:监听指定消息队列Topic,获取业务数据消息;解析消息,并将消息内容按照字段重新组装数据,格式为{key:[timestamp, value]},key是字段名称,value是字段值,timestamp为字段数据产生时间戳;将组装后的数据和内存中保存的历史数据根据timestamp进行字段级别数据合并,合并算法为比较timestamp大小取最新字段值,具体逻辑见下图。数据合并有几个前提:内存可以保存存量数据;这个是Blink提供的特性,Blink可以将任务运行过程中产生的存量数据保存在内存中,在下一次运行时从内存中取出继续处理。合并后的数据能代表商品的最新状态;这点需要一个巧妙设计:商品信息有很多字段,每个字段的值是数组,不仅要记录实际值,还要记录当前值的修改时间戳。在合并商品信息时,按照字段进行合并,合并规则是取时间戳最大的值为准。举例来说,内存中保存的商品ID=1的信息是{“desc”: [1, “描述1”], “price”: [4, 100.5]},数据流中商品ID=1的信息是{“desc”: [2, “描述2”], “price”: [3, 99.5]},那么合并结果就是{“desc”: [2, “描述2”], “price”: [4, 100.5]},每个字段的值都是最新的,代表商品当前最新信息。当商品信息发生变化后,最新数据由数据接入层流入,通过数据合并层将数据合并到内存,Blink内存中保存的是商品当前最新的全部数据。4.3、规则运行层规则运行层是数据处理流程核心模块,通过规则运算得出商品对各规则命中结果,逻辑如下:规则运行层接受输入为经过数据合并后的数据;通过元数据中心获取字段级别Metadata配置;根据字段Metadata配置解析数据;通过规则中心获取有效规则列表,规则中心是指创建和管理规则生命周期的组件;循环规则列表,运行单项规则,将规则命中结果保存在内存;记录运行规则抛出异常的数据,并进行监控告警。这里的规则指的是运营创建的业务规则,比如商品价格大于50且状态为在线。规则的输入是经过数据合并后的商品数据,输出是true或false,即是否命中规则条件。规则代表的是业务投放场景,马赫系统的业务价值就是在商品发生变更后尽快判断是否命中之前未命中的规则或是不命中之前已经命中的规则,并将命中和不命中结果尽快体现到投放场景中。规则运行需利用Blink强大算力来保证快速执行,马赫系统当前有将近300条规则,而且还在快速增长。这意味着每个商品发生变更后要在Blink上运行成百上千条规则,闲鱼每天有上亿商品发生变更,这背后需要的运算量是非常惊人的。4.4、运行结果处理层读者读到这里可能会奇怪,明明经过规则运行之后直接把运行结果输出到投放场景就可以了,不需要运行结果处理层。实际上运行结果处理层是数据处理模块最重要的部分。因为在实际场景中,商品的变更在大部分情况只会命中很少一部分规则,而且命中结果也很少会变化。也就是说商品对很多规则的命中结果是没有意义的,如果将这些命中结果也输出的话,只会增加操作TPS,对实际结果没有任何帮助。而筛选出有效的运行结果,这就是运行结果处理层的作用。运行结果处理层逻辑如下:获取商品数据的规则运行结果;按照是否命中规则解析运行结果;将运行结果与内存中保存的历史运行结果进行diff,diff作用是排除新老结果中相同的命中子项,逻辑见下图。运行结果处理层利用Blink内存保存商品上一次变更后规则运行结果,并将当前变更后规则运行结果与内存中结果进行比较,计算出有效运行结果。举例来说,商品A上一次变更后规则命中结果为{“rule1”:true, “rule2”:true, “rule3”:false, “rule4”:false},当前变更后规则命中结果为{“rule1”:true, “rule2”:false, “rule3”:false, “rule4”:true}。因为商品A变更后对rule1和rule3的命中结果没有变化,所以实际有效的命中结果是{“rule2”:false, “rule4”:true},通过运行结果处理层处理后输出的是有效结果的最小集,可以极大减小无效结果输出,提高数据处理的整体性能和效率。4.5、难点解析虽然闲鱼实时选品系统在立项之初经过预研和论证,但因为使用很多新技术框架和流计算思路,在开发过程中遇到一些难题,包括设计和功能实现方面的,很多是设计流计算系统的典型问题。我们就其中一个问题与各位读者探讨-规则公式转换。4.5.1、规则公式转换这个问题的业务场景是:运营同学在马赫系统页面上筛选商品字段后保存规则,服务端是已有的老系统,逻辑是根据规则生成一段SQL,SQL的where条件和运营筛选条件相同。SQL有两方面的作用,一方面是作为离线规则,在离线数据库中执行SQL筛选符合规则的离线商品数据;另一方面是转换成在线规则,在Blink任务中对实时商品变更数据执行规则以判断是否命中。因为实时规则运行使用的是MVEL表达式引擎,MVEL表达式是类Java语法的,所以问题就是将离线规则的SQL转换成在线规则的Java表达式,两者逻辑需一致,并且需兼顾性能和效率。问题的解决方案很明确,解析SQL后将SQL操作符转换成Java操作符,并将SQL特有语法转成Java语法,例如A like ‘%test%‘转成A.contains(’test’)。这个问题的难点是如何解析SQL和将解析后的语义转成Java语句。经过调研之后给出了简单而优雅的解决方案,主要步骤如下:使用Druid框架解析SQL语句,转成一个二叉树,单独取出其中的where条件子树;通过后序遍历算法遍历where条件子树;将SQL操作符换成对应的Java操作符;目前支持且、或、等于、不等于、大于、大于等于、小于、小于等于、like、not like和in等操作。将SQL语法格式转成Java语法;将in语法改成Java的或语法,例如A in (‘hello’, ‘world’)转成(A == ‘hello’) || (A == ‘world’)。实际运行结果如下:代码逻辑如下(主要是二叉树后续遍历和操作符转换,不再详细解释):五、结论马赫系统上线以来,已经支持近400场活动和投放场景,每天处理近1.4亿条消息,峰值TPS达到50000。马赫系统已经成为闲鱼选品投放的重要支撑。本文主要阐述马赫系统中数据处理的具体设计方案,说明整体设计的来龙去脉。虽然闲鱼实时选品系统针对的是商品选品,但数据处理流计算技术方案的输入是MQ消息,输出也是MQ消息,不与具体业务绑定,所以数据处理流计算技术方案不只适用于商品选品,也适合其他类似实时筛选业务场景。希望我们的技术方案和设计思路能给你带来一些想法和思考,也欢迎和我们留言讨论,谢谢。参考资料闲鱼实时选品系统:https://mp.weixin.qq.com/s/8ROsZniYD7nIQssC14mn3wBlink:https://github.com/apache/flink/tree/blinkPostgreSQL:https://www.postgresql.org/druid:https://github.com/alibaba/druid本文作者:闲鱼技术-剑辛阅读原文本文为云栖社区原创内容,未经允许不得转载。

April 3, 2019 · 1 min · jiezi

利用blink CEP实现流计算中的超时统计问题

案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总一. 背景介绍如<利用blink+MQ实现流计算中的延时统计问题>一文中所描述的场景,我们将其简化为以下案例:实时流的数据源结构如下:物流订单号支付时间仓接单时间仓出库时间LP12018-08-01 08:00 LP12018-08-01 08:002018-08-01 09:00 LP22018-08-01 09:10 LP22018-08-01 09:102018-08-01 09:50 LP22018-08-01 09:102018-08-01 09:502018-08-01 12:00我们期望通过以上数据源,按照支付日期统计,每个仓库的仓接单量、仓出库量、仓接单超2H未出库单量、仓接单超6H未出库单量。可以看出,其中LP1仓接单时间是2018-08-01 09:00,但一直到2018-08-01 12:00点之前,一直都没有出库,LP1满足仓接单超2H未出库的行为。该场景的难点就在于:订单未出库。而对于TT中的源消息流,订单未出库,TT就不会下发新的消息,不下发新的消息,blink就无法被触发计算。而针对上述的场景,对于LP1,我们需要在仓接单时间是2018-08-01 09:00+2H,也就是2018-08-01 11:00的之后,就要知道LP1已经仓接单但超2H未出库了。二. 解决方案本文主要是利用blink CEP来实现上述场景,具体实现步骤如下所述。第一步:在source DDL中定义event_timestamp,并定义sink,如下:—-定义sourcecreate table sourcett_dwd_ri( lg_order_code varchar comment ‘物流订单号’ ,ded_pay_time varchar comment ‘支付时间’ ,store_code varchar comment ‘仓库编码’ ,store_name varchar comment ‘仓库名称’ ,wms_create_time varchar comment ‘仓接单时间’ ,wms_consign_create_time varchar comment ‘仓出库时间’ ,evtstamp as case when coalesce(wms_create_time, ‘’) <> ’’ then to_timestamp(wms_create_time, ‘yyyy-MM-dd HH:mm:ss’) else to_timestamp(‘1970-01-01 00:00:00’, ‘yyyy-MM-dd HH:mm:ss’) end –构造event_timestamp,如果源表本身带有消息的occur_time,可直接选择occur_time作为event_timestamp ,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000) –设置延迟10秒处理)with( type=‘tt’ ,topic=‘dwd_ri’ ,accessKey=‘xxxxxx’ ,accessId=‘xxxxxx’ ,lengthCheck=‘PAD’ ,nullValues=’\N|’);—-定义sinkcreate table sink_hybrid_blink_cep( ded_pay_date varchar comment ‘支付日期’ ,store_code varchar comment ‘仓库编码’ ,store_name varchar comment ‘仓库名称’ ,wms_create_ord_cnt bigint comment ‘仓接单量’ ,wms_confirm_ord_cnt bigint comment ‘仓出库量’ ,wmsin_nowmsout_2h_ord_cnt bigint comment ‘仓接单超2小时未出库单量’ ,wmsin_nowmsout_6h_ord_cnt bigint comment ‘仓接单超6小时未出库单量’ ,sub_partition bigint comment ‘二级分区(支付日期)’ ,PRIMARY KEY (ded_pay_date, store_code, sub_partition))with( type=‘PetaData’ ,url = ‘xxxxxx’ ,tableName=‘blink_cep’ ,userName=‘xxxxxx’ ,password=‘xxxxxx’ ,bufferSize=‘30000’ ,batchSize=‘3000’ ,batchWriteTimeoutMs=‘15000’);第二步:根据blink CEP的标准语义进行改写,如下:create view blink_cep_v1asselect ‘仓接单-仓出库超时’ as timeout_type ,lg_order_code ,wms_create_time as start_time ,wms_consign_create_time as end_timefrom source_dwd_csn_whc_lgt_fl_ord_riMATCH_RECOGNIZE( PARTITION BY lg_order_code ORDER BY evtstamp MEASURES e1.wms_create_time as wms_create_time ,e2.wms_consign_create_time as wms_consign_create_time ONE ROW PER MATCH WITH TIMEOUT ROWS –重要,必须设置延迟也下发 AFTER MATCH SKIP TO NEXT ROW PATTERN (e1 -> e2) WITHIN INTERVAL ‘6’ HOUR EMIT TIMEOUT (INTERVAL ‘2’ HOUR, INTERVAL ‘6’ HOUR) DEFINE e1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null ,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null)where wms_create_time is not null –重要,可以大大减少进入CEP的消息量and wms_consign_create_time is null –重要,可以大大减少进入CEP的消息量;第三步:根据blink的执行机制,我们通过源实时流sourcett_dwd_ri与超时消息流blink_cep_v1关联,来触发blink对超时消息进行聚合操作,如下:create view blink_cep_v2asselect a.lg_order_code as lg_order_code ,last_value(a.store_code ) as store_code ,last_value(a.store_name ) as store_name ,last_value(a.ded_pay_time ) as ded_pay_time ,last_value(a.wms_create_time ) as wms_create_time ,last_value(a.real_wms_confirm_time ) as real_wms_confirm_time ,last_value(case when coalesce(a.wms_create_time, ‘’) <> ’’ and coalesce(a.real_wms_confirm_time, ‘’) = ’’ and now() - unix_timestamp(a.wms_create_time,‘yyyy-MM-dd HH:mm:ss’) >= 7200 then ‘Y’ else ‘N’ end) as flag_01 ,last_value(case when coalesce(a.wms_create_time, ‘’) <> ’’ and coalesce(a.real_wms_confirm_time, ‘’) = ’’ and now() - unix_timestamp(a.wms_create_time,‘yyyy-MM-dd HH:mm:ss’) >= 21600 then ‘Y’ else ‘N’ end) as flag_02from (select lg_order_code as lg_order_code ,last_value(store_code ) as store_code ,last_value(store_name ) as store_name ,last_value(ded_pay_time ) as ded_pay_time ,last_value(wms_create_time ) as wms_create_time ,last_value(wms_consign_create_time) as real_wms_confirm_time from sourcett_dwd_ri group by lg_order_code ) aleft outer join (select lg_order_code ,count(*) as cnt from blink_cep_v1 group by lg_order_code ) bon a.lg_order_code = b.lg_order_codegroup by a.lg_order_code;insert into sink_hybrid_blink_cepselect regexp_replace(substring(a.ded_pay_time, 1, 10), ‘-’, ‘’) as ded_pay_date ,a.store_code ,max(a.store_name) as store_name ,count(case when coalesce(a.wms_create_time, ‘’) <> ’’ then a.lg_order_code end) as wmsin_ord_cnt ,count(case when coalesce(a.real_wms_confirm_time, ‘’) <> ’’ then a.lg_order_code end) as wmsout_ord_cnt ,count(case when a.flag_01 = ‘Y’ then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt ,count(case when a.flag_02 = ‘Y’ then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt ,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), ‘-’, ‘’) as bigint) as sub_partitionfrom blink_cep_v2 as t1where coalesce(lg_cancel_time, ‘’) = ‘‘and coalesce(ded_pay_time, ‘’) <> ‘‘group by regexp_replace(substring(ded_pay_time, 1, 10), ‘-’, ‘’) ,a.store_code;三. 问题拓展blink CEP的参数比较多,要完全看懂,着实需要一些时间,但CEP的强大是毋庸置疑的。CEP不仅可以解决物流场景中的超时统计问题,风控中的很多场景也是信手拈来。这里有一个风控中的场景,通过上述物流案例的用法,我们是否能推敲出这个场景的用法呢?风控案例测试数据如下:刷卡时间银行卡ID刷卡地点2018-04-13 12:00:001WW2018-04-13 12:05:001WW12018-04-13 12:10:001WW22018-04-13 12:20:001WW我们认为,当一张银行卡在10min之内,在不同的地点被刷卡大于等于两次,我们就期望对消费者出发预警机制。blink CEP是万能的么?答案是否定的,当消息乱序程度比较高的时候,实时性和准确性就成了一对矛盾的存在。要想实时性比较高,必然要求设置的offset越小越好,但offset设置比较小,就直接可能导致很多eventtime<watermark-offset的消息,直接被丢弃,准确性很难保证。比如,在CP回传物流详情的时候,经常回传的时间跟实操的时间差异很大(实操时间是10点,但回传时间是15点),如果以实操时间作为eventtime,可能就会导致这种差异很大的消息被直接丢掉,无法进入CEP,进而无法触发CEP后续的计算,在使用CEP的过程中,应该注意这一点。四. 作者简介花名:缘桥,来自菜鸟-CTO-数据部-仓配数据研发,主要负责菜鸟仓配业务的离线和实时数据仓库建设以及创新数据技术和工具的探索和应用。本文作者:付空阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

March 6, 2019 · 3 min · jiezi

利用blink+MQ实现流计算中的超时统计问题

案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总一. 背景介绍菜鸟的物流数据本身就有链路复杂、实操节点多、汇总维度多、考核逻辑复杂的特点,对于实时数据的计算存在很大挑战。经过仓配ETL团队的努力,目前仓配实时数据已覆盖了绝大多数场景,但是有这样一类特殊指标:“晚点超时指标”(例如:出库超6小时未揽收的订单量),仍存在实时汇总计算困难。原因在于:流计算是基于消息触发计算的,若没有消息到达到则无法计算,这类指标恰好是要求在指定的超时时间计算出有多少未达到的消息。然而,这类指标对于指导实操有着重要意义,可以告知运营小二当前多少订单积压在哪些作业节点,应该督促哪些实操人员加快作业,这对于物流的时效KPI达成至关重要。之前的方案是:由产品前端根据用户的请求查询OLAP数据库,由OLAP从明细表出结果。大促期间,用户请求量大,加之数据量大,故对OLAP的明细查询造成了比较大的压力。二. 解决方案2.1 问题定义“超时晚点指标” 是指,一笔订单的两个相邻的实操节点node_n-1 、node_n 的完成时间 time_n-1、time_n,当满足 : time_n is null && current_time - time_n-1 > kpi_length 时,time_flag_n 为 true , 该笔订单计入 超时晚点指标的计数。如下图,有一笔订单其 node_1 为出库节点,时间为time_1 = ‘2018-06-18 00:00:00’ ,运营对出库与揽收之间考核的时长 kpi_length = 6h, 那么当前自然时间 current_time > ‘2018-06-18 06:00:00’ 时,且node_2揽收节点的time_2 为null,则该笔订单的 timeout_flag_2 = true , “出库超6小时未揽收订单量” 加1。由于要求time_2 为null,即要求没有揽收消息下发的情况下让流计算做汇总值更新,这违背了流计算基于消息触发的基本原理,故流计算无法直接算出这种“超时晚点指标”。决问题的基本思路是:在考核时刻(即 kpi_time = time_n-1+kpi_length )“制造”出一条消息下发给流计算,触发汇总计算。继续上面的例子:在考核时刻“2018-06-18 06:00:00”利用MetaQ定时消息功能“制造”出一条消息下发给流计算汇总任务,触发对该笔订单的 time_out_flag_2 的判断,增加汇总计数。同时,还利用 Blink 的Retraction 机制,当time_2 由null变成有值的时候,Blink 可以对 time_out_flag_2 更新,重新计数。2.2 方案架构如上图所示:Step1: Blink job1 接收来自上游系统的订单数据,做清洗加工,生成订单明细表:dwd_ord_ri,利用TT下发给Blink job2 和 Blink job3。Step2:Blink job2 收到 dwd_ord_ri后,对每笔订单算出考核时刻 kpi_time = time_n-1+kpi_length,作为MetaQ消息的“TIMER_DELIVER_MS” 属性,写入MetaQ。MetaQ的定时消息功能,可以根据用户写入的TIMER_DELIVER_MS 在指定时刻下发给消费者,即上图中的Blink job3。Step3:Blink job3 接收 TT、MetaQ 两个消息源,先做Join,再对time_flag判断,最后做Aggregate计算。同一笔订单,dwd_ord_ri、timing_msg任意一个消息到来,都会触发join,time_flag判断,aggregate重新计算一遍,Blink的Retraction可对结果进行实时更新。2.3 实现细节本方案根据物流场景中多种实操节点、多种考核时长的特点,从Blink SQL代码 和 自定义Sink两方面做了特殊设计,从而实现了灵活配置、高效开发。(1) Blink job2 — 生成定时消息关键Blink SQL 代码如下。约定每条record的第一个字段为投递时间列表,即MetaQ向消费者下发消息的时刻List,也就是上面所说的多个考核时刻。第二个字段为保序字段,比如在物流场景中经常以订单code、运单号作为保序主键。该代码实现了对每个出库的物流订单,根据其出库时间,向后延迟6小时(21600000毫秒)、12小时(43200000毫秒)、24小时(86400000毫秒)由MetaQ向消费者下发三个定时消息。create table metaq_timing_msg(deliver_time_list varchar comment ‘投递时间列表’, – 约定第一个字段为投递时间listlg_code varchar comment ‘物流订单code’, – 约定第二字段为保序主键node_name varchar comment ‘节点名称’,node_time varchar comment ‘节点时间’,)WITH(type = ‘custom’,class = ‘com.alibaba.xxx.xxx.udf.MetaQTimingMsgSink’,tag = ‘store’,topic = ‘blink_metaq_delay_msg_test’,producergroup = ‘blinktest’,retrytimes = ‘5’,sleeptime = ‘1000’);insert into metaq_timing_msgselectconcat_ws(’,’,cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 21600000) as varchar), –6小时cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 43200000) as varchar), –12小时cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 86400000) as varchar) –24小时) as deliver_time_list,lg_code,‘wms’ as node_name,store_out_time as node_timefrom(selectlg_code,FIRST_VALUE(store_out_time) as store_out_timefrom srctablegroup by lg_code)bwhere store_out_time is not null ;(2) Blink 自定义Sink — MetaQTimingMsg SinkBlink的当前版本还不支持 MetaQ的定时消息功能的Sink,故利用 Blink的自定义Sink功能,并结合菜鸟物流数据的特点开发了MetaQTimingMsg Sink。关键代码如下(实现 writeAddRecord 方法)。@Overridepublic void writeAddRecord(Row row) throws IOException {Object deliverTime = row.getField(0);String[] deliverTimeList = deliverTime.toString().split(",");for(String dTime:deliverTimeList){ String orderCode = row.getField(1).toString(); String key = orderCode + “_” + dTime; Message message = newMessage(row, dTime, key); boolean result = sendMessage(message,orderCode); if(!result){ LOG.error(orderCode + " : " + dTime + " send failed"); } }}private Message newMessage(Row row,String deliverMillisec,String key){ //Support Varbinary Type Insert Into MetaQ Message message = new Message(); message.setKeys(key); message.putUserProperty(“TIMER_DELIVER_MS”,deliverMillisec); int arity = row.getArity(); Object[] values = new Object[arity]; for(int i=0;i<arity;i++){ values[i]=row.getField(i); } String lineStr=org.apache.commons.lang3.StringUtils.join(values, FIELD_DELIMITER); try { byte[] bytes = lineStr.getBytes(ENCODING); message.setBody(bytes); message.setWaitStoreMsgOK(true); } catch (UnsupportedEncodingException e) { LOG.error(“create new message error”,e); } return message;}private boolean sendMessage(Message message,String orderCode){ long retryTime = 0; boolean isSendSuccess = true; if(message != null){ message.setTopic(topicName); message.setTags(tagName); } SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { …. // 针对物流订单code的hash算法 return list.get(index.intValue()); } },orderCode); if(!result.getSendStatus().equals(SendStatus.SEND_OK)){ LOG.error("" + orderCode +" write to metaq result is " + result.getSendStatus().toString()); isSendSuccess = false; } return isSendSuccess;}}(3)Blink job3 — 汇总计算关键Blink SQL 代码如下,统计了每个仓库的“出库超6小时未揽收物理订单”、“出库超12小时未揽收物理订单”、“出库超24小时未揽收物理订单”的汇总值。代码中使用了“stringLast()”函数处理来自dwd_ord_ri的每条消息,以取得每个物流订单的最新出库揽收情况,利用Blink Retraction机制,更新汇总值。create view dws_store_view as select t1.store_code, max(t1.store_name) as store_name, count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 21600 then t2.lg_code end ) as tms_not_collect_6h_ord_cnt, —出库超6小时未揽收物流订单量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 43200 then t2.lg_code end ) as tms_not_collect_12h_ord_cnt,—出库超6小时未揽收物流订单量 count(case when length(trim(t1.store_out_time)) = 19 and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,‘yyyy-MM-dd HH:mm:ss’) >= 86400 then t2.lg_code end ) as tms_not_collect_24h_ord_cnt —出库超6小时未揽收物流订单量from ( select lg_code, coalesce(store_code,’-1’) as store_code, store_name, store_out_time, tms_collect_time from ( select lg_code, max(store_code) as store_code, max(store_name) as store_name, stringLast(store_out_time) as store_out_time, stringLast(tms_collect_time)as tms_collect_time, from dwd_ord_ri group by lg_code ) a ) t1left outer join ( select lg_code, from timing_msg where node_name = ‘wms’ group by lg_code) t2on t1.lg_code = t2.lg_codegroup by t1.store_code ;三. 方案优势3.1 配置灵活我们从“Blink SQL 代码” 和“自定义MetaQ” 两个方面设计,用户可以根据具体的业务场景,在Blink SQL的一个view里就能实现多种节点多种考核时间的定时消息生成,而不是针对每一个实操节点的每一种定时指标都要写一个view,这样大大节省了代码量,提升了开发效率。例如对于仓库节点的出库超6小时未揽收、超12小时未揽收、超24小时未揽收,这三个指标利用上述方案,仅需在Blink job2的中metaq_timing_msg的第一个字段deliver_time_list中拼接三个kpi_length,即6小时、12小时、24小时为一个字符串即可,由MetaQTimingMsg Sink自动拆分成三条消息下发给MetaQ。对于不同的节点的考核,仅需在node_name,node_time填写不同的节点名称和节点实操时间即可。3.2 主键保序如2.3节所述,自定义的Sink中 实现了MetaQ的 MessageQueueSelector 接口的 select() 方法,同时在Blink SQL 生成的MetaQ消息默认第二个字段为保序主键字段。从而,可以根据用户自定义的主键,保证同一主键的所有消息放在同一个通道内处理,从而保证按主键保序,这对于流计算非常关键,能够实现数据的实时准确性。3.3 性能优良让专业的团队做专业的事。个人认为,这种大规模的消息存储、消息下发的任务本就应该交给“消息中间件”来处理,这样既可以做到计算与消息存储分离,也可以方便消息的管理,比如针对不同的实操节点,我们还可以定义不同的MetaQ的tag。另外,正如2.2节所述,我们对定时消息量做了优化。考虑到一笔订单的属性字段或其他节点更新会下发多条消息,我们利用了Blink的FIRST_VALUE函数,在Blink job2中同一笔订单的的一种考核指标只下发一条定时消息,大大减少了消息量,减轻了Blink的写压力,和MetaQ的存储。四. 自我介绍马汶园 阿里巴巴 -菜鸟网络—数据部 数据工程师菜鸟仓配实时研发核心成员,主导多次仓配大促实时数据研发,对利用Blink的原理与特性解决物流场景问题有深入思考与理解。本文作者:付空阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

March 6, 2019 · 3 min · jiezi

安排!活动素材的亿级用户精准投放

1.背景随着闲鱼用户快速增长,运营活动越来越趋于精细和个性化,运营会根据用户偏好为其投放合适的活动,如下图所示在闲鱼首页商品展示时,会在商品的列表中插入活动Banner,通过这些活动banner引导用户进入到相应活动会场,实现会场导流。闲鱼投放系统负责闲鱼运营活动的配置、管理、投放。主要解决了以下几个问题1.配置环境隔离问题,根据开发规范,任何线上业务必须先进行线下环境发布,测试验证通过后再发布上线。所以提供的管理平台需要拥有线上和线下环境隔离的能力。2.检索中的性能问题,在同一资源位下会配置多个活动,每次检索时需要把该资源位下的所有活动拉出来,按照条件进行筛选出符合要求的活动,这个过程会随着资源位下的活动增多检索遇到性能瓶颈。3.人群管理问题,在活动中会配置投放的人群,每次检索活动时人群作为活动的检索条件,需要验证用户是否属于当前活动的人群,所以需要解决用户和人群关系的管理。4.AB测试问题,运营投放的活动往往需要进行AB测试比较不同策略的表现,此时需要提供AB测试的能力。下面将通过介绍闲鱼投放系统设计、技术方案设计和实现过程,进一步阐述如何解决上面提出的四个问题。2.系统架构设计闲鱼投放系统是一个配置管理和配置检索的系统,换句话讲他不生产任何活动素材,他只是活动素材的搬运工。下面介绍闲鱼投放的系统设计。如上图所示闲鱼投放系统共分为了活动素材层、投放配置层、业务流程层和应用层四个层次。活动素材层是对在闲鱼投放系统中所有素材源的汇总,目前闲鱼投放中主要汇集了三种素材鲁班素材、马赫素材、TPP素材,鲁班素材提供了用户个性化Banner的能力,他的原理是根据用户的行为获取到偏好的商品,然后把商品图和素材模板组合为一个Banner;马赫素材提供规则圈选商品,他的原理是利用规则引擎把规则转换为SQL语句,然后利用该SQL在商品表中捞出符合规则的商品,同时马赫利用流计算能力对增量商品也实现了实时的规则圈选。TPP素材提个性化商品推荐,例如首页的商品推荐和猜你喜欢的商品推荐,TPP作为个性化推荐平台,可以根据不同的算法实现不同的推荐策略。投放配置层是开放给运营能力的汇总,包含活动配置、环境隔离、数据报表三种能力。活动配置是对一个资源位下所有投放行为的具体配置,如下图所示每个资源位会投放多个活动,活动与活动之间需要进行排期,每个投放活动中包含人群和素材两类信息,人群用来确定该活动投放的对象,素材用来确定该活动投放的内容,同时在人群下支持AB的能力。环境隔离是为了能够区分线上和线下业务,保证线上的投放环境在线下充分验证后再进行上线。数据报表是对所有投放活动关键指标的数据汇总。业务流程层是闲鱼投放系统的关键,主要负责投放活动的检索,根据调用方传入的用户信息和资源位信息,返回该资源位下符合该用户的活动。具体的检索逻辑如上图所示,首先在DB中查询当前资源位下的所有在线活动,然后依次过滤每个活动下的人群信息是否符合当前用户,从符合该用户的活动列表中选取一个活动,如果该活动下有AB测试,需要请求AB测试平台获取AB测试中配置的素材信息,最后返回该活动下的素材内容,客户端拿到活动素材后进行展示。应用层是对客户端能力的汇总,包括获取素材、素材样式展示、数据埋点。素材展示是直接与用户交互的部分,需要前端提供多种展示样式,数据埋点是为了验证AB策略和收集活动关键指标数据。3. 技术方案设计在上文中我们提到闲鱼投放面临四个需要解决的问题,分别是环境隔离问题、活动检索问题、人群管理问题、AB能力问题。下面将分别从这几个问题出发介绍解决的方法。3.1 人群管理问题人群管理使用的是集团内的奥格人群平台,他为我们提供了人群圈选和人群验证的能力,在很大程度上解放了闲鱼投放的人群管理,下面简单的介绍一下该平台的实现原理。如上图所示展示了奥格几个核心功能点,实现原理是这样的,首先平台会提供给用户可选择的规则,然后利用规则引擎把所选的规则,生成SQL查询语句和流计算规则,SQL查询语句用来离线圈选用户和流计算规则用来实时筛选新增用户,通过离线规则和在线规则实现了奥格的人群圈选,在人群验证阶段,首先利用倒排检索的思想,用户和人群的关系利用倒排数据结构标识,该方法解决了单用户与多人群关系验证的效率问题,最后利用多级缓存和热点数据本地缓存的方式解决人群检索RT问题。3.2 AB测试管理问题AB测试是用来验证方案的常用方法,常用的AB测试方案大多是用户唯一属性取模的方式按比例划分用户,但是会面临很多复杂的问题1.按照用户的id进行取模计算,对于未登录用户处理是一个常被忽略的问题。2.测试白名单管理,在AB测试时需要把特定人员划分到特定测试桶里。3.多个AB正交测试,如果有多个AB测试,此时需要正交测试时会出现更复杂的情况。在闲鱼投放中使用了集团的一休AB平台,一休提供基于用户、设备等多维度AB策略,同时支持白名单与正交AB测试的复杂场景,在AB基本能力的基础上提供了数据分析的能力,实现了调用到数据管理的一体化。3.3环境隔离问题解决环境隔离问题主要是为了方便测试,先在线下看效果,然后再把数据配置到线上。为了实现环境隔离迭代两次技术方案。首先介绍第一个方案,依照总体功能设计我希望平台中每个模块都可以灵活复用,可以利用已有模块,快速搭建出满足业务要求的投放活动,所以从业务角度进行了抽象,把能拆分的模块尽可能的抽象出来,最终的实体关系如下图所示。从业务逻辑角度共抽象了6个实体分别是资源位(Resource)、活动(Activity)、人群(Crowd)、素材(Data Source)、资源位和活动的关系(Resource Plan)、活动和人群素材的关系(Activity Plan)实体,每个模块之间可以按照下图的关系进行自由组合成一个投放活动。在该方案中利用每个实体中的env字段解决环境隔离问题,无论是在投放活动配置还是在检索过程中,只可以利用相同env字段的实体,该方法完全实现了环境隔离,但是在实际的应用中效果却不是很好,因为利用一份数据表中的env字段实现环境隔离,所以线上和线下对应的Resource Plan和Activity Plan关系表中关联的实体ID不同,那么将无法实现线下配置直接拷贝到线上,此时需要在线下和线上两次配置,由于配置过于复杂增加错误风险。下面介绍第二个方案,第二个技术方案中对方案一中提出的问题进行优化。具体的设计如下图所示:如上图所示,实体对象由6个转换为4个,下面一次介绍这些实体和如何解决环境隔离问题。首先介绍新引入的Data Schema实体,DataSchema是由开发同学负责,提供了一个配置好的JSON配置模板,他与Resource进行关联,意味着当前Resource下的所有DataSource都将按照该DataSchema提供的JSON模板进行配置,同时在解析时也按照当前的DataSchema进行解析Resource不再区分线上和线下环境,因为Resource无论是线上和线下他总是存在的并且不会改变的,所以区分线上和线下是没有必要的。DataSource不再用env字段区分线上和线下环境,利用preData和onlineData进行区分线上和线下配置,由于引入了DataSchema模板,所以彻底解放了DataSource,他不再需要进行繁琐的配置,只需按照DataSchema把所有的需要字段都配置到对应的Schema中即可。这样在线上和线下DataSource是一条数据主键不再改变。Activity实体是DataSource和Resource的关系实体,同时包括活动的人群、起止时间等属性。由于DataSource和Resource实体线上和线下环境中主键ID都不会改变,那么意味着Activity可以把线下的配置直接同步到线上,在同步过程中需要做的是如果线上没有配置就插入一条如果存在就更新。那么怎么映射Activity线下和线上的关系呢,在Activity里面引入了mapId字段,线下的Activity实体在mapId中存储线上Activity实体的主键Id,利用这种映射关系实现了线下和线上的映射。具体的如上图所示,通过这种表和表之间映射关系,实现了环境隔离问题,同时简化了业务中的实体,让配置更简单更易用。3.4活动检索问题在实际应用中,我们遇到了检索能力的性能瓶颈,根据每次检索时都需要拉出当前资源位下的全部活动,然后按照起止时间、人群作为过滤条件,筛选出满足当前用户的活动列表。以上过程中每次检索都会发生与数据库的IO操作。当资源位和访问QPS增多时,数据库IO操作将成倍数增长,此时会成为检索的瓶颈,所以在以上的技术方案中,需要一个完备的缓存方案支撑检索的正常运行。按照常规的缓存设计方案进行了如下的缓存方案设计。所有的查询都是先进行缓存查询,如果未命中再查询数据库,把查询到的数据回写到缓存中。对于所有的更新操作,都是先更新数据库,然后再失效缓存,在更新活动时,需要在失效活动缓存的同时,也要失效该活动对应资源位下活动列表的缓存。但是在使用过程中遇到了一个问题,资源位下的活动列表存储采用了kv结构,key为资源位ID,value为活动列表的JSON序列化,当资源位下的活动增多时value也会随着膨胀最后超出阈值,所以把活动对象进行了简化仅存储活动Id和人群Id。优化后检索过程将有所变化如下图所示:4.总结与展望4.1总结通过以上的整体功能设计、技术方案设计、代码实现,介绍了一个投放平台从设计到实现过程中遇到的问题点和解决方案。目前投放平台已经在闲鱼的用户实时触达、首页feeds投放、淘宝闲鱼小程序投放中使用,完美支持运营根据人群精准投放活动。4.2展望闲鱼素材投放平台但仍有需要持续完善的地方,首先是精准人群的个性化,例如在首页的投放中,针对圈选的人群透出的Banner图片都是一样的,目前我们的投放最小粒度是人群未来将会做到个人。然后是投放能力自优化,目前活动针对资源位的争夺还是利用权重、人群、起止时间作为前置条件,未来将会通过投放数据回流利用算法计算其关键指标实现投放的自优化。同时闲鱼素材投放将对接集团内部的更多优秀的素材提供源丰富闲鱼的活动。本文作者:闲鱼技术-齐悟阅读原文本文为云栖社区原创内容,未经允许不得转载。

March 1, 2019 · 1 min · jiezi

【kafka KSQL】游戏日志统计分析(3)

接上篇文章 【kafka KSQL】游戏日志统计分析(2),本文主要通过实例展示KSQL的连接查询功能。创建另一个topicbin/kafka-topics –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic prop-normalized往新topic中写入数据bin/kafka-console-producer –broker-list localhost:9092 –topic prop-normalized>{“user__name”:“lzb”, “prop__id”:“id1”}从prop-normalized主题创建StreamCREATE STREAM PROP_USE_EVENT \ (user__name VARCHAR, \ prop__id VARCHAR ) \ WITH (KAFKA_TOPIC=‘prop-normalized’, \ VALUE_FORMAT=‘json’);重新设置ROWKEY为user__nameCREATE STREAM PROP_USE_EVENT_REKEY AS \ SELECT * FROM PROP_USE_EVENT \ PARTITION BY user__name;查询完成3局对局且没有使用过道具的所有玩家查询出所有玩家的对局情况,并创建表USER_SCORE_TABLE(前面已经创建过了):CREATE TABLE USER_SCORE_TABLE AS \ SELECT username, COUNT() AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \ FROM USER_SCORE_EVENT \ WHERE reason = ‘game’ \ GROUP BY username;查询出所有玩家的道具使用情况,并创建表USER_PROP_TABLE:CREATE TABLE USER_PROP_TABLE AS \ SELECT username, COUNT() \ FROM PROP_USE_EVENT_REKEY \ GROUP BY username;使用LEFT JOIN进行左关联查询:SELECT s.username AS username \FROM USER_SCORE_TABLE s \LEFT JOIN USER_PROP_TABLE p \ON s.username = p.username; ...

January 9, 2019 · 1 min · jiezi

阿里巴巴,果然开始拥有“预测未来”的能力了

顶灯闪烁,笛声响彻。救护车载着病人,冲向茫茫车海,在时间的赛道上狂奔。高德地图、GPS 卫星导航、路面磁感线圈、1300 个路口摄像头同时开动,为这辆救护车勘探最快路线;GPS 传回实时数据,后台根据辅助数据纠偏,锚定救护车每一刻的精确位置;救护车将要经过的沿途,车辆情况被实时计算。确保路口绿灯提前亮起,在救护车通过之前,刚好所有社会车辆已经行驶一空。这不是演习,这是杭州城市大脑每天都在执行的任务。依靠计算,一辆救护车到达医院的速度,平均缩短了 50%。在这座城市,靠鸣笛和闯红灯开道的悲壮彻底成为历史。说人同蝼蚁,其实并不为过。两百多万辆车奔跑在城市里,他们的行踪像风里的落叶一样叵测。但通过对 1300个路口的摄像头的实时计算,城市大脑就可以精确地预测出未来十五分钟、未来半小时那哪个路段将会拥堵,从而第一时间指挥路口信号灯“变换姿势”。计算在帮人类追赶时间。中哥今天要说的,就是这个精致而坚固的“大数据实时计算引擎”。你可能从未听说过这个引擎,甚至在此刻之前都不知道它的存在,但你很可能早已成为这个引擎服务的一员:一年一度的双11,无数人涌进天猫,每个人都能用 0.1 秒搜索到自己理想的商品,在智能推荐中发现适合的宝贝,背后正是依赖这个引擎;双11庆典现场,大屏上那个跳动的总成交量数字,只是背后所有数据的冰山一角。几十亿种商品的实时库存、价格、优惠数据得以分秒不慢地同步给屏幕前的你,也同样依赖这个引擎从某种意义上来说,只要给这个计算引擎足够的资源,无论面对多么庞大复杂的系统,我们都可以用几乎忽略不计的时间看到真相——这大大快于人类最聪明的大脑。这是我们亲手创造的“先知”。重器难成。为了这个先知一般的“大数据实时计算引擎”,阿里巴巴最核心的技术人,已经耗费了将近五年时间。让人感慨的是,这个承载了一个个城市的交通,扛起了一条条生产线,担负了一个国家十几亿人购物的强大引擎之所以的诞生在阿里巴巴,最初并不是为了满足什么需要,而仅仅是因为它“看上去很美”。这是一个鲜为人知的故事。(1)1999年,阿里巴巴在杭州成立。同样在1999年,蒋晓伟正在美国攻读理论物理博士。作为一个初三就立志要探索宇宙秘密的年轻人,到目前为止他的人生堪称完美。就在一个崭新的物理学家即将出炉的时候,命运开始展现它的波云诡谲。蒋晓伟突然被自己的导师“忽悠”到了一家非常有希望的互联网初创公司。理由是:“在30岁之前先财富自由,以后爱怎么学物理就怎么学物理。”一年之后,互联网泡沫破裂。然而,蒋晓伟却留在了这片战场。2002年,他加入微软,2010年他加入 Facebook。弹指挥间,直到回国加入阿里巴巴之前,他已经从物理学家成功转型成为数据库和计算资源调度系统专家。他还记得,自己加入阿里的时间是 2014年12月29日。这是一年中可以办理入职的最后一天。“为什么选最后一天?”“因为看上去比较有美感。”“。。。”目测,蒋晓伟是我见过的第一个用物理公式般的美感对待人生的人。甚至,他给自己起的花名都想叫做“量子”,后来思考了一下,觉得量子不太像个人名,才改为谐音“量仔”。蒋晓伟蒋晓伟入职的是阿里巴巴集团搜索团队。你可能会问:纳尼?阿里巴巴还有搜索团队?当然有,而且还极其重要。举个搜索引擎的日常:当你在淘宝搜索框里输入“杜蕾斯”的时候,搜索引擎就马上行动,从亿万卖家出售中的宝贝里帮你找到合适的 TT(及其他产品),然后按照推荐顺序排列在搜索结果里。注意,有趣的硬核要来了:如果,商家的 TT 价格永远不改,库存永远无限,优惠促销方案永远不变,那么搜索团队只需要做一个最简单的查询系统就行了。但是,现实中商家会随时调整价格和优惠,某一款激情大颗粒也可能因为太受欢迎,上架十秒就卖到缺货。在淘宝网上,你会发现真实的状态是:每时每刻都有无数卖家的产品参数在改动。所以,搜索引擎的挑战就是,要根据每时每刻最新的数据库来瞬间算出最适合呈现给你的搜索结果。相信我,只有用最新鲜的数据算出的结果,才能让屏幕对面的你露出心满意足的表情:面对这种现实,一个最稳妥的方式就是,搜索引擎用把现在的数据库全部算一遍,给出结果。但是,这会耗费大量的计算力。毕竟这一秒相对于上一秒来说,可能发生参数变动的宝贝只有十个,而没有参数变动的宝贝有十万个。那么,你自然会想:“有没有一种方法,让我只计算改动的部分,再通过特别的数学运算和之前的结果融合,就能达到和计算全量数据一样的效果呢?”有的,这就叫“流式计算”。打个最简单的比方:你负责把椰汁平分给10个妹纸。刚开始你有10瓶椰汁,于是你一人分了一个。后来,你又得到了10瓶椰汁,这时候椰汁的总数变成了 20 瓶,平均每个妹纸应该得到两个。但你没有必要把之前分给妹纸的椰汁收回来,重新每人给两个;而是可以让每个妹纸手上拿着之前的那瓶椰汁的基础上,每人再补发一瓶。通过这个例子,我猜你已经感受到了“流式计算”的激荡。当然,实际的数据库运算比“分椰汁”复杂得多。需要说明的是,当时在阿里巴巴内部,并不是没有流式计算引擎,各部门都根据自己的需求研发了特定的流式计算引擎,只不过,大多引擎只用来解决各自部门的问题,没有通用性。 很多业务都开发了各自的流式计算引擎但蒋晓伟突然发现,流式计算背后隐藏着一个神奇的事实:既然只计算增量,就能得知全量的结果;那么就可以永远用计算增量的方式来表达计算全量。也就是说:增量计算等效于全量计算;流式计算等效于批处理计算,实时计算等效于离线计算!也就是说,如果按照这个构想做出一套完整功能的“流式计算引擎”,就可以一统江湖,运转在阿里巴巴所有的技术底层。这可是一份不小的产业啊!蒋晓伟越想越鸡冻。然鹅,让他激动的最主要原因竟然是:“这个引擎太完美了!”他发现,其实自己身体里的那个“物理学家”一直都在。物理追求的终极就是“大一统理论”——用一套机制解决所有问题。没想到人生峰回路转,在计算机领域也给发现了一个“大一统”的机会。老实说,蒋晓伟老湿傅这个想法有点危险。危险在哪呢?首先,如果把当时搜索业务需要的流式计算比作汽车发动机的话,蒋晓伟想要研制的发动机,是豪华到可以用到下一代宇宙飞船上的“核能发动机”。自己团队支持的这摊子业务目前根本不需要这么好的引擎。其次,研究这个引擎的基本动力居然是“美感”。出于美感开发一个计算引擎,这种动机天然就有一种理想主义气质。。。能不能研究成,那只有天知道。再说,面对这么宏大的任务,手下能用来做研发的团队,只有五个人。况且这五个兄弟还有日常的任务,人手极度短缺。“但马老师不是说了么,梦想还是要有的,万一实现了呢?”刚刚加入阿里的蒋晓伟倒是决心已定。(2)蒋晓伟“能用”的团队,全员都在北京。这个小分队的老大叫做王峰。王峰是个老阿里了,2006年加入阿里巴巴,在阿里北京的雅虎中国团队做搜索,后来又做过一淘和淘宝搜索。此时此刻,他和北京的几个兄弟主要负责一个开放搜索项目的离线系统。 听到蒋晓伟对于“流式计算引擎”的描述,王峰内心惊呼“卧槽”。对于一个合格技术宅来说,一个好的技术构想比萌妹子更能让他动心。蒋晓伟和王峰一合计,事情很简单:脚踩两只船,那基本没戏。要么就趁早死心,放弃新引擎研发;要么就大家就把旧工作完全交出去,破釜沉舟干票大的。王峰的决定是,干!现在的王峰,笑起来一幅波澜不惊,当年内心也是慌得一批。王峰回忆,领导们觉得很不可思议。因为交出原有的业务,北京这个小团队相当于“失业”了。而新的研究——流式计算引擎——当时只是个构想,连技术方向也没有,代码更是一行都还没写。对于王峰来说,这相当于一次破釜沉舟的内部创业,前途未卜,凶险异常。事实也证明,别人的担心都是对的。一开始团队努着劲儿写了三个月代码,仍然没办法达到蒋晓伟理想中的通用性,连他本人都有点心虚。“我刚来阿里巴巴,就忽悠兄弟们把之前的项目都放弃了,要是最后证明我的构想是个坑,那不是害了别人么。。。”他想。焦急之中,已经到了 2015 年夏天,蒋晓伟突然在业内著名的大数据峰会 Hadoop Sumit 的论坛上看到有人发表了一个惊悚的评论:感觉 Flink 出来之后,Hadoop 就显得不怎么需要了。。。Hadoop 是当年最火的大数据分布式架构,这个 Flink 是个神马,根本没听过啊。但是当蒋晓伟、王峰和团队研究完技术资料之后突然发现,这种“用流式计算来等效一切计算”的理念不就和我们想开发的那套引擎一模一样吗?蒋晓伟仰天长啸:真是天助我也!既然已经有开源的技术,那么我们只要在此之上继续开发流计算引擎就好了啊!这里多介绍一句。Flink 是一个流式计算的开源框架,2010 年诞生于德国研究中心和柏林工业大学,2014年被捐赠给 Apache 基金会,并由创始公司 DataArtisans 继续运营。Flink 的 Logo 是一只眼神里有故事的松鼠。简单来说,2015年的时候,Flink 刚刚“出道”一年,几乎没有人知道,更没有人大规模使用。就像一个刚刚毕业的大学生,看上去很有潜力,但“稳定性”和“实用性”都缺乏事实验证。就这样,这帮阿里巴巴的技术专家,成为了全球第一批使用 Flink 框架做大数据引擎研发的人,蒋晓伟一瞬间就给自己的引擎起好了名字——“Blink”。这是英文眨眼的意思。”一眨眼,所有东西都计算好了!“2015年底,搜索部门要向阿里巴巴 CTO 行癫汇报。每人20分钟时间,结果蒋晓伟上去讲 Blink,沉浸在对这个“完美引擎”的想象中,一下就说了40分钟。作为阿里巴巴所有核心技术的掌门人,行癫素来对新技术很敏感。他听懂了蒋晓伟的技术路线,内心也觉得相当靠谱。但这毕竟是搜索团队自己“偷偷”搞的项目,这帮兄弟究竟可以坚持走多远,行癫心里也没底。于是鼓励蒋晓伟说:“那就等你们明年做出来,我们再看!”阿里巴巴 CTO 行癫 张建锋(3)说到底,Blink 是一个通用引擎。它就像一个万能发动机,可以装载到轿车、卡车、飞机、火箭任何地方。蒋晓伟手握这台“万能发动机”的1.0版本,到处去找车实验。他盯上的“第一批车”,就是搜索业务中的使用场景。简单科普一下:搜索业务的机器学习平台内部代号叫“保时捷”(还真是一辆车。。。),可以根据你浏览商品的时间和动作,实时判断出你可能会对什么感兴趣,从而在下一秒就能给你智能推荐可能喜欢的商品。这是阿里巴巴非常有技术含量的一个应用。实际上,机器学习平台当时已经“心有所属”,配有一台流式计算引擎——之前王峰带领搜索团队自研的 iStream。iStream 是专门为搜索设计的,虽然目前可以很好地完成任务,但结构简单,不具有特别强的通用性。机器学习算法团队的一位负责人仁基,技术思想非常超前,非常巧的是,他同样是个执着于“美感”的人。他相信,未来 Flink 很可能会成为下一代机器学习算法重要的底层计算框架,于是在 Blink 系统研发的早期,就把团队里一百多位算法工程师的力量都用来配合蒋晓伟。“一两百人的团队,被我一个人折腾。”回忆到这里,蒋晓伟露出了羞赧的表情。说得很美好,结果真拿来 Blink 一用,动不动就躺尸。。。说实话,算法工程师没有义务为 Blink 的技术问题买单。毕竟算法工程师是“生产汽车的”,而 Blink 这个“发动机”质量不稳定,导致人家的汽车备受诟病,可以说相当冤枉了。所以那几个月一百多位算法工程师的日常就是各种吐槽“疯子”蒋晓伟。后来蒋晓伟才知道,这些吐槽,全都被仁基扛下来。仁基尽自己一切所能,在保护着这个弱小的 Blink。终于,2016年5月,第一个基于 Blink 的机器学习小功能“A/B Testing”上线。虽然还存在一些青涩的小毛病,但所有的技术人都看到了,Blink 已经像会呼吸的小兽一样,泛出诱人的引擎光泽。最激动的,当然是蒋晓伟本人。他把自己在 Flink 上成功的应用作为一个演讲,投给了当年的 Hadoop Sumit 大会。非常巧,Flink 的创始人 Kostas 和 Stephan 也在同一个大会上有一个演讲。他们两拨人实际是那次 Hadoop 大会上唯二的 Flink 演讲。Kostas 提前看到了议程,顿感相见恨晚,于是主动联系了蒋晓伟,希望他能用团队研究的成果影响社区。“本来之前是想自己玩玩的,我们连阿里都不敢影响,还敢影响社区?”蒋晓伟说。但是 Kostas 和 Stephan 觉得这群阿里人的尝试简直不要太酷,特别支持。蒋晓伟深受感动,“从那时候开始就觉得,我们不仅得把阿里内部的业务做好,还要为 Flink 社区做贡献,把 Flink 社区做好。”就这样,蒋晓伟和团队就跟组织“接上了头”,成为了 Flink 社区的核心成员。Flink 创始人 Kostas这么帅还来搞技术可以说是相当想不开了(4)在搜索团队内部证明了 Blink 能力,又得到了 Flink 社区的认可,蒋晓伟终于有资格正视自己的“野心”了。他提出要让 Blink 支撑“双11”上的实时机器学习任务,对方同意了。也就是说,双11当天,数亿人在淘宝天猫搜索商品,他们的每次查看,点击,都会影响个性化的智能推荐,在下一秒就能看到为自己量身定做的宝贝推荐。而这背后的实时计算,都要由 Blink 来支撑。然而抬眼一看,夏天已经到了,距离双11只有不到半年了。整个九、十月份,Blink 和机器学习系统的联调都处在各种花式崩溃之中。Blink 还小,压根就没见过双十一这种“人类狂欢”的阵仗。出现了一个死结:一旦超大规模数据进来,Blink 的性能立刻大幅下降。要知道,在 AI 领域,性能就是功能。性能大幅下降的 Blink 分分钟就把人工智能坑成“人工智障”。老程序猿都知道,数据规模是对一个系统最大的考验。一个系统承受不住大规模的数据浪潮,有可能证明这个架构就是无解的。如果真是架构缺陷,那么解决方案只有一个:放弃。带领团队攻坚的王峰回忆,那几天“自己已经崩溃了”。十一假期,所有团队的人都从北京冲到了杭州,别说休假,连觉都不睡了。六七个人就在工位上吃住,寻找究竟是哪个节点出了问题。即使是面对这样的情况,蒋晓伟、王峰,还有其他同事都完全相信,Flink 架构是完美的,问题一定是局部的可解的,只是我们还没找到它。终于,问题找到了!是不同层级算子之间的调度模式需要优化。解决这个问题之后,系统能处理的数据量立刻跃升。十月中旬,Blink 正式切上线。本以为劫波渡尽,没成想又是一大堆系统配合的问题接踵而来。蒋晓伟记得,将近11月,Blink 还有一些问题没搞定。这边基础引擎不搞定,算法团队就没办法在它的基础上调优双11的算法。到最后,算法团队的老大都直接找到蒋晓伟,着急地质问:“你们究竟是怎么回事啊?”现在想想,他的意思可能是想让我别折腾,直接换回去年的旧系统。但我的情商低,当时没听明白。就是一门心思地组织大家调优 Blink。。。蒋晓伟回忆。终于赶在11月前,Blink 完成了联调。原则上,从11月1日开始,双11的系统就要封闭代码,谁都不能动了。但是,这是 Blink 第一次承担这么重大的任务,为了万无一失,相关团队又提了很多冗余性的建议。王峰记得很清楚,一直到11月10日,还有几个小时双11就开始了,代码还最后改了几行,最终封闭。人事已尽,唯听天命。11月11日,巨大的数据像海啸一样涌向 Blink,蒋晓伟和王峰都捏了一把汗。然而,这个年轻的引擎应对自如。第二天,Blink 在阿里巴巴一炮而红。2016年“双11”交易额定格在1207亿(5)你以为故事结束了么?图样图森破。紧随而来的 2017 年对于蒋晓伟来说,简直不要更刺激。意识到大数据引擎这么重要,阿里巴巴集团决定调整组织架构,集全公司之力发展大数据引擎,由原阿里云的首席科学家周靖人组建计算平台事业部,在流式计算方面,把公司发展最好的三个引擎团队合三为一。周靖人他也是阿里巴巴达摩院的“禅师”之一这三个引擎分别是:阿里中间件团队的 JStorm、阿里云的 Galaxy、阿里巴巴搜索团队的 Blink。得知大牛周靖人负责整合三个团队,正在美国参加 Flink 官方大会 Flink Foward 的蒋晓伟和王峰内心有点波澜。他们知道,三个队伍合并之后,很可能在三条技术路线之中选择一条。蒋晓伟当然觉得自己的开源技术路线技术前景最好。但平心而论,Galaxy 的框架同样非常优秀。更关键的问题在于,Galaxy 一直是周靖人团队的成果。虽然在阿里巴巴不会出现因为亲疏远近而偏袒某个技术路线,但不可否认周靖人一定对于 Galaxy 更为熟悉。那时的蒋晓伟,和这个即将成为新领导的周靖人完全不熟悉,他完全无法预测将会发生什么。我担心,不会一回到国内,就没工作了吧。。。。蒋晓伟回忆。回国之后,周靖人来找蒋晓伟,蒋晓伟的心已经快跳到嗓子眼了。周靖人说:“我想把整合之后的团队交给你来负责,你们三人一起商量未来的技术路线,你觉得怎么样?”这意味着,蒋晓伟突然拥有了80人的豪华阵容。那一瞬间他在心里默念:“稳了!”只要不是强制采用某个技术路线,他就有信心说服 Galaxy 和 JStorm 的负责人。技术摆在这里,孰优孰劣是能讲得清道理的。蒋晓伟回忆,三个技术负责人的“谈判”整整维持了一周。大家都知道,这次技术路线的抉择,将会影响阿里巴巴未来十年甚至更远的技术发展,谁都不敢掉以轻心。谈到最后,争夺的焦点就集中在 Blink 和 Galaxy 之间。Flink 的开源生态,最终说服了Galaxy 的支持者。此时的 Flink 已经不像两年那样鲜有人问津,而是已经形成了巨大的社区,中国已经有腾讯、滴滴、美团等公司开始用 Flink 建造自己的流式计算引擎。在这个社区里,会有无数国内外大牛对 Flink 的代码做贡献。建立在这个开源基座上的架构,也会发展得更快速。至此,Blink 正式成为了阿里巴巴计算引擎的王牌军。Flink 社区逐渐声势浩荡(6)王牌军可不是白当的。2017年双十一,Blink 领到了自己的艰巨任务——支持全集团(阿里巴巴、阿里云、菜鸟)的流式计算任务。王峰告诉我,其实2016年双11 Blink 承担的搜索任务,已经是一个重头戏,有过这个经历垫底,再适配很多系统的时候只不过是麻烦一点而已。唯独有一样:Blink 要接管后台所有的交易数据的实时计算任务。交易数据计算,是淘宝天猫业务的最核心。也是支撑背后支付、物流的核心依据。很多其他的计算都要基于订单数据的结果。这就像面包店的面粉一样,无论你做什么蛋糕,都需要面粉。如果面粉的供应出问题,那整个面包店就要关门了。所以无论面临多大的订单量,交易数据计算必须稳定、快速、实时。一旦出现错误,损失无可估量。每年双十一狂欢晚会上的那块大屏幕上显示的实时成交数字,也是由订单数据汇总而成的。也就是说,如果 Blink 当天挂掉,不仅对淘宝天猫的运转影响巨大,还会导致一个略为明显的结果:成交量大屏一直维持“0”,一秒把人丢到全球无死角。2014、2015、2016 这三年,这个核心任务都是由兄弟引擎 Galaxy 来承担的。所有人都想到一个稳妥的方案:2017年“双11”让 Blink 和准备退役的 Galaxy 来个双备份,如果 Blink 临时废掉,还可以用 Galaxy 作为备份顶上,至少不会丢人。然鹅,2016年双11的成交量是1207亿元,按照历年经验推测,2017年的成交量八成是会超过1500亿的(事实证明确实如此,达到了1682亿)。而根据 Galaxy 的技术架构,如果不做大量繁琐的优化,很可能顶不住。初出茅庐的 Blink,就这样成为 2017 年双11媒体大屏“全球指定唯一必须顶上不干不行合作伙伴”。。。双11 当天,两条 Blink 链路互为备份。“虽然成功率基本是100%,但万里有一,假设 Blink 本身设计存在未知的缺陷,或者两条备份链路的机器硬件同时坏掉,都可能导致灾难。”蒋晓伟回忆。在双11到来前一周,王峰带领兄弟们已经把 Blink 引擎调整到无以复加的好状态。蒋晓伟想了想,又派同样是 Facebook 回来的大牛工程师大沙去天竺法喜寺烧了一炷香。。。2017年11月11日零点。狂欢现场。时钟敲响零点,然后出现五秒倒计时。按照流程,留给 Blink 的计算时间只有这五秒。也就是说,00:00:05 的时候,无论如何大屏幕都会切到 Blink 给出的双11前五秒交易总额。这五秒,几乎是蒋晓伟人生当中最漫长的五秒。1、2、3。。。第三秒的时候,蒋晓伟面前的监视器跳出了实时成交数据!再两秒之后,实时交易数据被投上大屏,穹顶之下,欢声雷动。蒋晓伟知道,现场观众并不一定理解大屏运行原理,内心也并没有特地把一份掌声送给幕后的流式计算引擎团队。但那一刻,他热泪盈眶。这几年兄弟们付出的努力值了。168,269,635,159。每一个数字,对蒋晓伟和兄弟们都意味着岁月和付出。(7)经过两年双11的考验,已经没人怀疑 Blink 是阿里巴巴最强悍的计算引擎之一。所以,不仅阿里巴巴集团所有用到流式计算的场景都会选用 Blink,Blink 还开始对外提供服务。虽然在蒋晓伟看来,各个场景的计算都可以用 Blink 来解决,但目前被应用最多的场景有如下几个:1、实时统计分析。在电商行业,尤其是促销的场景中,巨大的网络流量涌来,形势变幻莫测。每一秒的库存统计、订单报表,都能揭示出用户的行为规律。对这些数据进行实时分析,就能随时调整促销策略。2、在线机器学习。用户的行为会展现出他的性格和偏好,用机器学习分析一个人浏览商品的姿势,就能为他精准推荐可能感兴趣的商品。但是,可能一个用户只浏览一分钟,如果在这个时间段内没有能够吸引他的商品,它就会退出。所以必须在一秒钟之内,对他刚才的动作进行实时学习,才能保证他第一时间看到感兴趣的宝贝。3、实时金融风控。在金融领域,技术就是金钱。每成功阻断一次欺诈交易,就等于挽回了真金白银。通过对一个账户实时行为的分析,就可以知道现在它有没有进行危险交易,从而在第一时间阻断。4、IoT 边缘计算。在工厂中,每台生产线都会随时产生数据,如果可以实时对这些数据进行分析,就可以减少生产线的损坏几率,提高产品的良品率。根据参数实时调整生产线如此,才有了开头一幕所说:阿里云承建的城市大脑,可以利用 Blink 来预测道路拥堵,为救护车开拓生命道路。根据阿里云首席科学家闵万里博士的介绍:2018年,城市大脑第一次出国,被部署在马来西亚吉隆坡,把救护车到达现场的时间缩短了 48.9%。借助工业大脑,流式计算实时判断生产线的健康状况,帮助世界第一大光伏企业协鑫光伏提高了良品率1%,每年可以节省上亿元的无谓浪费。2018年12月20日,阿里巴巴将 Flink 的旗舰会议 Flink Foward 第一次引入中国,现场座无虚席。蒋晓伟、王峰和流式计算团队的每一个人,在过去的三年都亲眼见证了 Flink 从踽踽独行到集结成军。Flink Forward 2018 北京为了感谢社区的帮助,在这次会议上周靖人宣布,在未来会把基于 Flink 修改的 Blink 流式计算引擎开源。从2019年1月开始,所有人都可以查阅这个支持了双11、支持了城市大脑、支持了工业IoT等无数顶级计算的引擎代码。也就是在这一年,王峰正式接替蒋晓伟,成为流式计算的新掌门。而蒋晓伟则朝着他的“完美梦想”更进一步,带着一帮兄弟在此基础上研究“带有流式计算引擎的数据存储系统”——交互式查询系统,让这个引擎能够解决更多通用的计算问题。带有流式计算引擎的数据存储系统,听起来有些不知所云。其实,这个世界上最经典的这类系统,其实就是我们的大脑。我们一生中会接受各种信息,这些信息共同构成大脑的资料库,帮助我们预测未来。每当有新的信息进来,我们都会根据这一点点信息增量微调我们对于未来的预测。这种调整,毫无疑问是实时的。我们的祖先不小心触摸野火,从那一刻开始就会告诉自己和家人小心火焰。我们依靠对世界的万亿次反馈,发现了万有引力,发现了相对论,发现了量子力学。正是千万人实时更新的预测能力,构成了我们的文明,也书写了我们的历史。以前,所有关于未来的预测都在我们的脑海里,如今,我们终于有机会在躯体之外,利用人类的武器——计算力——建造起一个硕大的预测引擎。角落里,这些技术英雄笑起来安静而羞涩。但正因他们存在,人类面对未来,再也不是手无寸铁。本文作者:赵慧阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

January 7, 2019 · 1 min · jiezi

【kafka KSQL】游戏日志统计分析(2)

接上一篇文章【kafka KSQL】游戏日志统计分析(1),展示一下KSQL WINDOW 功能的使用。测试用日志数据:{“cost”:7, “epoch”:1512342568296,“gameId”:“2017-12-04_07:09:28_高手2区_500_015_185175”,“gameType”:“situan”,“gamers”: [{“balance”:4405682,“delta”:-60,“username”:“lza”}, {“balance”:69532,“delta”:-60,“username”:“lzb”}, {“balance”:972120,“delta”:-60,“username”:“lzc”}, {“balance”:23129,“delta”:180,“username”:“lze”}],“reason”:“game”}KSQL三种Window统计每2分钟内完成对局大于等于3局的玩家根据时间窗口(Tumbling window)建立table:CREATE TABLE users_per_minute AS \ SELECT username, COUNT() AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum , WINDOWSTART() AS win_start, WINDOWEND() AS win_end \ FROM USER_SCORE_EVENT \ WINDOW TUMBLING (SIZE 2 MINUTE) \ WHERE reason = ‘game’ \ GROUP BY username;过滤出game_count大于3局的玩家:SELECT username, game_count, win_start, win_end FROM users_per_minute WHERE game_count >= 3;输出:lze | 6 | 1546744320000 | 1546744440000lzc | 6 | 1546744320000 | 1546744440000lza | 6 | 1546744320000 | 1546744440000lzb | 6 | 1546744320000 | 1546744440000lzb | 3 | 1546744440000 | 1546744560000lzc | 3 | 1546744440000 | 1546744560000lza | 3 | 1546744440000 | 1546744560000lze | 3 | 1546744440000 | 1546744560000统计曾在10分钟之内完成过3局牌局的玩家不限定某个特定的10分钟,只要在某个10分钟之内完成了即可。创建HOPPING WINDOW时间窗口Table:CREATE TABLE users_hopping_10_minute AS \ SELECT username, COUNT() AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum , TIMESTAMPTOSTRING(WindowStart(), ‘yyyy-MM-dd HH:mm:ss’) AS win_start, TIMESTAMPTOSTRING(WindowEnd(), ‘yyyy-MM-dd HH:mm:ss’) AS win_end \ FROM USER_SCORE_EVENT \ WINDOW HOPPING (SIZE 10 MINUTE, ADVANCE BY 30 SECONDS) \ WHERE reason = ‘game’ \ GROUP BY username;过滤出game_count大于等于3的玩家SELECT username \FROM users_hopping_10_minute \WHERE game_count >= 3 \GROUP BY username; ...

January 6, 2019 · 1 min · jiezi

Apache Flink,流计算?不仅仅是流计算!

阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行。Flink Forward是由Apache软件基金会授权的全球范围内的Flink技术大会,2015年开始在德国柏林举办,今年第一次进入中国。今天,计算平台事业部的资深技术专家莫问,将带领我们重温这场大数据技术的饕餮盛宴,感受Apache Flink 作为下一代大数据计算引擎的繁荣生态。Flink Forward China 大会邀请到了来自阿里巴巴、腾讯、华为、滴滴、美团点评、字节跳动、爱奇艺、去哪儿、Uber、DellEMC、DA(Flink 创始公司)等国内外知名企业以及Apache软件基金会的嘉宾为大家分享了Apache Flink的成长历程、应用场景和发展趋势。Flink Forward China 2018 嘉宾PPT及演讲视频:https://github.com/flink-china/flink-forward-china-2018参与有道,如何更“好”地贡献 Apache 项目上午大会由Apache软件基金会的秘书长Craig Russell开场,Craig首先分享了Apache开源之道,以及开源社区的精神和体制,然后以Apache Flink项目的成长经历为背景,向大家介绍了如何创建以及管理一个Apache开源项目,如何为Apache开源项目做贡献,并跟随开源项目一起成长和收获。通过Craig的分享,我们也更详细地了解到了Apache Flink的发展经历。Flink早期起源于德国柏林工业大学的一个研究项目Stratosphere,并于2014年4月捐献给Apache软件基金会,同时重新定位品牌为Flink,经过8个月孵化期,在2014年12月成功从Apache软件基金会毕业,成为Apache顶级项目,从此开始在大数据领域航行。经过最近4年的持续快速发展,Apache Flink社区已经培养出了42名Committer和19名PMC Member,不断加入的新鲜血液为Apache Flink社区持续贡献代码,并推动社区健康快速的发展。云上计算普惠科技在Craig分享后,阿里巴巴集团副总裁、搜索事业部与计算平台事业部负责人周靖人进行了主题演讲。靖人首先向大家介绍了阿里巴巴大数据云上计算的现状和趋势,让大家看到了阿里巴巴大数据业务场景的超大规模,以及未来更大的挑战。为了更好地支持阿里巴巴未来大数据的发展,阿里大数据发展策略一方面要进一步提升计算力和智能化,增强企业级服务能力。同时也要加强技术的生态化建设,大力支持并推动开源技术社区的发展,兼容行业生态标准,发展生态伙伴联盟,推动生态建设。目前阿里巴巴已经参与贡献230+开源项目,具备8000+合作伙伴和2000+ ISV,云上生态也已经突破1000,000开发人员。在大数据领域,阿里巴巴最近几年对Apache Flink社区进行了持续大力的投入,贡献超过15w行代码,主导建立了Flink China中文社区,加速Flink在国内的生态建设,并于今年开始在北京、杭州、上海、深圳等地多次组织Flink Meetup,促进国内Flink技术人员更方便的分享交流。靖人在分享的最后宣布了阿里巴巴内部Flink版本(Blink)将于2019年1月正式开源,本次开源内部版本的目标主要是希望让广大Flink用户能提前享受到阿里巴巴对Flink的改进和贡献。阿里巴巴同时会尽快将Blink中对Flink的各项改进和优化贡献给Flink社区,坚持对Apache Flink一个社区的拥抱和支持。Apache Flink,如何重新定义计算?在靖人宣布阿里巴巴开源内部Flink版本(Blink)后,阿里巴巴集团研究员蒋晓伟分享了Apache Flink在阿里巴巴内部的成长路线以及技术演进之路。阿里巴巴从2015年开始调研Flink,并于2016年第一次在搜索场景中上线Flink,在经过搜索大数据场景的检验后,2017年Flink开始在阿里巴巴集团范围内支持各项实时计算业务, 到目前为止阿里巴巴基于Flink打造的实时计算平台,已经支持了包括淘宝、天猫、支付宝、高德、飞猪、优酷、菜鸟、饿了么等所有阿里巴巴集团下的所有子公司的数据业务,并通过阿里云向中小企业提供一站式实时计算服务。在2018年的双11中,阿里实时计算平台已经实现了峰值每秒17亿次,当天万亿级的消息处理能力。Apache Flink目前在阿里巴巴内部最典型的业务场景是实时BI,阿里巴巴内部有着海量的在线交易以及用户数据,实时看到各个维度的数据统计可以及时地感知并指导阿里巴巴的运营。下图是一个典型的阿里实时BI流程,阿里的在线服务系统和数据库会实时产生大量日志数据并进入消息队列,FlinkJob会从消息队列中实时读取处理这些数据,然后将各种统计分析结果实时更新到KV/Table存储系统中,例如:HBase,终端用户可以通过Dashboard实时看到各种维度的数据统计分析结果。在双11当天,各种维度的实时数据报表是指导双11决策的依据,其中最为关键的就是全球直播的实时GMV成交额。Flink已经连续两年支持阿里巴巴双11实时GMV大屏,一个看似简单的数字,其背后实际上需要大量Flink计算任务平稳、精准地运行支撑。Flink在阿里巴巴另一个典型的应用场景是在线机器学习,传统的离线机器学习方法需要T+1的分析用户历史行为,训练出模型,当第二天模型上线后就已经是过去式,用户当前的需求和预期可能已经完全改变。为了给用户更好的购物消费体验,阿里巴巴的机器学习系统早已经进化到在线学习时代,例如:当一个用户在搜索完一个Query,浏览结果页时,或者点击查看部分商品时,阿里巴巴的在线学习系统已经可以利用这个间隙了解到这个用户当时的意图和偏好,并在下次用户Query时给出更好的排序,并向用户推荐更合适的商品,这种方式不仅可以进一步提升业务效率,同时也能为用户带来更好的产品体验,尤其是在双11这种大促场景,用户的行为时效性都是很短的,只有通过实时在线学习方式,才能做出更加精确的个性化预测和推荐。在线学习系统的优势在于可以实时收集并处理用户的行为数据,从而进行实时流式的特征计算和在线训练,并将模型的增量更新实时同步回在线系统,形成数据闭环,通过不断迭代自动优化系统效率和用户体验。在阿里的业务规模下,整个在线学习流程将会面对海量的用户数据规模、和极其复杂的计算挑战,但在Flink的驱动下,整个流程可以在秒级完成。通过以上两种经典场景可以看出阿里巴巴实时业务场景在各方面的挑战都很大,直接将Flink社区版本在阿里上线使用是不现实的,因此阿里巴巴实时计算团队这两年也对Flink进行了全面的优化、改进和功能扩展,其中有些功能和改进已经推回到了Flink社区。在Flink Runtime领域,阿里巴巴贡献了:全新的分布式系统架构:一方面对Flink的Job调度和资源管理进行了解耦,使得Flink可以原生运行在YARN,K8S之上;另一方面将Flink的Job调度从集中式转为了分布式,使得Flink集群规模可以更大的扩展。完善的容错机制:Flink默认在任何task和master失败后,都会整个Job 重启,阿里巴巴提出的region-based failover策略以及job manager failover/ha机制,让Flink可以运行地更加可靠稳定;大量的性能优化:Flink早期只提供全量Checkpoint机制,这在阿里巴巴大规模State场景下无法正常运行,阿里巴巴提出了增量Checkpoint机制,让Flink即使在TB级State场景下也可以高效运行;Flink Job经常在内部算子或者UDF中访问外部存储系统,例如:mysql,hbase,redis等,一旦出现个别query被卡住,整个task就被卡住,并通过反压影响到整个job,阿里巴巴提出了async IO机制,大幅降低了同步IO访问带来的影响。 此外,阿里巴巴贡献了credit-based的全新网络流控机制,使得Flink网络数据传输性能得到了显著提升。在Flink SQL领域,阿里巴巴贡献了全新的Streaming SQL语义和功能。例如:Agg Retraction,UDX支持,DDL支持和大量的Connector适配。在阿里巴巴,我们发现很多经典的业务场景都是同时具备实时流处理和离线批处理两种需求,而且流处理和批处理中的业务逻辑几乎是一样的,但用户需要开发两套代码,两套集群资源部署,导致额外的成本。例如阿里巴巴的商品搜索索引构建流程,白天需要将商品的更新信息流式同步到搜索引擎中,让用户可以在搜索引擎中看到实时的商品信息,晚上需要将全量的阿里巴巴商品进行批处理构建全量索引,这就是传统的Lambda架构。阿里巴巴的解法是希望提供一套批流融合计算引擎,让用户只需开发一套业务代码,就可以在实时和离线两种场景下复用,这也是在2015年阿里巴巴选择Flink作为未来大数据引擎的初衷。 Flink基于流处理机制实现批流融合相对Spark基于批处理机制实现批流融合的思想更自然,更合理,也更有优势,因此阿里巴巴在基于Flink支持大量核心实时计算场景的同时,也在不断改进Flink的架构,使其朝着真正批流融合的统一计算引擎方向前进。在Flink Runtime领域,阿里巴巴提出了全新的Operator Framework/API设计,使其能够同时适应批流两种算子特性;同时在Job调度和网络Shuffle两种核心机制上,都实现了灵活的插件化机制,使其能够适应批流不同场景的需求。在Flink SQL领域,阿里巴巴提出了全新的Query Execution和Optimizer架构,利用高效的二级制数据结构,更加合理的内存利用方式,更细粒度的Codegen机制以及更加丰富的优化器策略,使得Streaming 和Batch SQL都有了非常大的性能提升。经过大量架构改进和性能优化后,阿里巴巴内部Flink版本(Blink)在批处理上也实现了重大成果突破,在1T,10T和30T的TPC-DS的Benchmark中,Blink的性能数据均明显超出Spark,并且性能优势在数据量不断增加的趋势下越来越明显,这也从结果上验证了Flink基于流做批的架构优势。目前,阿里巴巴的内部Flink版本(Blink)已经开始支持内部批流融合的应用场景,例如阿里巴巴的搜索推荐算法平台,流式和批量的特征以及训练流程都已经统一基于Flink在运行。蒋晓伟在分享的最后给出了对Flink未来的一些展望,他认为Flink除了批流融合,还有很多新的方向值得去扩展,例如:Flink可以进一步加强在机器学习和图计算生态上的投入,从而在AI浪潮中实现新的突破。此外,Flink天然具备基于事件驱动的处理思想,天然的反压和流控机制,以及自带状态管理和弹性扩缩容的能力,这些优势都在促使基于Flink构建微服务框架成为一种新的思想和解决方案。总结蒋晓伟老师的分享,Apache Flink过去虽然在流计算领域已经获得很大的成功,但Flink并没有停滞,而是正在不断在突破自己的边界,Flink不仅仅是Streaming Engine,也不仅仅是Bigdata Engine,未来更希望努力成为Application Engine。流处理即未来接下来来自DA(Flink创始公司)的CTO - Stephan Ewen也对Flink的发展趋势给出类似的观点。Stephan认为“Streaming Takes on Everything”即流处理是一切计算的基础, Flink一方面需要朝着离线方向发展,实现批流融合大数据计算能力,另一方面也需要朝着更加实时在线方向发展,支持Event-Driven Application。前面已经重点阐述了Flink在批流融合计算方面的进展,接下来我们重点介绍下Flink在Event-Driven Application方向的思路。传统的应用服务架构一般是Online App +Database的架构,Online App负责接收用户Request,然后进行内部计算,最后将Result返回给用户,Application的内部状态数据存储在Database中;在Flink的event-drivenApplication架构中,可以认为Flink Source接收Request, Sink返回Result,JobGraph进行内部计算,状态数据都存储在State中。传统应用服务架构需要自己负责分布式和弹性管理,并由Database负责数据一致性管理;而Flink在这两方面是存在天然优势的,因为Flink天然是分布式系统,可以自己管理弹性伸缩,此外Flink内置了状态管理和exactly once一致性语义,因此基于Flink可以更方便、高效实现Transactional Application。城市级实时计算的力量在Apache Flink社区大神Stephan Ewen的分享后,来自阿里云的AI首席科学家闵万里向大家分享了实时计算在阿里云智慧城市中发挥的力量,通过分享多个真实应用案例,让大家对实时技术有了更多的体感和认识。在城市大脑的业务场景中,不仅要能实时处理来自各种传感器收集到的信息,对现实世界发生的事情进行响应,同时也要对未来将要发生的事情进行预测,例如:接下来那里可能要发生交通拥堵,从而提前做出干预,这才是更大的价值。整个城市大脑的架构都运行在阿里云基础设施之上,Apache Flink承担了核心实时计算引擎的角色,负责处理各种结构化和非结构化数据。在2018年9月的云栖大会上,阿里云发布了杭州城市大脑2.0,覆盖杭州420平方公里,可以监控到超过150万辆在途行驶机动车的实况信息,这个看似简单的事情在过去是很难做到的,现在我们通过1300多个路口的摄像头、传感器以及高德App的实时信息,通过Flink进行三流合一的处理,就可以实时感知到整个城市交通的脉搏信息,并通过进一步分析可以得出延误、安全等交通指数,预测感知城市的态势发展。在杭州,城市大脑通过实时分析4000多个交通摄像头采集的视频流,可以实时监控路上车辆的异常事件,例如:车辆超速、逆行和擦碰等,并将这些异常事件实时同步到交警指挥中心进行实时报警,目前杭州的交通事件报警已经有95%来自城市大脑自动通报的,这背后都是通过Flink进行各种复杂的计算逻辑实时算出来的。实时计算让交警处理交通故障的方式从过去的被动等待变成了主动处理,从而大幅提升城市交通的效率,为老百姓带来实实在在的好处。这50%,关乎生死2018年,城市大脑第一次走出国门,来到马来西亚吉隆坡,基于实时大数据对交通进行智能调度,它可以根据救护车的行驶信息,以及沿途路况信息,智能调整红绿灯,为救护车开辟绿色快速通道,这项技术为救护车节省了近50%的时间到达医院,这50%的时间可能意味着人的生和死,在这里技术显得不再骨感,实时计算的力量也许可以挽救生命。在工业生产IOT场景中,大量设备的传感器都收集了海量的指标数据,这些信息过去都被暂存2个月后丢弃了,唯一的用途就是在出现生产故障时拿来分析用,在有了大数据实时计算能力后,这些指标都可以被实时监控起来,作为及时调控生产流程的依据。协鑫光伏是全球最大的光伏切片企业,阿里云利用实时设备监控,帮助其提高了1%的良品率,每年可以增加上亿元的收入。滴滴实时计算平台架构与实践Keynote最后一位嘉宾是来自滴滴出行的研究员罗李,大家都知道滴滴出行是一个实时出行平台和交易引擎,它的数据和场景天然是实时的,各种网约车服务产生的数据都需要实时处理和分析。滴滴的实时业务场景主要包括实时风控、实时发券、实时异常检测,实时交易、服务和工单监控,以及实时乘客、司机和订单特征处理等。滴滴实时计算平台发展已经经历了三个阶段,第一阶段是各个业务方自建小集群,造成集群和资源碎片化问题;第二阶段由公司统一建立了大集群,提供统一的平台化服务,降低了集群资源和维护成本;第三阶段是通过Flink SQL方式提供平台化服务,通过SQL语言优势进一步降低业务开发成本,提升开发效率。滴滴现阶段基于Apache Flink引擎建设的实时计算平台以开源的Hadoop技术体系作为平台底座,并通过DataStream, SQL和CEP三种API向滴滴内部业务提供实时计算服务,同时在平台层也已经具备相对完善的WebIDE、数据血缘管理、监控报警和多组合隔离等机制。在滴滴实时业务的快速发展推动下,其实时计算集群已经达到千台规模,每天运行2000+流计算任务,可以处理PB级的数据。滴滴在搭建Flink实时计算平台的过程中,在内部也对Flink做了一些改进,例如在 Stream SQL领域扩展了DDL,丰富了 UDF,支持了TTL的双流Join和维表Join等;在CEP领域,增加了更多算子支持和规则动态修改能力等,其中部分优化已经推回了社区。最后,罗李介绍了滴滴实时计算平台的未来规划,主要方向在于进一步推广Stream SQL提升业务开发效率,推动CEP在更多业务场景落地,同时完成公司内部原有Spark Streaming向Flink的迁移,并发力IOT领域。在下午的几个分会场中,来自阿里巴巴、腾讯、华为、滴滴、美团点评、字节跳动、爱奇艺、去哪儿、Uber、EMC、DA(Flink 创始公司)的多位嘉宾和讲师都围绕Flink技术生态和应用场景进行了分享和交流。从分享的内容上可以看出,BAT三家中阿里巴巴和腾讯都已经完全拥抱了Flink;美团、滴滴和字节跳动(TMD)三家新兴互联网企业在实时计算场景也都已经以Flink作为主流技术方向开始建设,滴滴在Keynote上分享已经令人印象深刻,美团的实时计算集群也已经突破4000台规模,字节跳动(头条和抖音的母公司)的Flink生产集群规模更是超过了1w台的惊人规模 。由此可见Apache Flink的技术理念已经在业界得到了大量认可,基于Flink的实时计算解决方案开始在国内占据主流趋势。下一步Flink需要一方面继续完善流计算能力,争取在IOT等更多场景落地,与此同时进一步加强在批流融合能力上的全面突破,并完善在机器学习和AI生态上的建设,以及在event-driven的application和微服务场景上进行更长远的探索。本文作者: 莫问阅读原文本文来自云栖社区合作伙伴“阿里技术”,如需转载请联系原作者。 ...

January 3, 2019 · 1 min · jiezi

阿里重磅开源Blink:为什么我们等了这么久?

摘要: 阿里巴巴计算平台事业部研究员蒋晓伟深入分享Flink和Blink的关系以及未来发展。推荐阅读。12月20日,由阿里巴巴承办的 Flink Forward China 峰会在北京国家会议中心召开,来自阿里、华为、腾讯、美团点评、滴滴、字节跳动等公司的技术专家与参会者分享了各公司基于 Flink 的应用和实践经验。感兴趣的开发者可以看云栖社区的对于大会的主会+5场分论坛的直播与视频点播。会议进行中,看到AI前线对蒋晓伟的采访。正如许多开发者所关心的Flink和Blink的关系(云栖社区2016年文章:阿里蒋晓伟谈流计算和批处理引擎Blink,以及Flink和Spark的异同与优势),如今有了更新的方向。本篇AI前线的专访讲述的极为清晰。特别转载,共享。*今年,实时流计算技术开始步入主流,各大厂都在不遗余力地试用新的流计算框架,实时流计算引擎和 API 诸如 Spark Streaming、Kafka Streaming、Beam 和 Flink 持续火爆。阿里巴巴自 2015 年开始改进 Flink,并创建了内部分支 Blink,目前服务于阿里集团内部搜索、推荐、广告和蚂蚁等大量核心实时业务。在大会的主题演讲上,阿里巴巴集团副总裁周靖人宣布,阿里巴巴内部 Flink 版本 Blink 将于 2019 年 1 月正式开源! 阿里希望通过 Blink 开源进一步加深与 Flink 社区的联动,并推动国内更多中小型企业使Flink。Flink Forward China会上,AI 前线对阿里巴巴计算平台事业部研究员蒋晓伟(花名量仔)进行了独家专访,他与我们分享了关于下一代实时流计算引擎的看法,并针对 Blink 的重要新特性、开源后 Blink 与 Flink 之间的关系、Blink 后续规划等问题进行了解答。阿里巴巴与 Flink随着人工智能时代的降临和数据量的爆发,在典型的大数据业务场景下,数据业务最通用的做法是:选用批处理的技术处理全量数据,采用流式计算处理实时增量数据。在很多的业务场景之下,用户的业务逻辑在批处理和流处理之中往往是相同的。但是,用户用于批处理和流处理的两套计算引擎是不同的。因此,用户通常需要写两套代码。毫无疑问,这带来了一些额外的负担和成本。阿里巴巴的商品数据处理就经常需要面对增量和全量两套不同的业务流程问题,所以阿里巴巴就在想:能不能有一套统一的大数据引擎技术,用户只需要根据自己的业务逻辑开发一套代码。这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持,这就是阿里巴巴选择 Flink 的背景和初衷。彼时的 Flink 不管是规模还是稳定性尚未经历实践,成熟度有待商榷。阿里巴巴实时计算团队决定在阿里内部建立一个 Flink 分支 Blink,并对 Flink 进行大量的修改和完善,让其适应阿里巴巴这种超大规模的业务场景。简单地说,Blink 就是阿里巴巴开发的基于开源 Flink 的阿里巴巴内部版本。阿里巴巴基于 Flink 搭建的平台于 2016 年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于 Flink 搭建的实时计算平台。目前,这套基于 Flink 搭建的实时计算平台不仅服务于阿里巴巴集团内部,而且通过阿里云的云产品 API 向整个开发者生态提供基于 Flink 的云产品支持。以下内容整理自 AI 前线对蒋晓伟的采访。开源的时机AI 前线:为什么选择现在将 Blink 开源?这其中有哪些考量?什么样的时机才是开源最合适的时机?蒋晓伟: 在我看来,有几个因素:第一个因素是,这几年我们一直试图把阿里对 Flink 的改进推回社区,但社区有自己的步伐,很多时候可能无法把我们的变更及时推回去。对于社区来说,需要达成共识,才能更好地保证开源项目的质量,但同时就会导致推入的速度慢一些。经过这几年积累,我们这边和社区之间的差距已经变得比较大了。Blink 有一些很好的新功能,比如批处理功能,在社区版本是没有的。在过去这段时间里,我们不断听到有人问,Blink 什么时候能开源、是不是能开源这样的呼声。我们有两种方法,一种就是慢慢地推回去再给用户用。但我们认为这样等下去对社区不是最好的。我们还是希望尽快把我们的代码拿出来,尽量让大家都能用起来。所以最近这半年,我们一直都在准备把代码整理好去进行开源。选择在这个时间点开源有几个好处:第一个好处是我们所开源的这些代码在阿里内部经过像双一十、双十二这样巨大流量的检验,让我们对它的质量有更大的信心,这是非常大的好处;第二个好处,Flink Forward 大会是第一次在中国举办,在这样一个场合开源表明了阿里对 Flink 社区坚定的支持,这是一个比较好的场合。主要是基于这些考虑。选 Blink 还是 Flink?这不会是一个问题AI 前线:开源的 Blink 版本会和阿里巴巴内部使用的 Blink 保持一致吗?蒋晓伟: 即将开源的是阿里巴巴双十二的上线版本,还会有一些小的改进。AI 前线:Blink 开源后,两个开源项目之间的关系会是怎样的?未来 Flink 和 Blink 也会由不同的团队各自维护吗?蒋晓伟: 开源的意思是,我们愿意把 Blink 的代码贡献出来,但这两个项目是一个项目。有一件事情需要澄清一下,我们将公开 Blink 的所有代码,让大家都可以看到,但与此同时,我们会跟社区一起努力,通过讨论决定 Blink 以什么样的方式进入 Flink 是最合适的。因为 Flink 是一个社区的项目,我们需要经过社区的同意才能以分支的形式进入 Flink,或者作为变更 Merge 到项目中。我想强调一下,我们作为社区的一员需要跟社区讨论才能决定这件事情。Blink 永远不会成为另外一个项目,如果后续进入 Apache 一定是成为 Flink 的一部分,我们没有任何兴趣另立旗帜,我们永远是 Flink 的一部分,也会坚定地支持 Flink。我们非常愿意把 Blink 的代码贡献给所有人,所以明年 1 月份我们会先将 Blink 的代码公开,但这期间我们也会和社区讨论,以什么样的形式进入 Flink 是最合适的、怎么贡献是社区最希望的方式。我们希望,在 Blink 开源之后,和社区一起努力,把 Blink 好的地方逐步推回 Flink,成为 Flink 的一部分,希望最终 Flink 和 Blink 变成一个东西,阿里巴巴和整个社区一起来维护。而不是把它分成两个东西,给用户选择的困难,这不是我们想要的。因此未来用户也不会面临已经部署了 Flink、是否要把 Flink 迁移到 Blink 的问题,企业选型时也不需要在 Flink 和 Blink 之间抉择,Blink 和 Flink 会是同一个项目。Blink 开源只有一个目的,就是希望 Flink 做得更好。Blink 改进了什么?AI 前线:能不能重点介绍一下即将开源的 Blink 版本有哪些比较重要的新技术特性?与 Flink 最新发布版本相比,阿里的 Blink 做了哪些方面的优化和改进?蒋晓伟: 阿里巴巴实时计算团队不仅对 Flink 在性能和稳定性上做出了很多改进和优化,同时在核心架构和功能上也进行了大量创新和改进。过去两年多,有很多更新已经推回给社区了,包括 Flink 新的分布式架构等。目前我们的 Blink 版本跟社区版本还有几点差异,第一个是稳定性方面,我们做了一些优化,在某些场景会比社区版本更加稳定,特别是在大规模场景。另外还有一个比较大的不一样是我们全新的 Flink SQL 技术栈,它在功能上,特别是在批处理的功能上比社区版本强大很多。它支持现在标准 SQL 几乎所有的语法和语义。另外,在性能上,无论是在流式 SQL 还是批 SQL,我们的版本在性能上都有很大的优势。特别是在批 SQL 的性能方面,当前 Blink 版本是社区版本性能的 10 倍以上,跟 Spark 相比,在 TPCDS 这样的场景 Blink 的性能也能达到 3 倍以上。如果用户对批处理或者对 SQL 有着比较强的需求,我们这个版本会用户可以得到很多好处。Blink 在阿里内部的应用AI 前线:请介绍一下 Blink 在阿里内部的使用情况。目前 Blink 在阿里的大数据架构中扮演什么样的角色?在阿里内部主要用于哪些业务和应用场景?蒋晓伟: 现在阿里的大数据平台上,所有的实时计算都已经在使用 Blink;同时,除了实时计算以外,在一些流批一体化的场景也会用 Blink 来做批处理;我们在机器学习场景也有一个探索,叫做 Alink,这个项目是对 Flink Machine Learning Library 的改进,其中实现了大量的算法,都是基于 Flink 做实时机器学习的算法,Alink 在很多场景已经被证明在规模上有很大的优势。同时,我们在图计算场景也有一些探索。AI 前线:目前阿里内部有多少部门在使用 Blink?蒋晓伟: 前段时间我们刚刚做过统计,阿里的技术部门大约有 70% 都在使用 Blink。Blink 一直是在用户的反馈之中成长起来的,对于内部用户反馈的数据倾斜、资源使用率、易用性方面的问题,Blink 都做了针对性的改进。现在 Blink 用的最多的场景主要还是实时计算方面,阿里还有一些业务现在相对比较新,还没有进入实时计算的领域,等这些业务进入实时计算领域时也会使用 Blink。在批处理方面,阿里内部也有一个自研的批处理引擎叫做 MaxCompute,MaxCompute 也会拥抱 Flink 生态,在语法和语义上做和 Flink 兼容的工作。未来,整个阿里的计算体系和平台都会融入同一个生态。后续规划AI 前线:接下来阿里对于 Blink 还有哪些规划?包括技术改进、落地应用、更新维护、社区等几个方面。蒋晓伟: 从技术上说,今天我们公布了 Flink 在批处理上的成果,接下来,我们会对技术持续投入,我们希望每几个月就能看到技术上有一个比较大的亮点。下一波亮点应该是机器学习场景。要把机器学习支持好,有一系列的工作要做,包括引擎的功能、性能和易用性。这些工作我们已经在内部的讨论和进行之中,接下来几个月,大家应该会看到一些成果。我们也在和社区讨论一些事情。除了机器学习之外,我们在图计算方面也有一些探索,包括对增量迭代更好的支持。做完这些之后,可以认为 Flink 作为大数据的计算引擎已经比较完备了。同时,我们也重点去做 Flink 的生态,包括 Flink 与其他系统之间的关系、易用性等。Flink 要真正做好,不仅需要它本身功能强大,还需要把整个生态做得非常强大。这部分我们甚至会跟一些 ISV 合作,看看是不是能够在 Flink 之上提供更好的解决方案,进一步降低用户的使用门槛。在社区方面,我们希望能够把把 Blink 完全融入 Flink 社区,一起做 Flink 社区的运营,让 Flink 真正在中国、乃至全世界大规模地使用起来。在应用方面,实时流计算其实有很多很有潜力的应用场景,但有一些可能大家不是非常熟悉,我们会对这些场景做一些推广。以实时机器学习为例,它往往能够给我们带来比一般的机器学习更大的效果提升。去年,实时强化学习给我们在搜索上带来了 20% 以上的提升。除此之外,在安全领域(比如实时的 Fraud Detection)、监控报警方面,还有 IoT 领域,实时流计算都有非常广泛的应用场景。这些 Flink 现在可能已经做了,但是大家还没有意识到,Flink 能够给大家带来这样的商业上的好处。AI 前线:Blink 开源之后,后续阿里在这基础上做的变更和更新会以什么样的方式推回社区版本?蒋晓伟: 我们理想的方式是,阿里内部的版本是社区的 Flink 版本加上一些定制化的插件,不需要对 Flink 本身做修改,而是对 Flink 做增加。比如跟阿里内部系统交互的部分跟社区是不适用的,就会保持在内部,我们希望这些修改不动 Flink 代码,而是用插件的方式加在 Flink 上面。最终的方式就是,对于所有公司都有用的修改会在 Flink 代码本身做修改,使所有使用 Flink 的公司都能从中获利,而对接阿里内部系统的部分就只在阿里内部使用。下一代实时流计算引擎之争AI 前线:先在很多人提到实时流计算引擎,都会拿 Spark 和 Flink 来做对比,您怎么看待下一代实时流计算引擎之争?未来实时流计算引擎最重要的发展方向是什么?蒋晓伟:Spark 和 Flink 一开始 share 了同一个梦想,他们都希望能够用同一个技术把流处理和批处理统一起来,但他们走了完全不一样的两条路,前者是用以批处理的技术为根本,并尝试在批处理之上支持流计算;后者则认为流计算技术是最基本的,在流计算的基础之上支持批处理。正因为这种架构上的不同,今后二者在能做的事情上会有一些细微的区别。比如在低延迟场景,Spark 基于微批处理的方式需要同步会有额外开销,因此无法在延迟上做到极致。在大数据处理的低延迟场景,Flink 已经有非常大的优势。经过我们的探索,Flink 在批处理上也有了比较大的突破,这些突破都会反馈回社区。当然,对于用户来说,多一个选择永远是好的,不同的技术可能带来不同的优势,用户可以根据自己业务场景的需求进行选择。未来,在大数据方向,机器学习正在逐渐从批处理、离线学习向实时处理、在线学习发展,而图计算领域同样的事情也在发生,比如实时反欺诈通常用图计算来做,而这些欺诈事件都是实时地、持续不断地发生,图计算也在变得实时化。但是 Flink 除了大数据领域以外,在应用和微服务的场景也有其独特的优势。应用和微服务场景对延迟的要求非常苛刻,会达到百毫秒甚至十毫秒级别,这样的延迟只有 Flink 的架构才能做到。我认为应用和微服务其实是非常大的领域,甚至可能比大数据更大,这是非常激动人心的机会。上面这些都是我们希望能够拓宽的应用领域。AI 前线:在技术方面,Spark 和 Flink 其实是各有千秋,但在生态和背后支持的公司上面,Flink 是偏弱的,那么后续在生态和企业支持这块,阿里会如何帮助 Flink?蒋晓伟: 这次阿里举办 Flink Forward China 就是想推广 Flink 生态的重要举动之一。除了 Flink Forward China 大会,我们还会不定期举办各种线下 Meetup,投入大量精力打造中文社区,包括将 Flink 的英文文档翻译成中文、打造 Flink 中文论坛等。在垂直领域,我们会去寻找一些合作伙伴,将 Flink 包装在一些解决方案中提供给用户使用。AI 前线:关于开源项目的中立性问题。阿里现在在大力地推动 Flink 开源项目的应用和社区的发展,但业界其他公司(尤其是与阿里在其他业务上可能有竞争的公司)在考虑是否采用 Flink 的时候可能还是会对社区的中立性存在一些疑虑,对于这一点,阿里是怎么考虑的?蒋晓伟: 阿里本身会投入非常大的力量推动 Flink 社区的发展和壮大,但我们也非常希望有更多企业、更多人加入社区,和阿里一起推动社区发展,这次阿里承办 Flink Forward China 峰会就是想借此机会让更多公司参与进来。光阿里一家是无法把 Flink 生态做起来的。我希望大家能够看到我们在做的事情,然后消除这样的疑虑。我们会用自己的行动表明,我们是真的希望把 Flink 的社区做大,在这件事情上,我们并不会有私心。本文作者:阿里云头条阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

December 21, 2018 · 2 min · jiezi

Flink SQL 核心解密 —— 提升吞吐的利器 MicroBatch

之前我们在 Flink SQL 中支持了 MiniBatch, 在支持高吞吐场景发挥了重要作用。今年我们在 Flink SQL 性能优化中一项重要的改进就是升级了微批模型,我们称之为 MicroBatch,也叫 MiniBatch2.0。在设计和实现 Flink 的流计算算子时,我们一般会把“面向状态编程”作为第一准则。因为在流计算中,为了保证状态(State)的一致性,需要将状态数据存储在状态后端(StateBackend),由框架来做分布式快照。而目前主要使用的RocksDB,Niagara状态后端都会在每次read和write操作时发生序列化和反序列化操作,甚至是磁盘的 I/O 操作。因此状态的相关操作通常都会成为整个任务的性能瓶颈,状态的数据结构设计以及对状态的每一次访问都需要特别注意。微批的核心思想就是缓存一小批数据,在访问状态状态时,多个同 key 的数据就只需要发生一次状态的操作。当批次内数据的 key 重复率较大时,能显著降低对状态的访问频次,从而大幅提高吞吐。MicroBatch 和 MiniBatch 的核心机制是一样的,就是攒批,然后触发计算。只是攒批策略不太一样。我们先讲解触发计算时是如何节省状态访问频次的。微批计算MicroBatch 的一个典型应用场景就是 Group Aggregate。例如简单的求和例子:SELECT key, SUM(value) FROM T GROUP BY key如上图所示,当未开启 MicroBatch 时,Aggregate 的处理模式是每来一条数据,查询一次状态,进行聚合计算,然后写入一次状态。当有 N 条数据时,需要操作 2*N 次状态。当开启 MicroBatch 时,对于缓存下来的 N 条数据一起触发,同 key 的数据只会读写状态一次。例如上图缓存的 4 条 A 的记录,只会对状态读写各一次。所以当数据的 key 的重复率越大,攒批的大小越大,那么对状态的访问会越少,得到的吞吐量越高。攒批策略攒批策略一般分成两个维度,一个是延时,一个是内存。延时即控制多久攒一次批,这也是用来权衡吞吐和延迟的重要参数。内存即为了避免瞬间 TPS 太大导致内存无法存下缓存的数据,避免造成 Full GC 和 OOM。下面会分别介绍旧版 MiniBatch 和 新版 MicroBatch 在这两个维度上的区别。MiniBatch 攒批策略MiniBatch 攒批策略的延时维度是通过在每个聚合节点注册单独的定时器来实现,时间分配策略采用简单的均分。比如有4个 aggregate 节点,用户配置 10s 的 MiniBatch,那么每个节点会分配2.5s,例如下图所示:但是这种策略有以下几个问题:用户能容忍 10s 的延时,但是真正用来攒批的只有2.5秒,攒批效率低。拓扑越复杂,差异越明显。由于上下游的定时器的触发是纯异步的,可能导致上游触发微批的时候,下游也正好触发微批,而处理微批时会一段时间不消费网络数据,导致上游很容易被反压。计时器会引入额外的线程,增加了线程调度和抢锁上的开销。MiniBatch 攒批策略在内存维度是通过统计输入条数,当输入的条数超过用户配置的 blink.miniBatch.size 时,就会触发批次以防止 OOM。但是 size 参数并不是很好评估,一方面当 size 配的过大,可能会失去保护内存的作用;而当 size 配的太小,又会导致攒批效率降低。MicroBatch 攒批策略MicroBatch 的提出就是为了解决 MiniBatch 遇到的上述问题。MicroBatch 引入了 watermark 来控制聚合节点的定时触发功能,用 watermark 作为特殊事件插入数据流中将数据流切分成相等时间间隔的一个个批次。实现原理如下所示:MicroBatch 会在数据源之后插入一个 MicroBatchAssigner 的节点,用来定时发送 watermark,其间隔是用户配置的延时参数,如10s。那么每隔10s,不管数据源有没有数据,都会发一个当前系统时间戳的 watermark 下去。一个节点的当前 watermark 取自所有 channel 的最小 watermark 值,所以当聚合节点的 watermark 值前进时,也就意味着攒齐了上游的一个批次,我们就可以触发这个批次了。处理完这个批次后,需要将当前 watermark 广播给下游所有 task。当下游 task 收齐上游 watermark 时,也会触发批次。这样批次的触发会从上游到下游逐级触发。这里将 watermark 作为划分批次的特殊事件是很有意思的一点。Watermark 是一个非常强大的工具,一般我们用来衡量业务时间的进度,解决业务时间乱序的问题。但其实换一个维度,它也可以用来衡量全局系统时间的进度,从而非常巧妙地解决数据划批的问题。因此与 MiniBatch 策略相比,MicroBatch 具有以下优点:相同延时下,MicroBatch 的攒批效率更高,能攒更多的数据。由于 MicroBatch 的批次触发是靠事件的,当上游触发时,下游不会同时触发,所以不像 MiniBatch 那么容易引起反压。解决数据抖动问题(下一小节分析)我们利用一个 DAU 作业进行了性能测试对比,在相同的 allowLatency(6秒)配置的情况下,MicroBatch 能得到更高的吞吐,而且还能得到与 MiniBatch 相同的端到端延迟!另外,仍然是上述的性能测试对比,可以发现运行稳定后 MicroBatch 的队列使用率平均值在 50% 以下,而 MiniBatch 基本是一直处于队列满载下。说明 MicroBatch 比 MiniBatch 更加稳定,更不容易引起反压。MicroBatch 在内存维度目前仍然与 MiniBatch 一样,使用 size 参数来控制条数。但是将来会基于内存管理,将缓存的数据存于管理好的内存块中(BytesHashMap),从而减少 Java 对象的空间成本,减少 GC 的压力和防止 OOM。防止数据抖动所谓数据抖动问题是指,两层 AGG 时,第一层 AGG 发出的更新消息会拆成两条独立的消息被下游消费,分别是retract 消息和 accumulate 消息。而当第二层 AGG 消费这两条消息时也会发出两条消息。从前端看到就是数据会有抖动的现象。例如下面的例子,统计买家数,这里做了两层打散,第一层先做 UV 统计,第二级做SUM。SELECT day, SUM(cnt) totalFROM ( SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day, MOD(buy_id, 1024))GROUP BY day当第一层count distinct的结果从100上升到101时,它会发出 -100, +101 的两条消息。当第二层的 SUM 会依次收到这两条消息并处理,假设此时 SUM 值是 900,那么在处理 -100 时,会先发出 800 的结果值,然后处理 +101 时,再发出 901 的结果值。从用户端的感受就是买家数从 900 降到了 800 又上升到了 901,我们称之为数据抖动。而理论上买家数只应该只增不减的,所以我们也一直在思考如何解决这个问题。数据抖动的本质原因是 retract 和 accumulate 消息是一个事务中的两个操作,但是这两个操作的中间结果被用户看到了,也就是传统数据库 ACID 中的隔离性(I) 中最弱的 READ UNCOMMITTED 的事务保障。要从根本上解决这个问题的思路是,如何原子地处理 retract & accumulate 的消息。如上文所述的 MicroBatch 策略,借助 watermark 划批,watermark 不会插在 retract & accumulate 中间,那么 watermark 就是事务的天然分界。按照 watermark 来处理批次可以达到原子处理 retract & accumulate 的目的。从而解决抖动问题。适用场景与使用方式MicroBatch 是使用一定的延迟来换取大量吞吐的策略,如果用户有超低延迟的要求的话,不建议开启微批处理。MicroBatch 目前对于无限流的聚合、Join 都有显著的性能提升,所以建议开启。如果遇到了上述的数据抖动问题,也建议开启。MicroBatch默认关闭,开启方式:# 攒批的间隔时间,使用 microbatch 策略时需要加上该配置,且建议和 blink.miniBatch.allowLatencyMs 保持一致blink.microBatch.allowLatencyMs=5000# 使用 microbatch 时需要保留以下两个 minibatch 配置blink.miniBatch.allowLatencyMs=5000# 防止OOM,每个批次最多缓存多少条数据blink.miniBatch.size=20000后续优化MicroBatch 目前只支持无限流的聚合和 Join,暂不支持 Window Aggregate。所以后续 Window Aggregate 会重点支持 MicroBatch 策略,以提升吞吐性能。另一方面,MicroBatch 的内存会考虑使用二进制的数据结构管理起来,提升内存的利用率和减轻 GC 的影响。本文作者:jark阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

December 7, 2018 · 2 min · jiezi