关于Flink:伴鱼借助-Flink-完成机器学习特征系统的升级

9次阅读

共计 5018 个字符,预计需要花费 13 分钟才能阅读完成。

本文作者陈易生,介绍了伴鱼平台机器学习特色零碎的降级,在架构上,从 Spark 转为 Flink,解决了特色上线难的问题,以及 SQL + Python UDF 如何用于生产实践。次要内容为:

  1. 前言
  2. 老版特色零碎 V1
  3. 新版特色零碎 V2
  4. 总结

一、前言

在伴鱼,咱们在多个在线场景应用机器学习进步用户的应用体验,例如:在伴鱼绘本中,咱们依据用户的帖子浏览记录,为用户举荐他们感兴趣的帖子;在转化后盾里,咱们依据用户的绘本购买记录,为用户举荐他们可能感兴趣的课程等。

特色是机器学习模型的输出。如何高效地将特色从数据源加工进去,让它可能被在线服务高效地拜访,决定了咱们是否在生产环境牢靠地应用机器学习。为此,咱们搭建了特色零碎,系统性地解决这一问题。目前,伴鱼的机器学习特色零碎运行了靠近 100 个特色,反对了多个业务线的模型对在线获取特色的需要。

上面,咱们将介绍特色零碎在伴鱼的演进过程,以及其中的衡量考量。

二、旧版特色零碎 V1

特色零碎 V1 由三个外围组件形成:特色管道,特色仓库,和特色服务。整体架构如下图所示:

特色管道包含 流特色管道 批特色管道,它们别离生产流数据源和批数据源,对数据通过预处理加工成特色 (这一步称为特色工程),并将特色写入特色仓库。

  • 批特色管道应用 Spark 实现,由 DolphinScheduler 进行调度,跑在 YARN 集群上;
  • 出于技术栈的统一思考,流特色管道应用 Spark Structured Streaming 实现,和批特色管道一样跑在 YARN 集群上。

特色仓库选用适合的存储组件 (Redis) 和数据结构 (Hashes),为模型服务提供低提早的特色拜访能力。之所以选用 Redis 作为存储,是因为:

  • 伴鱼有丰盛的 Redis 应用教训;
  • 包含 DoorDash Feature Store [1] 和 Feast [2] 在内的业界特色仓库解决方案都应用了 Redis。

特色服务屏蔽特色仓库的存储和数据结构,对外裸露 RPC 接口 GetFeatures(EntityName, FeatureNames),提供对特色的低提早点查问。在实现上,这一接口根本对应于 Redis 的 HMGET EntityName FeatureName_1 ... FeatureName_N 操作。

这一版本的特色零碎存在几个问题:

  • 算法工程师短少管制,导致迭代效率低。这个问题与零碎波及的技术栈和公司的组织架构无关。在整个零碎中,特色管道的迭代需要最高,一旦模型对特色有新的需要,就须要批改或者编写一个新的 Spark 工作。而 Spark 工作的编写须要有肯定的 Java 或 Scala 常识,不属于算法工程师的常见技能,因而交由大数据团队全权负责。大数据团队同时负责多项数据需要,往往有很多排期工作。后果便是新特色的上线波及频繁的跨部门沟通,迭代效率低;
  • 特色管道只实现了轻量的特色工程,升高在线推理的效率。因为特色管道由大数据工程师而非算法工程师编写,简单的数据预处理波及更高的沟通老本,因而这些特色的预处理水平都比拟轻量,更多的预处理被留到模型服务甚至模型外部进行,增大了模型推理的时延。

为了解决这几个问题,特色零碎 V2 提出几个设计目标:

  • 将控制权交还算法工程师,进步迭代效率;
  • 将更高权重的特色工程交给特色管道,进步在线推理的效率。

三、新版特色零碎 V2

特色零碎 V2 相比特色零碎 V1 在架构上的惟一不同点在于,它将特色管道切分为三局部:特色生成管道,特色源,和特色注入管道。值得一提的是,管道在实现上均从 Spark 转为 Flink,和公司数据基础架构的倒退保持一致。特色零碎 V2 的整体架构如下图所示:

1. 特色生成管道

特色生成管道读取原始数据源,加工为特色,并将特色写入指定特色源 (而非特色仓库)。

  • 如果管道以流数据源作为原始数据源,则它是流特色生成管道;
  • 如果管道以批数据源作为原始数据源,则它是批特色生成管道。

特色生成管道的逻辑由算法工程师全权负责编写。其中,批特色生成管道应用 HiveQL 编写,由 DolphinScheduler 调度。流特色生成管道应用 PyFlink 实现,详情见下图:

算法工程师须要恪守上面步骤:

  1. 用 Flink SQL 申明 Flink 工作源 (source.sql) 和定义特色工程逻辑 (transform.sql);
  2. (可选) 用 Python 实现特色工程逻辑中可能蕴含的 UDF 实现 (udf_def.py);
  3. 应用自研的代码生成工具,生成可执行的 PyFlink 工作脚本 (run.py);
  4. 本地应用由平台筹备好的 Docker 环境调试 PyFlink 脚本,确保能在本地失常运行;
  5. 把代码提交到一个对立治理特色管道的代码仓库,由 AI 平台团队进行代码审核。审核通过的脚本会被部署到伴鱼实时计算平台,实现特色生成管道的上线。

