想学Nodejsstream先有必要搞清楚

什么是stream定义流的英文stream,流(Stream)是一个抽象的数据接口,Node.js中很多对象都实现了流,流是EventEmitter对象的一个实例,总之它是会冒数据(以 Buffer 为单位),或者能够吸收数据的东西,它的本质就是让数据流动起来。可能看一张图会更直观: 注意:stream不是node.js独有的概念,而是一个操作系统最基本的操作方式,只不过node.js有API支持这种操作方式。linux命令的|就是stream。 为什么要学习stream视频播放例子小伙伴们肯定都在线看过电影,对比定义中的图-水桶管道流转图,source就是服务器端的视频,dest就是你自己的播放器(或者浏览器中的flash和h5 video)。大家想一下,看电影的方式就如同上面的图管道换水一样,一点点从服务端将视频流动到本地播放器,一边流动一边播放,最后流动完了也就播放完了。 说明:视频播放的这个例子,如果我们不使用管道和流动的方式,直接先从服务端加载完视频文件,然后再播放。会造成很多问题 因内存占有太多而导致系统卡顿或者崩溃因为我们的网速 内存 cpu运算速度都是有限的,而且还要有多个程序共享使用,一个视频文件加载完可能有几个g那么大。读取大文件data的例子有一个这样的需求,想要读取大文件data的例子 使用文件读取 const http = require('http');const fs = require('fs');const path = require('path');const server = http.createServer(function (req, res) { const fileName = path.resolve(__dirname, 'data.txt'); fs.readFile(fileName, function (err, data) { res.end(data); });});server.listen(8000);使用文件读取这段代码语法上并没有什么问题,但是如果data.txt文件非常大的话,到了几百M,在响应大量用户并发请求的时候,程序可能会消耗大量的内存,这样可能造成用户连接缓慢的问题。而且并发请求过大的话,服务器内存开销也会很大。这时候我们来看一下用stream实现。 const http = require('http');const fs = require('fs');const path = require('path');const server = http.createServer(function (req, res) { const fileName = path.resolve(__dirname, 'data.txt'); let stream = fs.createReadStream(fileName); // 这一行有改动 stream.pipe(res); // 这一行有改动});server.listen(8000);使用stream就可以不需要把文件全部读取了再返回,而是一边读取一边返回,数据通过管道流动给客户端,真的减轻了服务器的压力。 ...

August 19, 2019 · 2 min · jiezi

从-Spark-Streaming-到-Apache-Flink-实时数据流在爱奇艺的演进

本文将为大家介绍Apache Flink在爱奇艺的生产与实践过程。你可以借此了解到爱奇艺引入Apache Flink的背景与挑战,以及平台构建化流程。主要内容如下: 爱奇艺在实时计算方面的的演化和遇到的一些挑战爱奇艺使用Flink的User Case爱奇艺Flink平台化构建流程爱奇艺在Flink上的改进未来工作爱奇艺简介 爱奇艺在2010年正式上线,于2018年3月份在纳斯达克上市。我们拥有规模庞大且高度活跃的用户基础,月活跃用户数5.65亿人,在在线视频领域名列第一。在移动端,爱奇艺月度总有效时长59.08亿小时,稳居中国APP榜第三名。 一、爱奇艺在实时计算方面的演化和遇到的一些挑战1. 实时计算在爱奇艺的演化过程 实时计算是基于一些实时到达、速率不可控、到达次序独立不保证顺序、一经处理无法重放除非特意保存的无序时间序列的数据的在线计算。 因此,在实时计算中,会遇到数据乱序、数据延时、事件时间与处理时间不一致等问题。爱奇艺的峰值事件数达到1100万/秒,在正确性、容错、性能、延迟、吞吐量、扩展性等方面均遇到不小的挑战。 爱奇艺从2013年开始小规模使用storm,部署了3个独立集群。在2015年,开始引入Spark Streaming,部署在YARN上。在2016年,将Spark Streaming平台化,构建流计算平台,降低用户使用成本,之后流计算开始在爱奇艺大规模使用。在2017年,因为Spark Streaming的先天缺陷,引入Flink,部署在独立集群和YARN上。在2018年,构建Streaming SQL与实时分析平台,进一步降低用户使用门槛。 2. 从Spark Streaming到Apache Flink 爱奇艺主要使用的是Spark Streaming和Flink来进行流式计算。Spark Streaming的实现非常简单,通过微批次将实时数据拆成一个个批处理任务,通过批处理的方式完成各个子Batch。Spark Streaming的API也非常简单灵活,既可以用DStream的java/scala API,也可以使用SQL定义处理逻辑。但Spark Streaming受限于微批次处理模型,业务方需要完成一个真正意义上的实时计算会非常困难,比如基于数据事件时间、数据晚到后的处理,都得用户进行大量编程实现。爱奇艺这边大量使用Spark Streaming的场景往往都在于实时数据的采集落盘。 Apache Flink框架的实时计算模型是基于Dataflow Model实现的,完全支持Dataflow Model的四个问题:What,支持定义DAG图;Where:定义各类窗口(固定窗口、滑动窗口和Session窗口);When:支持灵活定义计算触发时间;How:支持丰富的Function定义数据更新模式。和Spark Streaming一样,Flink支持分层API,支持DataStream API,Process Function,SQL。Flink最大特点在于其实时计算的正确性保证:Exactly once,原生支持事件时间,支持延时数据处理。由于Flink本身基于原生数据流计算,可以达到毫秒级低延时。 在爱奇艺实测下来,相比Spark Streaming,Apache Flink在相近的吞吐量上,有更低的延时,更好的实时计算表述能力,原生实时事件时间、延时数据处理等。 二、在爱奇艺使用Flink的一些案例下面通过三个Use Case来介绍一下,爱奇艺具体是怎么使用Flink的,包括海量数据实时ETL,实时风控,分布式调用链分析。 1. 海量数据实时ETL 在爱奇艺这边所有用户在端上的任何行为都会发一条日志到nginx服务器上,总量超过千万QPS。对于具体某个业务来说,他们后续做实时分析,只希望访问到业务自身的数据,于是这中间就涉及一个数据拆分的工作。 在引入Flink之前,最早的数据拆分逻辑是这样子的,在Ngnix机器上通过“tail -f /xxx/ngnix.log | grep "xxx"”的方式,配置了无数条这样的规则,将这些不同的数据按照不同的规则,打到不同的业务kafka中。但这样的规则随着业务线的规模的扩大,这个tail进程越来越多,逐渐遇到了服务器性能瓶颈。 于是,我们就有了这样一个设想,希望通过实时流计算将数据拆分到各个业务kafka。具体来说,就是Nginx上的全量数据,全量采集到一级Kafka,通过实时ETL程序,按需将数据采集到各个业务Kafka中。当时,爱奇艺主的实时流计算基本均是基于Spark Streaming的,但考虑到Spark Streaming延迟相对来说比较高,爱奇艺从这个case展开开始推进Apache Flink的应用。 海量数据实时ETL的具体实现,主要有以下几个步骤: 解码:各个端的投递日志格式不统一,需要首先将各个端的日志按照各种解码方式解析成规范化的格式,这边选用的是JSON风控:实时拆分这边的数据都会过一下风控的规则,过滤掉很大一部分刷量日志。由于量级太高,如果将每条日志都过一下风控规则,延时会非常大。这边做了几个优化,首先,将用户数据通过DeviceID拆分,不同的DeviceID拆分到不同的task manager上,每个task manager用本地内存做一级缓存,将redis和flink部署在一起,用本地redis做二级缓存。最终的效果是,每秒redis访问降到了平均4k,实时拆分的P99延时小于500ms。拆分:按照各个业务进行拆分采样、再过滤:根据每个业务的拆分过程中根据用户的需求不同,有采样、再过滤等过程 2. 实时风控 防机器撞库盗号攻击是安全风控的一个常见需求,主要需求集中于事中和事后。在事中,进行超高频异常检测分析,过滤用户异常行为;在事后,生成IP和设备ID的黑名单,供各业务实时分析时进行防刷使用。 ...

June 21, 2019 · 1 min · jiezi

Vue-组件间通信方式

Vue 是数据驱动的视图框架,那么组件间的数据通信是必然的事情,那么组件间如何进行数据传递呢? 首先组件间通信有父子组件、兄弟组件、堂兄弟组件、叔侄组件等,分类太多可能不好理解,我们暂且分为: 父子组件通信子父组件通信非父子组件通信 兄弟组件通信非兄弟组件通信(不是直属关系,如堂兄组件、叔侄组件等)后续的组件间通信方式的例子就会根据这些分类进行说明。 Vue 本身提供哪几种通信方式?首先 Vue 灵感源于 angular,支持双向绑定,Vue 本质还是单向数据流。跟 React 一样,组件间最基本的数据流是通过 prop 向子组件传递数据。 这里列举一下 Vue 本身支持的通信方式: prop$emit这个其实类似 React 的 props 回调。 provide / inject如果你熟悉 React,这与 React 的 context 特性很相似。 那么有人说 $attrs 、$listener 呢?这些严格意义上不能归纳为数据流的通信方式,这些只是辅助属性,本人也不建议过多的使用这些 $ 属性,除了一些有必要的场景。 propprop 是 Vue 三大核心概念之一,prop 在组件中无处不在。prop 只可以从上一级组件传递到下一级组件(父子组件),即所谓的单向数据流。而且 prop 只读,不可被修改,所有修改都会失效并警告。 可以先阅读官网的 通过 Prop 向子组件传递数据 的教程。 父子组件通信这里也编写了一个简单的例子 http://jsrun.net/wXyKp/edit。 子父组件通信不是说是单向数据流吗,怎么还可以使用 prop 进行子父组件通信?这样想是对的,prop 是无法向上传递数据,但是我们可以使用回调啊。数据流的确向上走了,但是这并不违反单向数据流的思想,这个并不会使得数据流混乱,还是比较清晰。 这个 prop 回调方式,在 React 会经常使用。但是在 Vue 却很少使用,因为组件可以自定义事件,即后面的 $emit 组件间通信方式(其实就是订阅发布模式)。 ...

May 23, 2019 · 1 min · jiezi

5分钟从零构建第一个-Apache-Flink-应用

摘要:在本文中,我们将从零开始,教您如何构建第一个Apache Flink (以下简称Flink)应用程序。开发环境准备Flink 可以运行在 Linux, Max OS X, 或者是 Windows 上。为了开发 Flink 应用程序,在本地机器上需要有 Java 8.x 和 maven 环境。 如果有 Java 8 环境,运行下面的命令会输出如下版本信息: $ java -versionjava version "1.8.0_65"Java(TM) SE Runtime Environment (build 1.8.0_65-b17)Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)如果有 maven 环境,运行下面的命令会输出如下版本信息: $ mvn -versionApache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)Maven home: /Users/wuchong/dev/mavenJava version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jreDefault locale: zh_CN, platform encoding: UTF-8OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"另外我们推荐使用 ItelliJ IDEA (社区免费版已够用)作为 Flink 应用程序的开发 IDE。Eclipse 虽然也可以,但是 Eclipse 在 Scala 和 Java 混合型项目下会有些已知问题,所以不太推荐 Eclipse。下一章节,我们会介绍如何创建一个 Flink 工程并将其导入 ItelliJ IDEA。 ...

