Summary: In real-world business applications, real-time data analysis is frequently required, including real-time PV and UV dashboards, real-time sales data, real-time per-store UV, and real-time recommendation systems. For such requirements, Confluent + Realtime Compute for Apache Flink is an efficient solution.

Business Background

In real-world business applications, real-time data analysis is frequently required, including real-time PV and UV dashboards, real-time sales data, real-time per-store UV, and real-time recommendation systems. For such requirements, Confluent + Realtime Compute for Apache Flink is an efficient solution.

Confluent is an enterprise-grade, fully managed streaming data service built on Apache Kafka. Created by the original developers of Apache Kafka, it extends Kafka's strengths with enterprise features while removing the burden of Kafka administration and monitoring.

Realtime Compute for Apache Flink is an enterprise-grade real-time big data computing product built by Alibaba Cloud on Apache Flink. It is produced by the founding team of Apache Flink under a globally unified commercial brand, offers a full product matrix, is fully compatible with the open-source Flink API, and provides cloud-native, value-added Flink capabilities on top of the Alibaba Cloud platform.

1. Preparation: Create a Confluent Cluster and a Realtime Compute for Apache Flink Cluster

Log on to the Confluent management console and create a Confluent cluster. For the creation steps, see the Confluent cluster activation documentation.

Log on to the Realtime Compute for Apache Flink management console and create a VVP cluster. Note: when creating the VVP cluster, select the same region and VPC as the Confluent cluster, so that the Confluent internal domain name can be accessed from inside VVP.

2. Best Practice: Real-Time Statistics of Player Top-Up Amounts (Confluent + Realtime Compute for Apache Flink + Hologres)

2.1 Create a Confluent message queue

On the Confluent cluster list page, log on to Control Center.

Select Topics in the left navigation pane, click the Add a topic button, create a topic named confluent-vvp-test, and set the number of partitions to 3.


2.2 Configure the Hologres result table

Go to the Hologres console, click the Hologres instance, and create a database named mydb under DB Management.

Log on to the Hologres database and create a new SQL query.

SQL statement to create the result table in Hologres:

-- Result table for cumulative consumption per user
CREATE TABLE consume (
  appkey VARCHAR,
  serverid VARCHAR,
  servertime VARCHAR,
  roleid VARCHAR,
  amount FLOAT,
  dt VARCHAR,
  PRIMARY KEY (appkey, dt)
);

2.3 Create a Realtime Compute (VVP) job

First, log on to the VVP console, select the region where the cluster is located, and click Console to enter the development UI.

Click the job development tab, click New File, name the file confluent-vvp-hologres, and select the file type Stream Job / SQL.

Enter the following code in the editor:

-- Kafka source table for game consumption logs
CREATE TEMPORARY TABLE kafka_game_consume_source (
  appkey STRING,
  servertime STRING,
  consumenum DOUBLE,
  roleid STRING,
  serverid STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'confluent-vvp-test',
  -- ask your developer for the xxx part of the internal endpoint
  'properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.xxx:9071',
  'properties.group.id' = 'gamegroup',
  'format' = 'json',
  'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks',
  'properties.ssl.truststore.password' = '[your truststore password]',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="[cluster user]" password="[corresponding password]";'
);

-- Hologres sink table for cumulative consumption
CREATE TEMPORARY TABLE consume (
  appkey STRING,
  serverid STRING,
  servertime STRING,
  roleid STRING,
  amount DOUBLE,
  dt STRING,
  PRIMARY KEY (appkey, dt) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = 'mydb',
  'endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80',
  'password' = '[your appkey secret]',
  'tablename' = 'consume',
  'username' = '[your app key]',
  'mutateType' = 'insertorreplace'
);

-- Sample messages:
-- {"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
-- {"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
-- {"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
-- {"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
-- {"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}

-- Compute the cumulative consumption amount for each user per day
INSERT INTO consume
SELECT
  appkey,
  LAST_VALUE(serverid) AS serverid,
  LAST_VALUE(servertime) AS servertime,
  LAST_VALUE(roleid) AS roleid,
  SUM(consumenum) AS amount,
  SUBSTRING(servertime, 1, 10) AS dt
FROM kafka_game_consume_source
GROUP BY appkey, SUBSTRING(servertime, 1, 10)
HAVING SUM(consumenum) > 0;

In the advanced configuration, add the dependency file truststore.jks (this file is required when accessing the internal domain name; it is not needed when accessing the public domain name). Dependency files are always available under the fixed path prefix /flink/usrlib/ (here, /flink/usrlib/truststore.jks).

Click the Deploy button to deploy the job.

In the job operations list, find the job you just deployed, click Start, and wait until the status changes to RUNNING, which indicates that the job is running successfully.

On the Topics -> Messages page of Control Center, send the following test messages one by one, in this format:

{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}

2.4 View the real-time statistics of user top-up amounts
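To check the result, you can query the consume table in Hologres. The following is a minimal verification sketch, assuming the five sample messages above have been sent; the table and columns are the ones created in section 2.2:

-- Query the cumulative consumption per user and day in Hologres
SELECT appkey, serverid, servertime, roleid, amount, dt
FROM consume
ORDER BY appkey;
-- With the five sample messages, the row for appkey1 should accumulate to 33.8 + 31.8 + 73.8 = 139.4
-- and the row for appkey2 to 30.8 + 33.8 = 64.6, both with dt = '2020-09-30'.

Because (appkey, dt) is the primary key and mutateType is insertorreplace, each new message should overwrite the previous cumulative value for the same user and day rather than append a new row.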

3. Best Practice: Real-Time E-Commerce PV and UV Statistics (Confluent + Realtime Compute for Apache Flink + RDS)

3.1 Create a Confluent message queue

On the Confluent cluster list page, log on to Control Center.

Select Topics in the left navigation pane, click the Add a topic button, create a topic named pv-uv, and set the number of partitions to 3.


3.2 Create the ApsaraDB RDS result table

Log on to the RDS management console and purchase an RDS instance. Make sure the RDS instance is in the same region and the same VPC as the fully managed Flink cluster.

Add the vSwitch CIDR block (vSwitch IP range) to the RDS whitelist. For details, see the documentation on configuring a whitelist.


The vSwitch IP range can be found in the details of the Flink workspace.

On the Account Management page, create a privileged (high-privilege) account.

Under Database Management of the database instance, create a database named confluent_vvp.

Use the built-in DMS service to log on to RDS, entering the user name and password of the privileged account created above.

Double-click the confluent_vvp database to open SQLConsole, then copy and paste the following table creation statement into SQLConsole to create the result table:

CREATE TABLE result_cps_total_summary_pvuv_min (
  summary_date date NOT NULL COMMENT 'statistics date',
  summary_min varchar(255) COMMENT 'statistics minute',
  pv bigint COMMENT 'page views (PV)',
  uv bigint COMMENT 'unique visitors (UV)',
  currenttime timestamp COMMENT 'current time',
  PRIMARY KEY (summary_date, summary_min)
);

3.3 Create a Realtime Compute (VVP) job

In the VVP console, create a new file.

Enter the following code in the SQL editor:

-- Source table of tracking log data
CREATE TABLE source_ods_fact_log_track_action (
  account_id VARCHAR,  -- user ID
  client_ip VARCHAR,  -- client IP
  client_info VARCHAR,  -- device model information
  platform VARCHAR,  -- OS version information
  imei VARCHAR,  -- unique device identifier
  `version` VARCHAR,  -- version number
  `action` VARCHAR,  -- page navigation description
  gpm VARCHAR,  -- event tracking chain
  c_time VARCHAR,  -- request time
  target_type VARCHAR,  -- target type
  target_id VARCHAR,  -- target ID
  udata VARCHAR,  -- extended information, JSON format
  session_id VARCHAR,  -- session ID
  product_id_chain VARCHAR,  -- product ID chain
  cart_product_id_chain VARCHAR,  -- IDs of products added to the cart
  tag VARCHAR,  -- special tag
  `position` VARCHAR,  -- position information
  network VARCHAR,  -- network usage
  p_dt VARCHAR,  -- time partition (day)
  p_platform VARCHAR  -- OS version information
) WITH (
  'connector' = 'kafka',
  'topic' = 'pv-uv',
  'properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.c79f69095bc5d4d98b01136fe43e31b93:9071',
  'properties.group.id' = 'gamegroup',
  'format' = 'json',
  'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks',
  'properties.ssl.truststore.password' = '[your password]',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="[your user name]" password="[your password]";'
);

-- Sample message:
-- {"account_id":"id1","client_ip":"172.11.1.1","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:10:00"}

-- RDS result table
CREATE TABLE result_cps_total_summary_pvuv_min (
  summary_date date,  -- statistics date
  summary_min varchar,  -- statistics minute
  pv bigint,  -- page views
  uv bigint,  -- a visitor who visits multiple times within one day counts as a single UV
  currenttime timestamp,  -- current time
  primary key (summary_date, summary_min)
) WITH (
  type = 'rds',
  url = 'jdbc:mysql://rm-[your rds clusterId].mysql.rds.aliyuncs.com:3306/confluent_vvp',
  tableName = 'result_cps_total_summary_pvuv_min',
  userName = 'flink_confluent_vip',
  password = '[your rds password]'
);

CREATE VIEW result_cps_total_summary_pvuv_min_01 AS
SELECT
  CAST(p_dt AS date) AS summary_date,  -- time partition (day)
  COUNT(client_ip) AS pv,  -- count of client IPs
  COUNT(DISTINCT client_ip) AS uv,  -- deduplicated client IPs
  CAST(MAX(c_time) AS TIMESTAMP) AS c_time  -- latest request time
FROM source_ods_fact_log_track_action
GROUP BY p_dt;

INSERT INTO result_cps_total_summary_pvuv_min
SELECT
  a.summary_date,  -- time partition
  CAST(DATE_FORMAT(c_time, 'HH:mm') AS varchar) AS summary_min,  -- hour-and-minute part of the time
  a.pv,
  a.uv,
  CURRENT_TIMESTAMP AS currenttime  -- current time
FROM result_cps_total_summary_pvuv_min_01 AS a;

After clicking Deploy, click the Start button on the job operations page and wait until the status changes to RUNNING.

On the Topics -> Messages page of Control Center, send the following test messages one by one, in this format:

{"account_id":"id1","client_ip":"72.11.1.111","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:11:00"}{"account_id":"id2","client_ip":"72.11.1.112","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:12:00"}{"account_id":"id3","client_ip":"72.11.1.113","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:13:00"}

3.4 View the PV and UV results

You can see that the pv and uv values in the RDS table change dynamically as messages are sent. You can also use the Data Visualization feature to view the corresponding charts.
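As a minimal check, you can run the following query against the result table in DMS or any MySQL client. This is a sketch assuming the three sample messages above were sent; the exact minute-level rows depend on when the job processes the messages:

SELECT summary_date, summary_min, pv, uv, currenttime
FROM result_cps_total_summary_pvuv_min
ORDER BY summary_date, summary_min;
-- After all three sample messages are processed, the latest row (summary_min = '19:13')
-- should show pv = 3 and uv = 3, since the three events come from three distinct client IPs
-- on 2021-12-01.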

PV chart:

UV chart:

This article is original content from Alibaba Cloud and may not be reproduced without permission.