这一套流程确保了:

  • 算法工程师把握上线特色的自主权;
  • 平台工程师把控特色生成管道的代码品质,并在必要时能够对它们实现重构,而无需算法工程师的染指。

2. 特色源

特色源存储从原始数据源加工造成的特色。值得强调的是,它同时还是连贯算法工程师和 AI 平台工程师的桥梁。算法工程师只负责实现特色工程的逻辑,将原始数据加工为特色,写入特色源,剩下的事件就交给 AI 平台。平台工程师实现特色注入管道,将特色写入特色仓库,以特色服务的模式对外提供数据拜访服务。

3. 特色注入管道

特色注入管道将特色从特色源读出,写入特色仓库。因为 Flink 社区短少对 Redis sink 的原生反对,咱们通过拓展 RichSinkFunction [3] 简略地实现了 StreamRedisSinkBatchRedisSink,很好地满足咱们的需要。

其中,BatchRedisSink 通过 Flink Operator State [4] 和 Redis Pipelining [5] 的简略联合,大量参考 Flink 文档中的 BufferingSink,实现了批量写入,大幅缩小对 Redis Server 的申请量,增大吞吐,写入效率相比逐条插入晋升了 7 倍 [6]。BatchRedisSink 的简要实现如下。其中,flush 实现了批量写入 Redis 的外围逻辑,checkpointedState / bufferedElements / snapshotState / initializeState 实现了应用 Flink 有状态算子治理元素缓存的逻辑。

class BatchRedisSink(pipelineBatchSize: Int) extends RichSinkFunction[(String, Timestamp, Map[String, String])]
    with CheckpointedFunction {

  @transient
  private var checkpointedState
      : ListState[(String, java.util.Map[String, String])] = _

  private val bufferedElements
      : ListBuffer[(String, java.util.Map[String, String])] =
    ListBuffer.empty[(String, java.util.Map[String, String])]

  private var jedisPool: JedisPool = _

  override def invoke(value: (String, Timestamp, Map[String, String]),
      context: SinkFunction.Context
  ): Unit = {
    import scala.collection.JavaConverters._

    val (key, _, featureKVs) = value
    bufferedElements += (key -> featureKVs.asJava)

    if (bufferedElements.size == pipelineBatchSize) {flush()
    }
  }

  private def flush(): Unit = {
    var jedis: Jedis = null
    try {
      jedis = jedisPool.getResource
      val pipeline = jedis.pipelined()
      for ((key, hash) <- bufferedElements) {pipeline.hmset(key, hash)
      }
      pipeline.sync()} catch {...} finally {...}
    bufferedElements.clear()}

  override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.clear()
    for (element <- bufferedElements) {checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[(String, java.util.Map[String, String])](
        "buffered-elements",
        TypeInformation.of(new TypeHint[(String, java.util.Map[String, String])]() {}
        )
      )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    import scala.collection.JavaConverters._

    if (context.isRestored) {for (element <- checkpointedState.get().asScala) {bufferedElements += element}
    }
  }

  override def open(parameters: Configuration): Unit = {
    try {jedisPool = new JedisPool(...)
    } catch {...}
  }

  override def close(): Unit = {flush()
    if (jedisPool != null) {jedisPool.close()
    }
  }
}

特色零碎 V2 很好地满足了咱们提出的设计目标。

  • 因为特色生成管道的编写只需用到 SQL 和 Python 这两种算法工程师非常相熟的工具,因而他们全权负责特色生成管道的编写和上线,无需依赖大数据团队,大幅提高了迭代效率。在相熟后,算法工程师通常只需破费半个小时以内,就能够实现流特色的编写、调试和上线。而这个过程本来须要破费数天,取决于大数据团队的排期;
  • 出于同样的起因,算法工程师能够在有须要的前提下,实现更重度的特色工程,从而缩小模型服务和模型的累赘,进步模型在线推理效率。

四、总结

特色零碎 V1 解决了特色上线的问题,而特色零碎 V2 在此基础上,解决了特色上线难的问题。在特色零碎的演进过程中,咱们总结出作为平台研发的几点教训:

  • 平台应该提供用户想用的工具。这与 Uber ML 平台团队在外部推广的教训 [7] 相符。算法工程师在 Python 和 SQL 环境下工作效率最高,而不相熟 Java 和 Scala。那么,想让算法工程师自主编写特色管道,平台应该反对算法工程师应用 Python 和 SQL 编写特色管道,而不是让算法工程师去学 Java 和 Scala,或是把工作转手给大数据团队去做;
  • 平台应该提供易用的本地调试工具。咱们提供的 Docker 环境封装了 Kafka 和 Flink,让用户能够在本地疾速调试 PyFlink 脚本,而无需期待管道部署到测试环境后再调试;
  • 平台应该在激励用户自主应用的同时,通过自动化查看或代码审核等形式牢牢把控品质。

Reference

[1] https://doordash.engineering/…

[2] https://docs.feast.dev/feast-…

[3] https://github.com/apache/fli…

[4] https://ci.apache.org/project…

[5] https://redis.io/topics/pipel…

[6] https://site-git-update-featu…

[7] https://eng.uber.com/scaling-…

正文完
 0