May 9, 2019 · 2 min · jiezi

实时计算无线数据分析

案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总本文为您介绍实时计算在无线数据分析中的应用。阿里云实时计算可以为无线App的数据分析场景实时化助力,帮助您做到实时化分析手机AP的各项指标,包括App版本分布情况、Crash检测和等。阿里云移动数据分析 (Mobile Analytics,下面简称MAN) 是阿里云推出的一款移动App数据统计分析产品,提供通用的多维度用户行为分析,支持日志自主分析,助力移动开发者实现基于大数据技术的精细化运营、提升产品质量和体验、增强用户黏性。在流式处理部分,MAN采用阿里云实时计算作为底层的流式大数据分析引擎,为移动数据分析的客户提供一整套实时化的移动App分析报表服务。如下图。MAN目前在阿里云上用户数已经超过数百家。结合阿里云大数据平台(数加),未来MAN将会开放更多实时化、个性化,甚至自定义分析逻辑的无线分析功能给用户,可以极大拓展整个产品功能覆盖面。MAN现有全流程的数据流如下:数据采集开发者选择使用阿里云移动数据分析提供的SDK,将其内置到其App的应用安装包中。该SDK会针对不同手机操作系统提供数据采集组件,将用户手机数据、行为数据采集并录入MAN的后台系统,进行数据分析。数据上报MAN的后台提供了一整套SDK数据上报服务,将负责收集使用SDK的手机上报的数据信息。上报服务系统将进行简单去噪处理后投送到DataHub。说明 未来DataHub将直接提供手机端数据上报SDK,实际上MAN后台的上报服务环节可以完全省略(将去噪处理也可以移到实时计算处理),进一步减少MAN的机器成本。实时计算阿里云实时计算将持续订阅上述DataHub的流式数据,持续读取并计算各类App指标,并立即将各时间段结果数据写入在线的RDS/OTS系统中。数据展现MAN提供一整套的运营指标体系,让开发者快速了解用户来自哪里,访问了哪些页面,停留了多长时间,用户终端及网络环境如何,应用程序卡顿或崩溃的实时反馈,其中Crash分析能精确到设备粒度,查看具体设备的详细Crash信息。 说明 以上数据采集自线下DEMO数据,不代表真实数据情况。本文作者:付空 阅读原文本文为云栖社区原创内容,未经允许不得转载。

