乐趣区

关于大数据:用户行为分析埋点实时数仓实践附用户关联源码

转自:https://blog.csdn.net/appearb…

一、概述

埋点采集、用户行为剖析、实时数仓、IdMapping

此文重点讲述埋点的 数据模型、数据格式、数据实时采集、加工、存储及用户关联。对于用户行为剖析的概念、意义以及埋点相干的货色此文不作赘述

二、数据模型

业界比拟风行的事件、用户模型;即:

  • who: 设施 ID、登录 ID

       * when: 事件产生工夫、上报工夫
       * where: 设施环境、网络环境、业务环境等
       * what: 事件标识、事件参数

咱们的数据存储也只有 events 和 users 两张表

events:不会变的日志表且数据量大;咱们用 ClickHouse 的分布式表存储

users:咱们只有几百万用户,且做用户关联时会频繁依据用户 id 查问、更新,而且做数据分析时要和事件表关联;咱们用 ClickHouse 的 mysql Engine 存储

events 建表语句:



1.  -- 事件 local 表; 按日期周分区
    
2.  CREATE TABLE analytics.events_replica ON CLUSTER ck_cluster(
    
3.      `track_id` String COMMENT '埋点',
    
4.      `event_id` Int64 COMMENT '事件 id',
    
5.      `distinct_id` String COMMENT '设施 id/ 用户核心 id',
    
6.      `user_id` Int64 COMMENT '用户表 id',
    
7.      `type` String COMMENT '埋点类型',
    
8.      `event` String COMMENT '埋点事件',
    
9.      `date` Date COMMENT '埋点日期',
    
10.      `time` DateTime64 (3, 'Asia/Shanghai') COMMENT '埋点上传工夫',
    
11.      `receive_time` DateTime64 (3, 'Asia/Shanghai') COMMENT '埋点承受工夫',
    
12.      `day` Int64 COMMENT '埋点距 1970/01/01 的天数',
    
13.      `week_id` Int64 COMMENT '埋点距 1970/01/01 的周数',
    
14.      `month_id` Int64 COMMENT '埋点距 1970/01/01 的月数'
    
15.      其余业务公共字段
    
16.      所有事件属性
    

18.  ) ENGINE = ReplicatedMergeTree ('/clickhouse/tables/analytics/events_replica/{shard}', '{replica}' )
    
19.  PARTITION BY toMonday (date) 
    
20.  ORDER BY
    
21.      (track_id) SETTINGS index_granularity = 8192
    

23.  -- 事件分布式表
    
24.  CREATE TABLE analytics.events ON CLUSTER ck_cluster
    
25.  AS analytics.events_replica ENGINE =Distributed('ck_cluster', 'analytics', 'events_replica', rand())
    

 

users 建表语句:



1.  -- ClickHouse Mysql Engine 表
    
2.  CREATE TABLE cON CLUSTER ck_cluster
    
3.  (
    
4.      `id` Int64 comment '零碎用户 id',
    
5.      `first_id` String comment '第一次关联的设施 id',
    
6.      `second_id` String comment '用户核心 id',
    
7.      `$device_id_list` String comment '非第一次关联的设施 id 汇合; 逗号分隔'
    
8.  )
    
9.  ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password');
    

11.  -- mysql 表
    
