关于flink:Flink-流式处理与推荐系统的样本实时生成

24次阅读

共计 5370 个字符,预计需要花费 14 分钟才能阅读完成。

继搜索引擎之后,举荐零碎曾经成为挪动互联网时代人们获取信息的次要渠道,比方,风行的新闻 App 都会利用举荐零碎进行用户的个性化举荐。新闻举荐场景具备高度的实时性,每时每刻都会有大量的新闻、热点产生。王喆前辈的两篇博客曾经对举荐零碎的实时性问题进行了深刻的探讨:

  • 天下文治,唯快不破,论举荐零碎的「实时性」
  • 如何加强举荐零碎模型更新的「实时性」?

增量更新、在线学习、部分更新甚至强化学习等训练策略能够令举荐零碎迅速的对用户的新行为做出反馈,而这些更新策略的前提是样本自身具备足够的实时性。新闻场景下,典型的训练样本是用户的点击等行为数据。用户行为数据在客户端收集之后,须要尽可能迅速的发回后盾生成训练样本,再尽快对模型进行更新。笔者于 2020 年夏在某千万用户级别新闻 App 举荐组实习,负责了样本实时化我的项目的次要开发工作。在这里对该我的项目进行技术层面的总结。

样本的产生与实时性

首先讨论一下典型的新闻利用举荐零碎的 Data Pipline 与模型训练样本的产生。客户端将一组新闻曝光给用户之后,曝光数据流 PV (Page View) 会被立刻发送回后端 Online Service. 用户会点击感兴趣的新闻,产生用户行为数据流也即 Action 数据流,这部分数据也被发回后端。之后后端会将这些用户行为音讯写入音讯队列 Message Queue, 最终会将其落盘到分布式文件系统,如 HDFS 上。最简化的举荐零碎样本生成逻辑就是 PV Join Action. 用户只会对局部曝光新闻样本产生行为,这部分样本即为正样本,残余的没有产生行为的曝光样本即为负样本。生成正负样本之后即可进行模型的训练。

实时性要求不高的举荐零碎能够应用批处理技术(典型的工具是 Apache Spark)生成样本,如图 1 左图所示. 设置一个定时工作,每隔一段时间,比方一小时,从 HDFS 读取工夫窗内的用户行为日志、曝光日志进行 JOIN 操作,生成训练样本,再将训练样本写回 HDFS, 之后启动模型的训练更新。

图 1: 典型的新闻举荐零碎 Data pipeline

批处理一个显著的问题是延时。定时运行批处理工作典型的周期是一个小时,这就意味着从样本产生到模型训练至多有一个小时的延时,有时批处理平台负载过大,工作须要排队,则延时会更大。另一个问题是边界问题,上一段提到的,PV 数据如果产生在批处理工作选取的日志工夫窗结尾,则对应的 Action 数据可能会落到批处理工作的下一个工夫窗中,导致 Join 失败,产生假负样本。

为了加强实时性,咱们应用 Apache Flink 框架,将样本的生成逻辑以流解决技术重写。如图 1 右图所示。线上服务生成的用户曝光与行为日志写入音讯队列后,不再期待其落盘到 HDFS, 而是间接用 Flink 生产这些音讯流。Flink 同时从 Redis 缓存等地位读取必要的特色信息,间接生成样本音讯流。样本音讯流写回 Kafka 队列,上游 Tensorflow 能够间接生产该音讯流进行模型的训练。

曝光与行为流的 Join 逻辑

对于新闻举荐,用户行为人造存在工夫不同步的问题。一条新闻曝光给用户,PV 数据流产生之后,用户可能立刻点击,也有可能几分钟,十几分钟,甚至数小时之后产生行为。这就意味着 PV 数据流达到之后,须要期待一段时间再与 Action 数据流进行 Join. 期待时长过短,会导致一些本应有用户行为的样本(正样本)因为用户行为没有来得及回流而被谬误的标记为负样本。期待时长过长会侵害加大零碎的延时。


图 2: 样本回流工夫散布。纵轴是回流的样本比例

