共计 4476 个字符,预计需要花费 12 分钟才能阅读完成。
在线机器学习与离线相比,在模型更新的时效性,模型的迭代周期,业务试验成果等方面有更好的体现。所以将机器学习从离线迁徙到在线曾经成为晋升业务指标的一个无效的伎俩。
在线机器学习中,样本是要害的一环。本文将给大家具体的介绍微博是如何用 Flink 来实现在线样本生成的。
为何抉择 Flink 来做在线的样本生成?
在线样本生成对样本的时效性和准确性都有极高的要求。同样对作业的稳定性及是否容灾也都有严格的指标要求。基于这个前提,咱们对目前较为风行的几种实时计算框架(Storm 0.10, Spark 2.11, Flink 1.10)进行了剖析比拟,论断如下:
因而,咱们决定应用 Flink 来作为在线样本生成的实时流计算框架。
如何实现?
在线样本生成,简略形容一个业务场景:对用户的曝光数据和点击数据实时的做关联,关联后将数据输入到 Kafka 中,给上游的在线训练作业用。
首先咱们要确定两个数据流关联的工夫窗口。这一步个别倡议先离线对两个数据流的日志做关联,通过离线的形式对两份数据在不同的工夫范畴内做 join,来判断在线须要的工夫窗口。比方业务承受的最低关联比例是 85%,并且通过离线测试确认 20 分钟内两个数据流能够关联 85% 的数据,那么就能够采纳 20 分钟作为工夫窗口。这里的关联比例和窗口工夫实际上是在准确性和实时性之间的一个 trade-off。
确定工夫窗口后,咱们并没有应用 Flink 的 time window 来实现多个数据流的 join,而是抉择采纳 union + timer 形式来实现。这里次要思考两点:第一、Flink 自带的 join 操作不反对多个数据流。第二、应用 timer+state 来实现,自定义水平更高,限度更少,也更不便。
接下来,咱们把样本生成过程细分为:
① 输出数据流
个别咱们的数据源包含 Kafka,Trigger,MQ 等。Flink 须要从数据源中实时的读取日志。
② 输出数据流的格式化和过滤
- 读取日志后,对数据做格式化,并且过滤掉不须要的字段和数据。
- 指定样本 join 的 key。例如:用户 id 和 内容 id 作 key。
- 输入的数据格式个别为 tuple2(K,V),K: 参加 join 的 key。V:样本用到的字段。
③ 输出数据流的 union
- 应用 Flink 的 union 操作,将多个输出流叠加到一起,造成一个 DataStream。
- 为每个输出流指定一个能够辨别的别名或者减少一个能够辨别的字段。
④ 输出数据流的聚合:keyby 操作
- 对 join 的 key 做 keyby 操作。接上例,示意依照用户 id 和内容 id 对多个数据流做 join。
- 如果 key 存在数据歪斜的状况,倡议对 key 加随机数后先聚合,去掉随机数后再次聚合。
⑤ 数据存储 state + timer
- 定义一个 Value State。
- keyby 后的 process 办法中,咱们会重写 processElement 办法,在 processElement 办法中判断,如果 value state 为空,则 new 一个新的 state,并将数据写到 value state 中,并且为这条数据注册一个 timer(timer 会由 Flink 按 key+timestamp 主动去重),另外此处咱们应用的是 ProcessingTime(示意 onTimer()在零碎工夫戳达到 Timer 设定的工夫戳时触发)。如果不为空则依照拼接的策略,更新曾经存在的后果。比方:工夫窗口内 用户 id1,内容 id1 的第一条日志数据没有点击行为,则这个字段为 0,第二条点击数据进入后,将这个字段更新为 1。当然除了更新操作,还有计数、累加、均值等各种操作。如何在 process 里辨别数据是来自曝光还是点击呢,应用下面步骤③定义的别名。
- 重写 onTimer 办法,在 onTimer 办法中次要是定义定时器触发时执行的逻辑:从 value state 里获取到存入的数据,并将数据输入。而后执行 state.clear。
- 样本从窗口输入的条件有 2 个:第一,timer 到期。第二,业务须要的样本都拼接上了。
此处参考伪代码:
public class StateSampleFunction extends KeyedProcessFunction<String, Tuple2, ReturnSample> {
/**
* 这个状态是通过过程函数来保护, 应用 ValueState
*/
private ValueState state;
private Long timer = null;
public StateSampleFunction (String time){timer = Long.valueOf(time);
}
@Override
public void open(Configuration parameters) throws Exception {
// 获取 state
state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", TypeInformation.of(new TypeHint< ReturnSample >() {})));
}
@Override
public void processElement(Tuple2value, Context context, Collector< ReturnSample > collector) throws Exception {if (value.f0 == null){return;}
Object sampleValue = value.f1;
Long time = context.timerService().currentProcessingTime();
ReturnSample returnSample = state.value();
if (returnSample == null) {returnSample = new ReturnSample();
returnSample.setKey(value.f0);
returnSample.setTime(time);
context.timerService().registerProcessingTimeTimer(time +timer);
}
// 更新点击数据到 state 里
if (sampleValue instanceof ClickLog){ClickLog clickLog = (ClickLog)values;
returnSample =(ReturnSample) clickLog.setSample(returnSample);
}
state.update(returnSample);
}
/**
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector< ReturnSample > out) throws Exception {ReturnSample value = state.value();
state.clear();
out.collect(value);
}
}
⑥ 拼接后的日志格式化和过滤
- 拼接后的数据须要依照在线训练作业的要求对数据做格式化,比方 json、CSV 等格局。
- 过滤:决定什么样的数据是合格的样本。例如:有真正浏览的内容才算是可用的样本。
⑦ 输入 样本最终输入到实时的数据队列中。上面是理论的作业拓扑和运行时状态:
整个样本拼接过程的流程图:
StateBackend 的选取
应用 RocksDB/Gemini 作为 state 的 Backend 的劣势和倡议:咱们用大数据对 memory 和 RocksDB,Gemini 做了试验比照,结果显示 RocksDB 和 Gemin 在数据处理,作业稳定性和资源应用等方面比 memory 更正当。其中 Gemini 的劣势最为显著。此外,如果是大数据量的 state,倡议应用 Gemini + SSD 固态硬盘。
样本的监控
1. Flink 作业的异样监控
- 作业失败监控
- Failover 监控
- Checkpoint 失败的监控
- RocksDB 应用状况的监控
- 作业生产 Kafka 的 Comsumer Lag 的监控
- 作业反压的监控
2. 样本输出端 Kafka 的生产提早监控
3. 样本输入端 Kafka 的写入量的监控
4. 样本监控
- 拼接率监控
- 正样本监控
- 输入样本格局的监控
- 输入标签对应的值是否在失常范畴
- 输出标签对应的值是否为 null
- 输入标签对应的值是否为空
样本的校验
样本生成后,如何验证数据是否精确
- 在线和离线的互相校验
将在线样本从输入的 Kafka 中接入到 HDFS 上离线存储。并依照在线 join 的工夫窗口来分区。
- 用同等条件下生成的离线样本和在线样本做比照
- 白名单用户的全流程校验
将白名单用户的日志和样本后果存入 ES 等实时数仓中,来做校验。
故障的解决
样本异样对线上模型训练的影响十分大。当发现异常报警时,首先要做的是向在线模型训练作业发送样本异样的报警。收到报警信息后,模型进行更新。从而防止影响模型线上成果。
一般意义的业务故障解决后,抛弃原来的数据,所有输出日志流从最新的工夫点开始生产并生成新的样本即可。重要业务须要重置输出日志流的 Kafka offset 从故障工夫点开始从新生成样本数据。
平台化
通过平台化对样本生成的流程做出严格的标准十分重要。在平台化的过程中,须要提供简略通用的开发模板以进步作业开发效率;提供平台化的作业监控和样本指标监控框架,防止反复造车;提供通用的样本输入落地策略,和在线 / 离线校验策略,更便捷的为业务方服务。
微博基于 Flink 搭建的在线样本生成平台架构,如图:
UI 页面,如图:
基于平台化开发,用户只须要关怀业务逻辑局部即可。须要用户开发的有:
- 对应输出数据的数据荡涤逻辑
- 样本输入前的数据荡涤逻辑
其余的在 UI 上配置即可实现,具体有:
- 输出 Kafka 的配置信息及对应数据荡涤的 UDF 类
- 样本拼接的工夫窗口
- 窗口内对字段的聚合操作
- 样本输入的 Kafka 配置信息及输入前数据荡涤和格式化的 UDF 类
资源状况由平台方审核并配置。实现后,主动生成并提交作业。
作业提交后:
1. 平台会提供如前所述的作业相干监控,如下:
■ Flink 作业的异样监控
- 作业失败监控
- Failover 监控
- Checkpoint 失败的监控
- RocksDB 应用状况的监控
- 作业生产 Kafka 的 Comsumer Lag 的监控
- 作业反压的监控
■ 样本监控
- 拼接率监控
- 正样本监控
- 输入样本格局的监控
- 输入标签对应的值是否在失常范畴
- 输出标签对应的值是否为 null
- 输入标签对应的值是否为空
2. 平台会主动将数据落盘,存储到 HDFS 上。不便离线验证或者离线训练。
3. 用户只需将精力放到样本的验证上即可,由平台方保障作业的稳定性。
作者介绍:
曹富强,微博机器学习研发核心 - 高级零碎工程师。现负责微博机器学习平台数据计算 / 数据存储模块,次要波及实时计算 Flink、Storm、Spark Streaming,数据存储 Kafka、Redis,离线计算 Hive、Spark 等。目前专一于 Flink/Kafka/Redis 在微博机器学习场景的利用,为机器学习提供框架,技术,利用层面的反对。