共计 7198 个字符,预计需要花费 18 分钟才能阅读完成。
简介:在理论业务应用中,须要常常实时做一些数据分析,包含实时 PV 和 UV 展现,实时销售数据,实时店铺 UV 以及实时举荐零碎等,基于此类需要,Confluent+ 实时计算 Flink 版是一个高效的计划。
业务背景
在理论业务应用中,须要常常实时做一些数据分析,包含实时 PV 和 UV 展现,实时销售数据,实时店铺 UV 以及实时举荐零碎等,基于此类需要,Confluent+ 实时计算 Flink 版是一个高效的计划。
Confluent 是基于 Apache Kafka 提供的企业级全托管流数据服务,由 Apache Kafka 的原始创建者构建,通过企业级性能扩大了 Kafka 的劣势,同时打消了 Kafka 治理或监控的累赘。
实时计算 Flink 版是阿里云基于 Apache Flink 构建的企业级实时大数据计算商业产品。实时计算 Flink 由 Apache Flink 开创团队官网出品,领有寰球对立商业化品牌,提供全系列产品矩阵,齐全兼容开源 Flink API,并充沛基于弱小的阿里云平台提供云原生的 Flink 商业增值能力。
一、筹备工作 - 创立 Confluent 集群和实时计算 Flink 版集群
登录 Confluent 治理控制台,创立 Confluent 集群,创立步骤参考 Confluent 集群开明
登录实时计算 Flink 版治理控制台,创立 vvp 集群。请留神,创立 vvp 集群抉择的 vpc 跟 confluent 集群的 region 和 vpc 应用同一个,这样能够在 vvp 外部拜访 confluent 的外部域名。
二、最佳实际 - 实时统计玩家充值金额 -Confluent+ 实时计算 Flink+Hologres
2.1 新建 Confluent 音讯队列
在 confluent 集群列表页,登录 control center
在左侧选中 Topics,点击 Add a topic 按钮,创立一个名为 confluent-vvp-test 的 topic,将 partition 设置为 3
2.2 配置后果表 Hologres
进入 Hologres 控制台,点击 Hologres 实例,在 DB 治理中新增数据库mydb
登录 Hologres 数据库,新建 SQL
Hologres 中创立后果表 SQL 语句
-- 用户累计生产后果表
CREATE TABLE consume (
appkey VARCHAR,
serverid VARCHAR,
servertime VARCHAR,
roleid VARCHAR,
amount FLOAT,
dt VARCHAR,
primary key(appkey,dt)
);
2.3 创立实时计算 vvp 作业
首先登录 vvp 控制台,抉择集群所在 region,点击控制台,进入开发界面
点击作业开发 Tab,点击新建文件,文件名称:confluent-vvp-hologres,文件类型抉择:流作业 /SQL
在输入框写入以下代码:
create TEMPORARY table kafka_game_consume_source(
appkey STRING,
servertime STRING,
consumenum DOUBLE,
roleid STRING,
serverid STRING
) with (
'connector' = 'kafka',
'topic' = 'game_consume_log',
'properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.xxx:9071[xxx 能够找开发同学查看]',
'properties.group.id' = 'gamegroup',
'format' = 'json',
'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks',
'properties.ssl.truststore.password' = '[your truststore password]',
'properties.security.protocol'='SASL_SSL',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx[集群的用户]"password="xxx[相应的明码]";'
);
-- 创立累计生产 hologres sink 表
CREATE TEMPORARY TABLE consume(
appkey STRING,
serverid STRING,
servertime STRING,
roleid STRING,
amount DOUBLE,
dt STRING,
PRIMARY KEY (appkey,dt) NOT ENFORCED
)WITH (
'connector' = 'hologres',
'dbname' = 'mydb',
'endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80',
'password' = '[your appkey secret]',
'tablename' = 'consume',
'username' = '[your app key]',
'mutateType' = 'insertorreplace'
);
--{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}
-- 计算每个用户累积生产金额
insert into consume
SELECT
appkey,LAST_VALUE(serverid) as serverid,LAST_VALUE(servertime) as servertime,LAST_VALUE(roleid) as roleid,
sum(consumenum) as amount,
substring(servertime,1,10) as dt
FROM kafka_game_consume_source
GROUP BY appkey,substring(servertime,1,10)
having sum(consumenum) > 0;
在高级配置里,减少依赖文件 truststore.jks(拜访外部域名得增加这个文件,拜访公网域名能够不必),拜访依赖文件的固定门路前缀都是 /flink/usrlib/(这里就是 /flink/usrlib/truststore.jks)
点击上线按钮,实现上线
在运维作用列表里找到刚上线的作用,点击启动按钮,期待状态更新为 running,运行胜利。
在 control center 的【Topics->Messages】页面,逐条发送测试音讯,格局为:
{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}
2.4 查看用户充值金额实时统计成果
三、最佳实际 - 电商实时 PV 和 UV 统计 -Confluent+ 实时计算 Flink+RDS
3.1 新建 Confluent 音讯队列
在 confluent 集群列表页,登录 control center
在左侧选中 Topics,点击 Add a topic 按钮,创立一个名为 pv-uv 的 topic,将 partition 设置为 3
3.2 创立云数据库 RDS 后果表
登录 RDS 治理控制台页面,购买 RDS。确保 RDS 与 Flink 全托管集群在雷同 region,雷同 VPC 下
增加虚构交换机网段(vswitch IP 段)进入 RDS 白名单,详情参考:设置白名单文档
【vswitch IP 段】可在 flink 的工作空间详情中查问
在【账号治理】页面创立账号【高权限账号】
数据库实例下【数据库治理】新建数据库【conflufent_vvp】
应用零碎自带的 DMS 服务登陆 RDS,登录名和明码输出下面创立的高权限账户
双击【confluent_vvp】数据库,关上 SQLConsole,将以下建表语句复制粘贴到 SQLConsole 中,创立后果表
CREATE TABLE result_cps_total_summary_pvuv_min(
summary_date date NOT NULL COMMENT '统计日期',
summary_min varchar(255) COMMENT '统计分钟',
pv bigint COMMENT 'pv',
uv bigint COMMENT 'uv',
currenttime timestamp COMMENT '以后工夫',
primary key(summary_date,summary_min)
)
3.3 创立实时计算 VVP 作业
1.【[VVP 控制台】新建文件
在 SQL 区域输出以下代码:
-- 数据的订单源表
CREATE TABLE source_ods_fact_log_track_action (
account_id VARCHAR,
-- 用户 ID
client_ip VARCHAR,
-- 客户端 IP
client_info VARCHAR,
-- 设施机型信息
platform VARCHAR,
-- 零碎版本信息
imei VARCHAR,
-- 设施惟一标识
`version` VARCHAR,
-- 版本号
`action` VARCHAR,
-- 页面跳转形容
gpm VARCHAR,
-- 埋点链路
c_time VARCHAR,
-- 申请工夫
target_type VARCHAR,
-- 指标类型
target_id VARCHAR,
-- 指标 ID
udata VARCHAR,
-- 扩大信息,JSON 格局
session_id VARCHAR,
-- 会话 ID
product_id_chain VARCHAR,
-- 商品 ID 串
cart_product_id_chain VARCHAR,
-- 加购商品 ID
tag VARCHAR,
-- 非凡标记
`position` VARCHAR,
-- 地位信息
network VARCHAR,
-- 网络应用状况
p_dt VARCHAR,
-- 工夫分区天
p_platform VARCHAR -- 零碎版本信息
) WITH (
'connector' = 'kafka',
'topic' = 'game_consume_log',
'properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.c79f69095bc5d4d98b01136fe43e31b93:9071',
'properties.group.id' = 'gamegroup',
'format' = 'json',
'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks',
'properties.ssl.truststore.password' = '【your password】',
'properties.security.protocol'='SASL_SSL',
'properties.sasl.mechanism'='PLAIN',
'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="【your user name】"password="【your password】";'
);
--{"account_id":"id1","client_ip":"172.11.1.1","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:10:00"}
CREATE TABLE result_cps_total_summary_pvuv_min (
summary_date date,
-- 统计日期
summary_min varchar,
-- 统计分钟
pv bigint,
-- 点击量
uv bigint,
-- 一天内同个访客屡次拜访仅计算一个 UV
currenttime timestamp,
-- 以后工夫
primary key (summary_date, summary_min)
) WITH (
type = 'rds',
url = 'url ='jdbc:mysql://rm-【your rds clusterId】.mysql.rds.aliyuncs.com:3306/confluent_vvp',',
tableName = 'result_cps_total_summary_pvuv_min',
userName = 'flink_confluent_vip',
password = '【your rds password】'
);
CREATE VIEW result_cps_total_summary_pvuv_min_01 AS
select
cast (p_dt as date) as summary_date -- 工夫分区
, count (client_ip) as pv -- 客户端的 IP
, count (distinct client_ip) as uv -- 客户端去重
, cast (max (c_time) as TIMESTAMP) as c_time -- 申请的工夫
from
source_ods_fact_log_track_action
group
by p_dt;
INSERT
into result_cps_total_summary_pvuv_min
select
a.summary_date,
-- 工夫分区
cast (DATE_FORMAT (c_time, 'HH:mm') as varchar) as summary_min,
-- 取出小时分钟级别的工夫
a.pv,
a.uv,
CURRENT_TIMESTAMP as currenttime -- 以后工夫
from
result_cps_total_summary_pvuv_min_01 AS a;
点击【上线】之后,在作业运维页面点击启动按钮,直到状态更新为 RUNNING 状态。
在 control center 的【Topics->Messages】页面,逐条发送测试音讯,格局为:
{"account_id":"id1","client_ip":"72.11.1.111","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:11:00"}
{"account_id":"id2","client_ip":"72.11.1.112","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:12:00"}
{"account_id":"id3","client_ip":"72.11.1.113","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:13:00"}
3.4 查看 PV 和 UV 成果
能够看出 rds 数据表的 pv 和 uv 会随着发送的音讯数据,动静的变动,同时还能够通过【数据可视化】来查看相应的图表信息。
pv 图表展现:
uv 图表展现:
原文链接
本文为阿里云原创内容,未经容许不得转载。