这一方面有一些比较复杂的解决方案,比方这篇文章的总结。离线剖析一下理论的 Action 数据流与 PV 数据流之间的延时散布,是一个十分典型的指数分布,如图 2 所示。数分钟之内绝大多数用户行为曾经回流,而数分钟对咱们目前是一个能够承受的延时,因而,一个简略的解决方案是设置一个大小比拟折中的工夫窗。Flink 自身即提供了 Window Join 来实现这种逻辑。但咱们最终敲定的理论逻辑实现要比上文形容远为简单,Windiow Join 无奈满足要求,因而咱们自行实现了一些底层的 CoProcessFunction 函数,综合应用 PV 数据流、Action 数据流和一些特色流,实现了最终训练样本的生成。

我的项目 Flink App 架构

落实到工程上,数据流示意图如图 3 所示。理论我的项目中有多个 Action Stream, 从 Kafka 生产之后合并。而后 Action Stream 与 PV Stream 进行 Join 操作。为了生成理论的训练样本,还须要一些新闻相干的特色,这些特色局部由 Flink App 拜访 Redis 数据库进行查问(查问操作应该加缓存),局部也是以 Kafka 音讯流的模式提供的,须要失去音讯流之后缓存在 Flink App 内存。最终实现 Join 逻辑,生成用于模型训练的 Sample Stream.

图 3: Joiner 数据流示意图

为了实现 Join 操作,首先须要以某种形式进行 Key 的对应,这是通过 KeyedStream 实现的。首先,Flink 通过 FlinkKafkaConsumer 等工具生产 Kafka 音讯流生成 Flink Stream, 而后通过 keyBy 办法将流依照某种 key 进行划分,具备雷同 keyBy 办法的流能够借助 CoProcessFunction 进行 coProcess 操作,简略的例子:

val pvStream = getDataStreamFromKafka...
val actionStream = getDataStreamFromKafka...
val sampleStream = pvStream.keyBy(x => (x.id, x.time))
    .connect(actionStream.keyBy(x => (x.id, x.time)))
    .process(new CoProcessJoin)
    .name("sampleStream") 

CoProcessFunction 是 Flink 提供的一种较低层的对于流操作的形象。比方,上面是一个简略的 CoProcessFunction 实现两个流的 Join 操作,继承 CoProcessFunction 类,override 两个函数,定义好在两个流中读取到元素时的动作即可:

// apache flink, scala, CoProcessFunction Sample
// doc: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html

class CoProcessJoin extends CoProcessFunction[PV, Action, Sample] {
  private val ttlConfig = 
    StateTtlConfig
    .newBuilder(Time.milliseconds(time))
    .build()
  private val descriptor =
    new ValueStateDescriptor[NewsInfo]("newsInfo", classOf[NewsInfo])
  descriptor.enableTimeToLive(ttlConfig)

  lazy val pvVal: ValueState[PV] = 
    getRuntimeContext.getState(descriptor)

  override def processElement1(
    value: PV,
    ctx: CoProcessFunction[PV, Action, Sample]#Context,
    out: Collector[Sample]): Unit = {if(value != null) {pvVal.update(value)
      registerTimerAfter(ctx.timerService(), waitingTime)
    }
  }

  override def processElement2(
    value: Action,
    ctx: CoProcessFunction[PV, Action, Sample]#Context,
    out: Collector[Sample]): Unit = {if(pvVal.value() != null) {out.collect(positiveSample(pvVal.value(), value));
    }
  }

  override def onTimer(
    timestamp: Long,
    ctx: CoProcessFunction[PV, Action, Sample]#OnTimerContext,
    out: Collector[Sample]): Unit = {if (pvVal.value() != null) {out.collect(negitiveSample(pvValue.value()))
    }
  }
} 

以上代码块的例子中,首先定义了一个 ValueState, 这是一种由 Flink 保护的变量,能够保留在 Flink App 内存(或者其余长久化计划)中。假如 PV 总是首先达到(理论利用中并非如此),由 Flink 将其缓存并期待对应的 Action 达到。Action 达到之后,从 ValueState 中读取 PV,并通过肯定的操作生成 Sample 即可。CoProcessFunction 除了能定义 ValueState 之外,还能定义定时器,从而在没有元素达到的工夫点也能触发一些动作。如,对于举荐零碎样本生成,期待肯定的工夫之后如果没有 Action, 则生成负样本。真正应用 ValueState 时,留神设置生存周期或者手动清空状态,防止状态始终不开释而占用内存。