April 16, 2019 · 1 min · jiezi

PAI通过流式机器学习算法解决实时热点新闻挖掘案例

摘要: (本实验会用到流式机器学习算法,正处于邀测状态,需要申请开通)PAI地址:https://data.aliyun.com/produ…:https://data.aliyun.com/paionlinelearning打开新闻客户端,往往会收到热点新闻推送相关的内容。(机器学习PAI Online Learning模块上线邀测,目前只支持华北2(北京)区域使用,本实验会用到流式机器学习算法)PAI地址:https://data.aliyun.com/product/learn邀测申请地址:https://data.aliyun.com/paionlinelearning打开新闻客户端,往往会收到热点新闻推送相关的内容。新闻客户端作为一个承载新闻的平台,实时会产生大量的新闻,如何快速挖掘出哪些新产生的新闻会成为成为热点新闻,决定着整个平台的新闻推荐质量。如何从平台中海量的新闻素材中找到最有潜力成为热点的新闻需要使用机器学习相关的算法,传统做法是将每天获取的历史咨询下载并且离线训练模型,再将生成的热点发现模型推上线供第二日使用。但是这种离线训练所生成的模型往往缺乏时效性的属性,因为每天热点新闻都是实时产生的,用过去的模型预测实时产生的数据显然是缺乏对数据时效性的理解。针对这种场景,PAI平台开创性的提出来Online-Learning的解决方案,通过流式算法和离线算法的结合,既能够发挥离线训练对大规模数据的强大处理能力,又能够发挥流式机器学习算法对实时模型的更新能力,做到流批同跑,完美解决模型时效性的问题。今天就以实时热点新闻挖掘案例为例,为大家介绍PAI OnlineLearning的解决方案。实验流程1.切换新版进入PAI后,点击“体验新版”按钮即可开启试用(目前OnlineLearning只支持新版,且与旧版不兼容)可在模板中一键创建类似于本文介绍的案例,数据和流程都已经内置,开箱即用模板打开,点击运行后效果(模板目前为简化版本)2.实验流程介绍(注:PAI中离线计算组件用蓝色标识,流式计算组件由绿色标识,流式组件相连将形成计算组,因为流式组件需要多个组件的运行停止状态一致)步骤1:离线模型训练本文使用的数据是3万条来自UCI开放数据集提供的新闻文本数据。地址:https://archive.ics.uci.edu/ml/datasets/Online+News+Popularity数据组成:包含新闻的URL以及产生时间,另外还包含了58个特征以及1个目标值,目标值“share”是新闻的分享次数,建模过程中将share字段利用sql组件处理成一个二分类问题,新闻share次数超过10000次为热点新闻,小于10000次为非热门新闻特征的组成如下图所示:利用逻辑回归模型训练生成一个二分类模型,这个模型用来评估新闻是否会成为热点新闻。(注:目前PAI OnlineLearning只支持逻辑回归算法)步骤2:离线模型转换成流式模型通过“模型转换”组件,可以将离线生成的逻辑回归模型转换成流式算法可读取的流式模型。步骤3:流式模型训练从步骤3开始就进入了流式算法组件的步骤,PAI平台提供多种流式数据源,本案例以Datahub为例。Datahub地址:https://datahub.console.aliyun.com/datahubDatahub是一种流式数据对列,支持JAVA、PYTHON等多种语言采集方式,在具体使用过程中可以通过Datahub链接用户实时产生的数据以及PAI的训练服务。注意:Datahub输入的数据流格式需要与离线训练的数据流的字段完全一致,这样才可以对离线的模型进行实时更新。Ftrl训练组件:左侧输入的是转化为流式的离线模型,右侧输入是流式数据表FTRL算法基本等同于流式的逻辑回归算法,在使用过程中需要按照LR算法配置参数,需要注意”模型保存时间间隔参数“的配置,这个参数决定了实时计算产生模型的时间周期。新版PAI已经内置了大量流式算法组件:PMML模型生成组件:将输出的模型转化成PMML格式OSS文件导出:将模型导出到用户自己的OSS中,可以自己设置名称的前缀和后缀,生成模型可在OSS中查看,如下图步骤4:流式模型评估流式模型评估指的是利用评估数据对Ftrl训练生成的模型进行评估,输出的评估指标也可以写入OSS,评估指标与模型一一对应。每个模型和评估指标都有一个ID,如果ID一致,说明模型和评估指标是对应关系,如下图:评估指标是一个json格式文件,包含精确率、准确率、混淆矩阵等指标:步骤5:流式预测结果实时导出可以利用实时生成的模型做实时数据预测,实时的预测结果可以写出到datahub中,如下图:同时如果输入的预测数据集包含label,还可以添加分类评估组件,可以打开组关系中的最右边按钮:打开实时的流式预测结果评估页面:3.模型使用介绍通过以上步骤已经产生了新闻热点预测模型,生成的模型已经存入OSS,可以直接在PAI-EAS在线预测服务引擎进行部署也可以下载下来在本地预测引擎使用。新闻数据进来后先要做特征工程(同”步骤1:离线模型训练“中的特征处理方式),然后将特征工程处理结果输入”热点新闻挖掘服务“,将会返回新闻是否是热点新闻。总结通过本文的案例,实现了将离线历史数据生成LR模型推送到实时训练环境,再利用实时生成的数据对模型进行更新, 这种实时训练的架构可以完美解决实时热点新闻对于新闻推荐模型的影响问题。欢迎大家试用并给出建议。本文作者:傲海阅读原文本文为云栖社区原创内容,未经允许不得转载。