12.  CREATE TABLE `users` (13.    `id` bigint(32) DEFAULT NULL,
    
14.    `first_id` varchar(100) DEFAULT NULL,
    
15.    `second_id` varchar(100) DEFAULT NULL,
    
16.    `$device_id_list` varchar(500) DEFAULT NULL
    
17.  ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    

三、数据格式

1. 事件埋点
埋点机会: 行为事件记录
type = track
用户登录前: is_login_id=false, distinct_id= 设施 id
用户登录后: is_login_id=true, distinct_id= 用户 id
只能在 properties 里增加属性

{
  “distinct_id”: “ 登录前(设施 id)、登录后(用户 id)”,
  “time”: “ 以后工夫戳 ”,
  “type”: “track”,
  “event”: “ 事件名 ”,
  “properties”: {
    “$is_login_id”: true, 
    “$ 内置属性名 ”: “ 内置属性值 ”,
    “$ 自定义属性名 ”: “ 自定义属性值 ”
  }
}

例子:
{
  “distinct_id”: “123456”,
  “time”: 1434556935000,
  “type”: “track”,
  “event”: “ViewProduct”,
  “properties”: {
    “$is_login_id”: true,
    “$app_version”: “1.3”,
    “$wifi”: true,
    “$ip”: “180.79.35.65”,
    “$province”: “ 湖南 ”,
    “$city”: “ 长沙 ”,
    “$user_agent”: “Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_2 like Mac OS X) AppleWebKit/602.1.50 (KHTML, like Gecko) CriOS/58.0.3029.113 Mobile/14F89 Safari/602.1”,
    “$screen_width”: 320,
    “$screen_height”: 568,
    “product_id”: 12345,
    “product_name”: “ 苹果 ”,
    “product_classify”: “ 水果 ”,
    “product_price”: 14.0
  }
}

2. 用户关联
埋点机会: 用户登录后
type=track_signup; event=$SignUp; distinct_id= 用户 ID; original_id= 设施 id

{
    “distinct_id”:” 用户 Id”,
    “original_id”:” 设施 id”,
    “time”: “ 以后工夫戳 ”,
    “type”: “track_signup”,
    “event”: “$SignUp”,
    “properties”: {
        “$ 内置属性名 ”:” 内置属性值 ”
    }
}

例子:
{
    “distinct_id”:”12345″,
    “original_id”:”2b0a6f51a3cd6775″,
    “time”: 1434557935000,
    “type”: “track_signup”,
    “event”: “$SignUp”,
    “properties”: {
        “$manufacturer”:”Apple”,
        “$model”: “iPhone5,2”,
        “$os”:”iOS”,
        “$os_version”:”7.0″,
        “$app_version”:”1.3″,
        “$wifi”:true,
        “$ip”:”180.79.35.65″,
        “$province”:” 湖南 ”,
        “$city”:” 长沙 ”,
        “$screen_width”:320,
        “$screen_height”:568
    }
}
 

四、架构图

  • 前后端埋点:分为全埋点和自定义事件埋点;按数据条数和工夫距离批量发送
  • 埋点收集器:一个 API 接口,通过 nginx 作负载平衡,接管到埋点后异步写入 kafka;业界通用的做法是用 nginx 承受埋点后间接落盘,而后再通过 flume、logstash 等日志采集工具采集到 kafka。
  • kafka 原始数据:通过 flume 采集一份到离线数仓
  • Flink ETL:外围数据处理逻辑

1. 动静增加 ClickHouse 列

2. 用户关联

3. 数据校验、解析、荡涤

  • 批量写入:按数据条数和工夫距离批量写入 ClickHouse

五、动静增加 ClickHouse 列

自定义埋点的事件属性会随着业务减少,事件属性会作为 events 表的列造成一张宽表,所以采集到事件后,会依据事件的属性实时动静增加 events 表的字段

events 表的列会初始化一份到 redis 的 set 里,在 Flink ETL 里,和埋点属性的汇合取差集,并更新 redis

须要留神的时:增加列时须要同时增加 events 的 local 表和 distributed 表

六、用户关联(IdMapping)

参考某策的用户关联:标识用户

大略逻辑:

1. 依据埋点事件、用户关联事件的设施 ID 或登录 ID 去用户表里找到对应的用户 ID 作为事件表的用户 ID

2. 定时调度刷新设施多对一的状况

流程图如下(源码见文末):

七、批量写入

因为 jdbc 的 batchInsert 须要 sql 一样,咱们的实时采集事件却有所差异,导致 sql 不一样;这里咱们能够依据 sql 分组,按一分钟或 1000 条批量写入即可

八、完结(附用户关联源码)

我基于 mysql 实现了用户关联的逻辑;能够做到设施多对一,关联登录前后的用户

用户关联源码:https://github.com/ostarsier/idmapping

退出移动版