共计 4317 个字符,预计需要花费 11 分钟才能阅读完成。
2018 刚过去,趁着春节放假对过去一年主导开发的项目做个梳理和总结
项目背景
平台运营到一定阶段,一定会累积大批量的用户数据,这些用户数据是运营人员的黄金财产。而如何利用用户的数据来做运营(消息推送、触达消息、优惠券发送、广告位等),正是精准运营系统需要解决的问题。本文是基于信贷业务实践后写出来的,其它行业如保险、电商、航旅、游戏等也可以参考。
业务场景
先看几个具有代表性的需求
用户可用额度在 20000~50000 元,而且有借款记录,未还本金为 0,性别为“男”用户发生了 A 行为且未还本金大于 5000 用户在 1 天内发生 A 行为次数大于等于 3 次用户在 A 行为前 24 小时内未发生 B 行为用户在 A 行为后一个月内未发生 B 行为
业务上有两种消息类型
日常消息:由业务人员通过条件筛选锁定用户群,定时或即时给批量用户发送消息或者优惠券
触达消息:主要由用户自身的行为触发,比如登陆、进件申请、还款等,满足一定筛选条件实时给用户发送消息或优惠券
对于用户筛选条件,也主要有两种类型
用户状态:包括用户自身属性如性别、年龄、学历、收入等,还有用户相关联实体如进件订单、账户信息、还款计划、优惠券等的属性,以及用户画像数据如行为偏好、进件概率等
用户行为:即用户的动作,包括登陆、进件申请、还款,甚至前端点击某个按钮、在某个文本框输入都算
早期方案
早期方案存在以下痛点
至少两次跨部门沟通配合成本,周期被拉长
非实时消息推送,无法实现基于用户行为的实时推送场景
非实时效果验证,无法及时调整运营策略
系统搭建的目标
需要定义规则,提供可视化界面给业务人员动态配置,无需重启系统即使生效,减少沟通成本和避免重复开发,总之就是要更加 自动化 和 易配置
采集实时数据,根据实时事件做实时推送,总之就是要 实时
技术选型
数据采集、转换、存储
采集:状态类的数据主要放在各个业务系统的关系型数据库中,由于历史原因有 postgres 和 mysql,需要实时采集表的数据变更,这里使用 kafka connector 读取 mysql 的 binlog 或 postgres 的 xlog,另外还有标签系统计算出来的标签,在 kafka 中;而事件类数据主要来源于前端上报事件(有专门的服务接收再丢到 kafka),关系型数据库里面也可以提取一些事件。
转换:采集出来的数据需要做一些格式统一等操作,用 kafka connector。
存储:采用 Elasticsearch 存储用户数据,ES 查询不像 mysql 或 mongoDB 用 B -tree 或 B +tree 实现索引,而是使用 bitset 和 skip list 来处理联合索引,特别适合多字段的复杂查询条件。
下面重点看下 kafka connector 和 Elasticsearch 如何使用
kafka connector
kafka connector 有 Source 和 Sink 两种组件,Source 的作用是读取数据到 kafka,这里用开源实现 debezium 来采集 mysql 的 binlog 和 postgres 的 xlog。Sink 的作用是从 kafka 读数据写到目标系统,这里自己研发一套组件,根据配置的规则将数据格式化再同步到 ES。kafka connector 有以下优点:
提供大量开箱即用的插件,比如我们直接用 debezium 就能解决读取 mysql 和 pg 数据变更的问题
伸缩性强,对于不同的 connector 可以配置不同数量的 task,分配给不同的 worker,,我们可以根据不同 topic 的流量大小来调节配置。
容错性强,worker 失败会把 task 迁移到其它 worker 上面
使用 rest 接口进行配置,我们可以对其进行包装很方便地实现一套管理界面
Elasticsearch
对于状态数据,由于状态的写操作相对较少,我们采取嵌套文档的方式,将同个用户的相关实体数据都同步写入到同个文档,具体实现用 painless 脚本做局部更新操作。效果类似这样:
{
“id”:123,
“age”:30,
“credit_line”:20000,
“education”:”bachelor”,
…
“last_loan_applications”:{
“loan_id”:1234,
“status”:”reject”,
…
}
…
}
事件数据写入比较频繁,数据量比较多,我们使用父子文档的方式做关联,效果类似这样:
{
“e_uid”:123,
“e_name”:”loan_application”,
“e_timestamp”:”2019-01-01 10:10:00″
…
}
(e_前缀是为了防止同个 index 下同名字段冲突)ES 这样存储一方面是方便做统计报表,另一方面跟用户筛选和触达有关。
规则引擎
在设计规则引擎前,我们对业界已有的规则引擎,主要包括 Esper, Drools, Flink CEP,进行了初步调研。
Esper
Esper 设计目标为 CEP 的轻量级解决方案,可以方便的嵌入服务中,提供 CEP 功能。优势:
轻量级可嵌入开发,常用的 CEP 功能简单好用。
EPL 语法与 SQL 类似,学习成本较低。
劣势:
单机全内存方案,需要整合其他分布式和存储。
以内存实现时间窗功能,无法支持较长跨度的时间窗。
无法有效支持定时触达(如用户在浏览发生一段时间后触达条件判断)。
Drools
Drools 开始于规则引擎,后引入 Drools Fusion 模块提供 CEP 的功能。优势:
功能较为完善,具有如系统监控、操作平台等功能。
规则支持动态更新
劣势:
以内存实现时间窗功能,无法支持较长跨度的时间窗。
无法有效支持定时触达(如用户在浏览发生一段时间后触达条件判断)。
Flink
Flink 是一个流式系统,具有高吞吐低延迟的特点,Flink CEP 是一套极具通用性、易于使用的实时流式事件处理方案。优势:
继承了 Flink 高吞吐的特点
事件支持存储到外部,可以支持较长跨度的时间窗。
可以支持定时触达(用 followedBy+PartternTimeoutFunction 实现)
劣势:
无法动态更新规则(痛点)
自定义规则
综上对比了几大开源规则引擎,发现都无法满足业务需求:
业务方要求支持长时间窗口(n 天甚至 n 个月,比如放款一个月后如果没产生还款事件就要发消息)
动态更新规则,而且要可视化(无论用哪个规则引擎都需要包装,需要考虑二次开发成本)
最终我们选择自己根据业务需要,开发基于 json 的自定义规则,规则类似下面例子:
{
“batchId”: “xxxxxxxx”, // 流水号,创建每条运营规则时生成
“type”: “trigger”, //usual
“triggerEvent”: “login”,
“after”: “2h”, // 分钟 m, 小时 h, 天 d, 月 M
“pushRules”: [// 支持同时推送多条不同类型的消息
{
“pushType”: “sms”, //wx,app,coupon
“channel”: “cl”,
“content”: “hello #{userInfo.name}”
},
{
“pushType”: “coupon”,
“couponId”: 1234
}
],
“statusConditions”: [
{
“name”: “and”, // 逻辑条件,支持与 (and) 或(or)非(not)
“conditions”: [
{
“name”: “range”,
“field”: “credit_line”,
“left”: 2000,
“right”: 10000,
“includeLeft”: true,
“includeRight”: false
},
{
“name”:”in”,
“filed”:”education”,
“values”:[“bachelor”,”master”]
}
]
}
],
“eventConditions”: [
{
“name”: “or”,// 逻辑条件,支持与 (and) 或(or)非(not)
“conditions”: [
{
“name”: “event”,
“function”: “count”, // 聚合函数, 目前只支持 count
“eventName”: “xxx_button_click”,
“range”: {// 聚合结果做判断
“left”: 1,
“includeLeft”: true
},
“timeWindow”: {
“type”: “fixed”, //fixed 为固定窗口,sliding 为滑动窗口
“start”: “2019-01-01 01:01:01”,
“end”: “2019-02-01 01:01:01”
},
“conditions”: [//event 查询条件继承 and 逻辑条件,所以事件也可以过滤字段
{
“name”: “equals”,
“field”: “f1”,
“value”: “v1”
}
]
}
]
}
]
}
使用面向对象思维对过滤条件做抽象后,过滤条件继承关系如下:
然后代码里加一层 parser 把 Condition 都转成 ES 查询语句,实现轻量级的业务规则配置功能。
整体技术方案
系统组成模块及功能如下:mysql binlog:mysql 的数据变更,由 kafka connector 插件读取到 kafka,数据源之一 postgres xlog:pg 的数据变更,由 kafka connector 插件读取到 kafka,数据源之一 report server:事件上报服务,数据源之一 tags:用户画像系统计算出来的标签,数据源之一触发场景路由:分实时触发和延迟触发,实时触发直接到下一步,延迟触发基于 rabbitmq 的延迟队列实现用户筛选模块:将筛选规则翻译为 ES 查询语句到 ES 查询用户数据,可以是批量的和单个用户的变量渲染模块:对推送内容做处理推送适配器:兼容不同的推送方式定时任务调度器:基于 elastic-job,处理定时推送任务规则配置控制台:提供可视化配置界面(运营规则配置、数据采集规则配置、字段元数据配置等)报表服务:提供报表查询功能运营位服务:提供外部接口,根据条件匹配运营位(如启动图、首页 banner 图片等)
总结与展望
系统基本满足了目前的业务需求,对转化率等运营指标提升显著
可以扩展其它业务,如推荐、风控、业务监控等
规则定时拉取,实时性差,可以用 zk 做发布订阅实现即时更新
目前事件的聚合函数只支持 count,能满足业务需求但是未来可能还需要支持其它函数
系统只经过千万级用户的生产验证,再高数量级的话可能还有很多性能优化的工作, 如 ES 并行查询(目前用 scroll api 批量拉取用户数据是串行的)
事件类数据越来越多,目前采取定时删除半年前数据的方式,防止持续增长过快不可控,所以事件类条件不可超过半年的时间窗口
虽然系统对业务无入侵,但是反过来看本系统依赖于上游数据,上游数据发生变化时如何做到影响最小?
未来会继续从技术及业务两方面入手,将系统建设的更加易用、高效。