January 23, 2019 · 1 min · jiezi

阿里专家杜万:Java响应式编程,一文全面解读

本篇文章来自于2018年12月22日举办的《阿里云栖开发者沙龙—Java技术专场》,杜万专家是该专场第四位演讲的嘉宾,本篇文章是根据杜万专家在《阿里云栖开发者沙龙—Java技术专场》的演讲视频以及PPT整理而成。摘要:响应式宣言如何解读,Java中如何进行响应式编程,Reactor Streams又该如何使用?热衷于整合框架与开发工具的阿里云技术专家杜万,为大家全面解读响应式编程,分享Spring Webflux的实践。从响应式理解,到Reactor项目示例,再到Spring Webflux框架解读,本文带你进入Java响应式编程。演讲嘉宾简介:杜万(倚贤),阿里云技术专家,全栈工程师,从事了12年 Java 语言为主的软件开发工作,热衷于整合框架与开发工具,Linux拥趸,问题终结者。合作翻译《Elixir 程序设计》。目前负责阿里云函数计算的工具链开发,正在实践 WebFlux 和 Reactor 开发新的 Web 应用。本次直播视频精彩回顾,戳这里!https://yq.aliyun.com/live/721PPT下载地址:https://yq.aliyun.com/download/3187以下内容根据演讲嘉宾视频分享以及PPT整理而成。本文围绕以下三部分进行介绍:1.Reactive2.Project Reactor3.Spring Webflux一.Reactive1.Reactive Manifesto下图是Reactive Manifesto官方网站上的介绍,这篇文章非常短但也非常精悍,非常值得大家去认真阅读。响应式宣言是一份构建现代云扩展架构的处方。这个框架主要使用消息驱动的方法来构建系统,在形式上可以达到弹性和韧性,最后可以产生响应性的价值。所谓弹性和韧性,通俗来说就像是橡皮筋,弹性是指橡皮筋可以拉长,而韧性指在拉长后可以缩回原样。这里为大家一一解读其中的关键词:1)响应性:快速/一致的响应时间。假设在有500个并发操作时,响应时间为1s,那么并发操作增长至5万时,响应时间也应控制在1s左右。快速一致的响应时间才能给予用户信心,是系统设计的追求。2)韧性:复制/遏制/隔绝/委托。当某个模块出现问题时,需要将这个问题控制在一定范围内,这便需要使用隔绝的技术,避免连锁性问题的发生。或是将出现故障部分的任务委托给其他模块。韧性主要是系统对错误的容忍。3)弹性:无竞争点或中心瓶颈/分片/扩展。如果没有状态的话,就进行水平扩展,如果存在状态,就使用分片技术,将数据分至不同的机器上。4)消息驱动:异步/松耦合/隔绝/地址透明/错误作为消息/背压/无阻塞。消息驱动是实现上述三项的技术支撑。其中,地址透明有很多方法。例如DNS提供的一串人类能读懂的地址,而不是IP,这是一种不依赖于实现,而依赖于声明的设计。再例如k8s每个service后会有多个Pod,依赖一个虚拟的服务而不是某一个真实的实例,从何实现调用1 个或调用n个服务实例对于对调用方无感知,这是为分片或扩展做了准备。错误作为消息,这在Java中是不太常见的,Java中通常将错误直接作为异常抛出,而在响应式中,错误也是一种消息,和普通消息地位一致,这和JavaScript中的Promise类似。背压是指当上游向下游推送数据时,可能下游承受能力不足导致问题,一个经典的比喻是就像用消防水龙头解渴。因此下游需要向上游声明每次只能接受大约多少量的数据,当接受完毕再次向上游申请数据传输。这便转换成是下游向上游申请数据,而不是上游向下游推送数据。无阻塞是通过no-blocking IO提供更高的多线程切换效率。2.Reactive Programming响应式编程是一种声明式编程范型。下图中左侧显示了一个命令式编程,相信大家都比较熟悉。先声明两个变量,然后进行赋值,让两个变量相加,得到相加的结果。但接着当修改了最早声明的两个变量的值后,sum的值不会因此产生变化。而在Java 9 Flow中,按相同的思路实现上述处理流程,当初始变量的值变化,最后结果的值也同步发生变化,这就是响应式编程。这相当于声明了一个公式,输出值会随着输入值而同步变化。响应式编程也是一种非阻塞的异步编程。下图是用reactor.ipc.netty实现的TCP通信。常见的server中会用循环发数据后,在循环外取出,但在下图的实现中没有,因为这不是使用阻塞模型实现,是基于非阻塞的异步编程实现。响应式编程是一种数据流编程,关注于数据流而不是控制流。下图中,首先当页面出现点击操作时产生一个click stream,然后页面会将250ms内的clickStream缓存,如此实现了一个归组过程。然后再进行map操作,得到每个list的长度,筛选出长度大于2的,这便可以得出多次点击操作的流。这种方法应用非常广泛,例如可以筛选出双击操作。由此可见,这种编程方式是一种数据流编程,而不是if else的控制流编程。之前有提及消息驱动,那么消息驱动(Message-driven)和事件驱动(Event-driven)有什么区别呢。1)消息驱动有确定的目标,一定会有消息的接受者,而事件驱动是一件事情希望被观察到,观察者是谁无关紧要。消息驱动系统关注消息的接受者,事件驱动系统关注事件源。2)在一个使用响应式编程实现的响应式系统中,消息擅长于通讯,事件擅长于反应事实。3.Reactive StreamsReactive Streams提供了一套非阻塞背压的异步流处理标准,主要应用在JVM、JavaScript和网络协议工作中。通俗来说,它定义了一套响应式编程的标准。在Java中,有4个Reactive Streams API,如下图所示:这个API中定义了Publisher,即事件的发生源,它只有一个subscribe方法。其中的Subscriber就是订阅消息的对象。作为订阅者,有四个方法。onSubscribe会在每次接收消息时调用,得到的数据都会经过onNext方法。onError方法会在出现问题时调用,Throwable即是出现的错误消息。在结束时调用onComplete方法。Subscription接口用来描述每个订阅的消息。request方法用来向上游索要指定个数的消息,cancel方法用于取消上游的数据推送,不再接受消息。Processor接口继承了Subscriber和Publisher,它既是消息的发生者也是消息的订阅者。这是发生者和订阅者间的过渡桥梁,负责一些中间转换的处理。Reactor Library从开始到现在已经历经多代。第0代就是java包Observable 接口,也就是观察者模式。具体的发展见下图:第四代虽然仍然是RxJava2,但是相比第三代的RxJava2,其中的小版本有了不一样的改进,出现了新特性。Reactor Library主要有两点特性。一是基于回调(callback-based),在事件源附加回调函数,并在事件通过数据流链时被调用;二是声明式编程(Declarative),很多函数处理业务类似,例如map/filter/fold等,这些操作被类库固化后便可以使用声明式方法,以在程序中快速便捷使用。在生产者、订阅者都定义后,声明式方法便可以用来实现中间处理者。二.Project ReactorProject Reactor,实现了完全非阻塞,并且基于网络HTTP/TCP/UDP等的背压,即数据传输上游为网络层协议时,通过远程调用也可以实现背压。同时,它还实现了Reactive Streams API和Reactive Extensions,以及支持Java 8 functional API/Completable Future/Stream /Duration等各新特性。下图所示为Reactor的一个示例:首先定义了一个words的数组,然后使用flatMap做映射,再将每个词和s做连接,得出的结果和另一个等长的序列进行一个zipWith操作,最后打印结果。这和Java 8 Stream非常类似,但仍存在一些区别:1)Stream是pull-based,下游从上游拉数据的过程,它会有中间操作例如map和reduce,和终止操作例如collect等,只有在终止操作时才会真正的拉取数据。Reactive是push-based,可以先将整个处理数据量构造完成,然后向其中填充数据,在出口处可以取出转换结果。2)Stream只能使用一次,因为它是pull-based操作,拉取一次之后源头不能更改。但Reactive可以使用多次,因为push-based操作像是一个数据加工厂,只要填充数据就可以一直产出。3)Stream#parallel()使用fork-join并发,就是将每一个大任务一直拆分至指定大小颗粒的小任务,每个小任务可以在不同的线程中执行,这种多线程模型符合了它的多核特性。Reactive使用Event loop,用一个单线程不停的做循环,每个循环处理有限的数据直至处理完成。在上例中,大家可以看到很多Reactive的操作符,例如flatMap/concatWith/zipWith等,这样的操作符有300多个,这可能是学习这个框架最大的压力。如何理解如此繁多的操作符,可能一个归类会有所帮助:1)新序列创建,例如创建数组类序列等;2)现有序列转换,将其转换为新的序列,例如常见的map操作;3)从现有的序列取出某些元素;4)序列过滤;5)序列异常处理。6)与时间相关的操作,例如某个序列是由时间触发器定期发起事件;7)序列分割;8)序列拉至同步世界,不是所有的框架都支持异步,再需要和同步操作进行交互时就需要这种处理。上述300+操作符都有如下所示的弹珠图(Marble Diagrams),用表意的方式解释其作用。例如下图的操作符是指,随着时间推移,逐个产生了6个元素的序列,黑色竖线表示新元素产生终止。在这个操作符的作用下,下方只取了前三个元素,到第四个元素就不取了。这些弹珠图大家可以自行了解。三.Spring Webflux1.Spring Webflux框架Spring Boot 2.0相较之前的版本,在基于Spring Framework 5的构建添加了新模块Webflux,将默认的web服务器改为Netty,支持Reactive应用,并且Webflux默认运行在Netty上。而Spring Framework 5也有了一些变化。Java版本最低依赖Java 8,支持Java 9和Java 10,提供许多支持Reactive的基础设施,提供面向Netty等运行时环境的适配器,新增Webflux模块(集成的是Reactor 3.x)。下图所示为Webflux的框架:左侧是通常使用的框架,通过Servlet API的规范和Container进行交互,上一层是Spring-Webmvc,再上一层则是经常使用的一些注解。右侧为对应的Webflux层级,只要是支持NIO的Container,例如Tomcat,Jetty,Netty或Undertow都可以实现。在协议层的是HTTP/Reactive Streams。再上一层是Spring-Webflux,为了保持兼容性,它支持这些常用的注解,同时也有一套新的语法规则Router Functions。下图显示了一个调用的实例:在Client端,首先创建一个WebClient,调用其get方法,写入URL,接收格式为APPLICATION_STREAM_JSON的数据,retrieve获得数据,取得数据后用bodyToFlux将数据转换为Car类型的对象,在doOnNext中打印构造好的Car对象,block方法意思是直到回调函数被执行才可以结束。在Server端,在指定的path中进行get操作,produces和以前不同,这里是application/stream+json,然后返回Flux范型的Car对象。传统意义上,如果数据中有一万条数据,那么便直接返回一万条数据,但在这个示例返回的Flux范型中,是不包含数据的,但在数据库也支持Reactive的情况下,request可以一直往下传递,响应式的批量返回。传统方式这样的查询很有可能是一个全表遍历,这会需要较多资源和时间,甚至影响其他任务的执行。而响应式的方法除了可以避免这种情况,还可以让用户在第一时间看到数据而不是等待数据采集完毕,这在架构体验的完整性上有了很大的提升。application/stream+json也是可以让前端识别出,这些数据是分批响应式传递,而不会等待传完才显示。现在的Java web应用可以使用Servlet栈或Reactive栈。Servlet栈已经有很久的使用历史了,而现在又增加了更有优势的Reactive栈,大家可以尝试实现更好的用户体验。2.Reactive编程模型下图中是Spring实现的一个向后兼容模型,可以使用annotation来标注Container。这是一个非常清晰、支持非常细节化的模型,也非常利于同事间的交流沟通。下图是一个Functional编程模型,通过写函数的方式构造。例如下图中传入一个Request,返回Response,通过函数的方法重点关注输入输出,不需要区分状态。然后将这些函数注册至Route。这个模型和Node.js非常接近,也利于使用。3.Spring Data框架Spring Data框架支持多种数据库,如下图所示,最常用的是JPA和JDBC。在实践中,不同的语言访问不同的数据库时,访问接口是不一样的,这对编程人员来说是个很大的工作量。Spring Data便是做了另一层抽象,使你无论使用哪种数据库,都可以使用同一个接口。具体特性这里不做详谈。下图展示了一个Spring Data的使用示例。只需要写一个方法签名,然后注解为Query,这个方法不需要实现,因为框架后台已经采用一些技术,直接根据findByFirstnameAndLastname就可以查询到。这种一致的调用方式无疑提供了巨大的方便。现在Reactive对Spring Data的支持还是不完整的,只支持了MongoDB/Redis/Cassandra和Couchbase,对JPA/LDAP/Elasticsearch/Neo4j/Solr等还不兼容。但也不是不能使用,例如对JDBC数据库,将其转为同步即可使用,重点在于findAll和async两个函数,这里不再展开详述,具体代码如下图所示:Reactive不支持JDBC最根本的原因是,JDBC不是non-blocking设计。但是现在JavaOne已经在2016年9月宣布了Non-blocking JDBC API的草案,虽然还未得到Java 10的支持,但可见这已经成为一种趋势。四.总结Spring MVC框架是一个命令式逻辑,方便编写和调试。Spring WebFlux也具有众多优势,但调试却不太容易,因为它经常需要切换线程执行,出现错误的栈可能已经销毁。当然这也是现今Java的编译工具对WebFlux不太友好,相信以后会改善。下图中列出了Spring MVC和Spring WebFlux各自的特性及交叉的部分。最后也附上一些参考资料。本文作者:李博bluemind阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