我的项目难点

人们经常会将 Flink 与 Spark 进行比照。Spark 个别将数据抽象成没有时序的 DataSet,其解决逻辑人造要简略得多。Spark 和 Flink 都会将计算工作形象成计算图(有向无环图 DAG), 但作为批处理工作 Spark 实际上能够一一实现计算工作,不必要齐全并行整个 DAG, 并行度不好只会导致整个批处理工作耗时稍长,但不会导致异样。但对于 Flink 工作,整个 DAG 经常必须同时运行,计算资源必须妥善调配到 DAG 的每个节点上,否则就会导致音讯积压。因而,Flink App 的开发要比 Spark 简单得多,但这些复杂性是为了实现准实时处理所必然的。

本我的项目中次要难点能够分为以下几块。首先,须要了解 Flink 对于 DataStream 的形象,了解 CoProcessFunction 的形象,从而应用 CoProcessFunction 实现本人的业务逻辑。了解 Flink 的形象其实并不艰难,但对于理论业务而言,业务逻辑可能非常复杂,而 Flink 提供的 State, Timer 等工具实际上是比拟简陋的,比方 Timer 只是能在特定工夫触发调用 onTimer 办法,如果有多个 Timer, 则 onTimer 办法须要自行判断本身是被哪个 Timer 触发的。CoProcessFunction 总共只有 processElement1, processElement2, onTimer 三个办法,但业务逻辑可能会有非常复杂的状态转移,咱们在实现过程中自行保护了一个无限状态机。

另一方面,业务数据流量十分大,咱们须要将整个 Flink 工作的计算压力绝对平均的调配到所有的计算节点上,否则瓶颈节点会导致音讯积压。对此,Flink 提供了为每个操作 (Operator) 指定并行度(运算节点个数),并将 Operator 组织成组,每个组共享计算资源等办法。但计算节点的调配还有一些限度,比方生产 Kafka 的输出节点并行度最多只能设置成跟上游 Kafka 并行度统一。在这些限度之下,节点计算累赘的妥善调配并不容易,很容易呈现局部节点 CPU 比拟闲暇,局部节点压力过大音讯积压。更加可怜的是,咱们没有找到十分无效的办法剖析每个操作的计算代价,次要依附调试来发现瓶颈。

除了计算资源之外,因为咱们保护了大量的状态信息,集群的存储也面临压力。这一部分相对而言容易依据音讯流量和保留时长进行数量级的预计。应用 StateValue 或者其余类型的 State 时应该留神 Flink 是如何存储这些 State 的,如果存储在内存中,当然须要留神内存压力,Flink 也能应用数据库等形式进行长久化,可能须要留神 IO 压力。

我的项目反思

该我的项目原「打算」一个月,实际上拖了五个月才上线。在参加该我的项目过程中,笔者带有一些「实习生心态」,作为次要开发人员却不足责任意识,感觉布局性质的工作应该由 mentor, leader 等人实现,只是在 mentor 领导下进行机械的开发工作、解决技术问题。我的项目周期拖的工夫如此之长并不是开发效率问题或者技术艰难,起因总结如下:

  1. 咱们并没有在开始开发工作之前妥善的预估资源应用,导致开发过程中经常停下来期待资源申请。而公司外部资源,包含机器和数据权限,申请流程比拟长,
  2. 因为低估了我的项目规模和开发工夫,没有搭建本地测试环境、筹备测试数据等,导致呈现问题之后调试只能依附上线到测试环境上借助日志调试,过程十分苦楚耗时。
  3. 举荐零碎的样本实时化是一个驰名问题,因而没有在开始开发之前借助离线试验验证该项目标收益,这一点在与有教训的前辈交换之后才意识到问题的严重性。如果开发实现,却没有显著的收益,就会十分蹩脚。

正文完
 0