December 28, 2018 · 1 min · jiezi

在数据采集器中用TensorFlow进行实时机器学习

摘要: 本文学习如何通过发布的最新TensorFlow Evaluator版本使用TensorFlow(TF)模型进行预测和分类。最新DataOps平台的真正价值,只有在业务用户和应用程序能够从各种数据源来访问原始数据和聚合数据,并且及时地产生数据驱动的认识时,才能够实现。利用机器学习(Machine Learning),分析师和数据科学家可以利用历史数据,以及实时地使用类似TensorFlow(TF)这样的技术,以做出更好的数据驱动业务的线下决策。在本文中,你将学习如何利用TensorFlow模型在StreamSets Data Collector3.5.0和StreamSets Data Collector Edge中最新发布的TensorFlow Evaluator进行预测和分类。在深入讨论细节之前,我们来看一些基本概念。机器学习(Machine Learning)亚瑟·塞缪尔把它描述为:“不需要明确地编写程序而使计算机有能力学习的研究领域。”随着机器学习领域的最新发展,计算机现在有能力做出预测,甚至比人类做的还要好,并且感觉可以解决任何问题。让我们先回顾一下机器学习都解决了什么样的问题吧。通常来说,机器学习被分为两大类:监督学习(Supervised Learning)“监督学习是学习一个函数的机器学习任务,该函数基于输入-输出的实例,将输入映射到输出。”—维基百科(Wikipedia)。它涉及到构建一个精准的模型,当历史数据被标记为一些结果的时候,模型就可以预测出结果了。用监督学习解决的常见业务问题:二元分类(学习预测一个分类值)顾客会购买一个特定产品吗?癌症是恶性的还是良性的?多级分类(学习预测一个分类值)给定的一段文本是否带有病毒、恐吓或淫秽内容?这是山鸢尾、蓝旗鸢尾还是北美鸢尾的物种?回归(学习预测一个连续值)一个代售房子的预测价格是多少?明天旧金山的气温是多少?无监督学习无监督学习允许我们在知道很少,或是完全不知道输出应该是什么样子的情况下处理问题。它涉及在之前数据上的标签是不可用的情况下创建模型。在这类的问题中,通过对基于数据中变量之间的关系进行数据聚类来导出结构。无监督学习的两种常见方法是K-均值聚类(K-means clustering)和DBSCAN。注意:Data Collector和Data Collector Edge中的TensorFlow Evaluator目前仅支持监督学习模型。神经网络与深度学习神经网络是机器学习算法的一种,可以学习和使用受人脑结构启发而来的计算模型。与其它机器学习算法,如决策树、逻辑回归等相比,神经网络具有较高的准确性。Andrew Ng在传统人工神经网络的背景下对深度学习进行了描述。在题为“深度学习、自我学习与无监督特征学习”的演讲中,他把深度学习的思想描述为:“利用了大脑结构的模仿, 希望:让学习算法更好地、更容易地使用;在机器学习和人工智能领域取得革命性的进展;我相信这是我们朝着真正的人工智能前进的最好办法。”常见的神经网络和深度学习应用包括:计算机视觉/图像识别/目标检测语言识别/自然语言处理(NLP)推荐系统(产品、婚介等)异常检测(网络安全等)TensorFlowTensorFlow是为深度神经网络设计的开源机器学习框架,由Google Brain Team开发的。TensorFlow支持在Windows和Mac操作系统上的可伸缩和便携式的训练,包括CPU、GPU和TPU。迄今为止,它是GitHub上最流行的和最活跃的机器学习项目。Data Collector中的TensorFlow随着TensorFlow Evaluator的引入,你现在能够创建管道(pipelines),以获取数据或特征,并在一个可控的环境中生成预测结果或分类,而不必发起对作为Web服务而提供和公布的机器学习模型的HTTP或REST API的调用。例如,Data Collector管道现在可以实时地检测欺诈交易或在文本上执行自然语言处理,因为数据在被存储到最终目的地之前,为了进一步的处理或做决策,正在经过各个阶段。另外,使用Data Collector Edge,你可以在Raspberry Pi和其它运行在所支持的平台上的设备上运行已经启用了的TensorFlow机器学习管道。例如,在高风险地区检测洪水等自然灾害发生的概率,以防止对人们财产的破坏。乳腺癌分类让我们考虑将乳腺癌肿瘤分类成恶性还是良性的例子。乳腺癌是一个经典的数据集,可以作为scikit-learn的一部分。要了解如何在Python中使用该数据集训练和导出一个简单的TensorFlow模型,请查看我在GitHub上的代码。正如你将要看到的那样,模型创建和训练被保持在最小范围,并且非常简单,只有几个隐藏层。最需要注意的重要方面是如何使用TensorFlow SavedModelBuilder来导出和保存模型。注意:要在Data Collector或Data Collector Edge中使用TensorFlow模型,首先应该在你选择支持的开发语言里,如Python,和交互式环境中,如Jupiter Notebook,使用TensorFlow的SavedModelBuilder导出和保存模型。一旦使用TensorFlow的SavedModelBuilder训练并导出了模型,那么在数据流管道中使用它进行预测或分类就非常简单了 — 只要模型保存在Data Collector或Data Collector Edge可访问的位置上即可。管道概述在深入了解细节之前,可以看下管道是什么样的:管道细节目录源:这将从.csv文件中加载乳腺癌的记录数据(注意:这个输入数据源可以非常简单地替换为其它的来源,包括Kafka、AWS S3、MySQL等等);字段转换器:这个处理器将转换供模型所使用的所有输入的乳腺癌记录特征数据,从String类型转换到Float类型(mean_radius,mean_texture,mean_perimeter,mean_area,mean_smoothness,mean_compactness,mean_concavity,mean_concave_points,mean_symmetry,mean_fractal_dimension,radius_error,texture_error,perimeter_error,area_error,smoothness_error,compactness_error,concavity_error,concave_points_error,symmetry_error,fractal_dimension_error,worst_radius,worst_texture,worst_perimeter,worst_area,worst_smoothness,worst_compactness,worst_concavity,worst_concave_points,worst_symmetry,worst_fractal_dimension) ;TensorFlow Evaluator:模型的保存路径:指定要使用的预训练的TensorFlow模型的位置;模型标签:设置为“serve”,因为元图(在我们导出的模型中)要用于服务中。有关详细信息,请参见tag_constants.py和相关的TensorFlow API documentation;输入配置:指定在训练和导出模型期间配置的输入张量信息(请见Train model and save/export it using TensorFlow SavedModelBuilder部分);输出配置:指定在训练和导出模型期间配置的输出张量信息(请见Train model and save/export it using TensorFlow SavedModelBuilder部分);输出字段:我们想保存分类值的输出记录字段;Expression Evaluator:-该处理器评估模型输出或分类值为0或1(存储在输出的字段TF_Model_Classification之中) ,并用Benign或Malignantrespectively这两个值创建一个新的记录字段“Condition”;Stream Selector:该处理器评估癌症状况(良性或恶性)并发送记录到各自的Kafka生产者;Kafka Producers:输入记录以及模型的输出或者分类值被有条件地发送给两个Kafka生产者以获得进一步地处理和分析;*TensorFlow Evaluator配置注意:一旦TensorFlow Evaluator产生了模型输出结果,本实例中采用的管道阶段是可选的,并且可以根据用例的需要与其它处理器和目标进行互换。管道执行在预览管道上,乳腺癌数据记录的输入通过了上面所述的数据流管道过程,包括服务于我们的TensorFlow模型。发送给Kafka生产者的最终输出记录数据(如上所示)包括用于分类的模型所使用的乳腺癌特征,在用户定义的字段TF_Model_Classification中模型输出值为0或1,以及由Expression Evaluator创建的Condition字段中表示相应的癌症状况是良性或恶性。总结本文说明了在Data Collector 3.5.0中使用最新发布的TensorFlow Evaluator。一般来说,这个评估器将允许你提供预训练的TensorFlow模型,用于生成预测结果和分类结果,而无需编写任何自己的代码。本文作者:【方向】阅读原文本文为云栖社区原创内容,未经允许不得转载。

December 3, 2018 · 1 min · jiezi