关于jquery:Serverless在游戏运营行业进行数据采集分析的最佳实践

2次阅读

共计 11346 个字符,预计需要花费 29 分钟才能阅读完成。

简介:这个架构不光实用于游戏经营行业,其实任何大数据采集传输的场景都是实用的,目前也曾经有很多客户正在基于 Serverless 的架构跑在生产环境,或者正走在革新 Serverless 架构的路上。

家喻户晓,游戏行业在当今的互联网行业中算是一棵常青树。在疫情之前的 2019 年,中国游戏市场营收规模约 2884.8 亿元,同比增长 17.1%。2020 年因为疫情,游戏行业更是突飞猛进。玩游戏本就是中国网民最广泛的娱乐形式之一,疫情期间更甚。据不齐全统计,截至 2019 年,中国移动游戏用户规模约 6.6 亿人,占中国总网民规模 8.47 亿的 77.92%,可见游戏作为一种低门槛、低成本的娱乐伎俩,已成为大部分人生存中司空见惯的一部分。

对于玩家而言,市面上的游戏数量多如牛毛,那么玩家如何能发现和认知到一款游戏,并且继续的玩上来恐怕是所有游戏厂商须要思考的问题。加之 2018 年游戏版号停发事件,游戏厂商更加珍惜每一个已取得版号的游戏产品,所以这也使得“深度打磨产品质量”和“进步经营精密水平”这两个游戏产业倒退方向成为宽广游戏厂商的倒退思路,无论是新游戏还是老游戏都在致力落实这两点:

• 新游戏:面向玩家须要提供更短缺的推广资源和更残缺的游戏内容。
• 老游戏:通过用户行为剖析,投入更多的精力和老本,制作更优质的版本内容。

这里咱们重点来看新游戏。一家游戏企业辛辛苦苦研发三年,等着新游戏发售时一飞冲天。那么问题来了,新游戏如何被宽广玩家看到?

首先来看看游戏行业公司的分类:

• 游戏研发商:研发游戏的公司,生产和制作游戏内容。比方王者光荣的所有英雄设计、游戏战斗场景、战斗逻辑等,全副由游戏研发公司提供。
• 游戏发行商:游戏发行商的次要工作分三大块:市场工作、经营工作、客服工作。游戏发行商把控游戏命根子,市场工作外围是导入玩家,经营工作外围是将用户价值最大化、赚取更多利益。

• 游戏平台 / 渠道商:游戏平台和渠道商的外围目标就是曝光游戏,让尽量多的人能发现你的游戏。

这三种类型的业务,有专一于其中某一畛域的独立公司,也有能承接全副业务的公司,但无论那一种,这三者之间的关系是不会变的:

所以不难理解,想让更多的玩家看到你的游戏,游戏发行和经营是要害。艰深来讲,如果你的游戏呈现在目前所有大家熟知的平台广告中,那么最起码游戏的新用户注册数量是很可观的。因而这就引入了一个关键词:买量。

依据数据显示,2019 年月均买量手游数达 6000+ 款,而 2018 年仅为 4200 款。另一方面,随着抖音、微博等超级 APP 在游戏买量市场的资源歪斜,也助推手游买量的成果和效率有所晋升,游戏厂商更违心应用买量的形式来吸引用户。

但须要留神的是,在游戏买量的精准化水平一直进步的同时,买量的老本也在节节攀升,唯有合理配置买量、渠道与整合营销之间的关系,能力将宣发资源施展到最大的成果。

艰深来讲,买量其实就是在各大支流平台投放广告,宽广用户看到游戏广告后,有可能会点击广告,而后进入游戏厂商的宣传页面,同时会采集用户的一些信息,而后游戏厂商对采集到的用户信息进行大数据分析,进行进一步的定向推广。

游戏经营外围诉求

游戏厂商花钱买量,换来的用户信息以及新用户注册信息是为继续的游戏经营服务的,那么这个场景的外围诉求就是采集用户信息的完整性。比如说,某游戏厂商一天花 5000w 投放广告,在某平台某时段产生了每秒 1w 次的广告点击率,那么在这个时段内每一个点击广告的用户信息要残缺的被采集到,而后入库进行后续剖析。这就对数据采集零碎提出了很高的要求。这其中,最外围的一点就是零碎裸露接口的环节要可能安稳承载买量期间不定时的流量脉冲。在买量期间,游戏厂商通常会在多个平台投放广告,每个平台投放广告的工夫是不一样的,所以就会呈现全天不定时的流量脉冲景象。如果这个环节呈现问题,那么相当于买量的钱就打水漂了。

数据采集零碎传统架构

上图是一个绝对传统的数据采集零碎架构,最要害的就是裸露 HTTP 接口回传数据这部分,这部分如果出问题,那么采集数据的链路就断了。但这部分往往会面临两个挑战:

• 当流量脉冲来的时候,这部分是否能够疾速扩容以应答流量冲击。
• 游戏经营具备潮汐个性,并非天天都在进行,这就须要思考如何优化资源利用率。

通常状况下,在游戏有经营流动之前,会提前告诉运维同学,对这个环节的服务减少节点,但要减少多少其实是无奈预估的,只能大略拍一个数字。这是在传统架构下常常会呈现的场景,这就会导致两个问题:

• 流量太大,节点加少了,导致一部分流量的数据没有采集到。
• 流量没有预期那么大,节点加多了,导致资源节约。

数据采集零碎 Serverless 架构

咱们能够通过 函数计算(函数计算的基本概念能够参考这篇文章)来取代传统架构中裸露 HTTP 回传数据这部分,从而完满解决传统架构中存在问题,先来看架构图:

传统架构中的两个问题均能够通过函数计算百毫秒弹性的个性来解决。咱们并不需要去估算营销流动会带来多大的流量,也不须要去放心和思考对数据采集零碎的性能,运维同学更不须要提前准备 ECS。

因为函数计算的极致弹性个性,当没有买量、没有营销流动的时候,函数计算的运行实例是零。有买量流动时,在流量脉冲的状况下,函数计算会疾速拉起实例来承载流量压力;当流量缩小时,函数计算会及时开释没有申请的实例进行缩容。所以 Serverless 架构带来的劣势有以下三点:

• 无需运维染指,研发同学就能够很快的搭建进去。
• 无论流量大小,均能够安稳的承接。
• 函数计算拉起的实例数量能够紧贴流量大小的曲线,做到资源利用率最优化,再加上按量计费的模式,能够最大水平优化老本。

架构解析

从下面的架构图能够看到,整个采集数据阶段,分了两个函数来实现,第一个函数的作用是单纯的裸露 HTTP 接口接收数据,第二个函数用于解决数据,而后将数据发送至音讯队列 Kafka 和数据库 RDS。

1. 接收数据函数

咱们关上函数计算控制台,创立一个函数:
• 函数类型:HTTP(即触发器为 HTTP)
• 函数名称:receiveData
• 运行环境:Python3

• 函数实例类型:弹性实例
• 函数执行内存:512MB
• 函数运行超时工夫:60 秒
• 函数单实例并发度:1

• 触发器类型:HTTP 触发器
• 触发器名称:defaultTrigger
• 认证形式:anonymous(即无需认证)
• 申请形式:GET,POST

创立好函数之后,咱们通过在线编辑器编写代码:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
HELLO_WORLD = b'Hello world!n'
def handler(environ, start_response):
    logger = logging.getLogger() 
    context = environ['fc.context']
    request_uri = environ['fc.request_uri']
    for k, v in environ.items():
      if k.startswith('HTTP_'):
        # process custom request headers
        pass
    try:        
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))    
    except (ValueError):        
        request_body_size = 0   
    # 接管回传的数据
    request_body = environ['wsgi.input'].read(request_body_size)  
    request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
    request_body_obj = json.loads(request_body_str)
    logger.info(request_body_obj["action"])
    logger.info(request_body_obj["articleAuthorId"])

    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [HELLO_WORLD]

此时的代码非常简单,就是接管用户传来的参数,咱们能够调用接口进行验证:

能够在函数的日志查问中看到此次调用的日志:

同时,咱们也能够查看函数的链路追踪来剖析每一个步骤的调用耗时,比方函数接到申请→冷启动(无沉闷实例时)→筹备代码→执行初始化办法→执行入口函数逻辑这个过程:

从调用链路图中能够看到,方才的那次申请蕴含了冷启动的工夫,因为过后没有沉闷实例,整个过程耗时 418 毫秒,真正执行入口函数代码的工夫为 8 毫秒。

当再次调用接口时,能够看到就间接执行了入口函数的逻辑,因为此时曾经有实例在运行,整个耗时只有 2.3 毫秒:

2. 解决数据的函数

第一个函数是通过在函数计算控制台在界面上创立的,抉择了运行环境是 Python3,咱们能够在官网文档中查看预置的 Python3 运行环境内置了哪些模块,因为第二个函数要操作 Kafka 和 RDS,所以须要咱们确认对应的模块。

从文档中能够看到,内置的模块中蕴含 RDS 的 SDK 模块,然而没有 Kafka 的 SDK 模块,此时就须要咱们手动装置 Kafka SDK 模块,并且创立函数也会应用另一种形式。

Funcraft

Funcraft 是一个用于反对 Serverless 利用部署的命令行工具,能帮忙咱们便捷地治理函数计算、API 网关、日志服务等资源。它通过一个资源配置文件(template.yml),帮助咱们进行开发、构建、部署操作。

所以第二个函数咱们须要应用 Fun 来进行操作,整个操作分为四个步骤:

• 装置 fun 工具。
• 编写 template.yml 模板文件,用来形容函数。
• 装置咱们须要的第三方依赖。
• 上传部署函数。

装置 Fun

Fun 提供了三种装置形式:

• 通过 npm 包治理装置 —— 适宜所有平台(Windows/Mac/Linux)且曾经预装了 npm 的开发者。
• 通过下载二进制装置 —— 适宜所有平台(Windows/Mac/Linux)。
• 通过 Homebrew 包管理器装置 —— 适宜 Mac 平台,更合乎 MacOS 开发者习惯。

文本示例环境为 Mac,所以应用 npm 形式装置,十分的简略,一行命令搞定:
sudo npm install @alicloud/fun -g

装置实现之后。在管制终端输出 fun 命令能够查看版本信息:

$ fun --version
3.6.20

在第一次应用 fun 之前须要先执行 fun config 命令进行配置,依照提醒,顺次配置 Account ID、Access Key Id、Secret Access Key、Default Region Name 即可。其中 Account ID、Access Key Id 你能够从函数计算控制台首页的右上方取得:

fun config
? Aliyun Account ID _**01
? Aliyun Access Key ID**_ qef6j
? Aliyun Access Key Secret *UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3

编写 template.yml

新建一个目录,在该目录下创立一个名为 template.yml 的 YAML 文件,该文件次要形容要创立的函数的各项配置,说白了就是将函数计算管制台上配置的那些配置信息以 YAML 格局写在文件里:

ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
FCBigDataDemo:
Type: 'Aliyun::Serverless::Service'
Properties:
Description: 'local invoke demo'
VpcConfig:
VpcId: 'vpc-xxxxxxxxxxx'
VSwitchIds: ['vsw-xxxxxxxxxx']
SecurityGroupId: 'sg-xxxxxxxxx'
LogConfig:
Project: fcdemo
Logstore: fc_demo_store
dataToKafka:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: index.my_initializer
Handler: index.handler
CodeUri: './'
Description: ''
Runtime: python3

咱们来解析以上文件的核心内容:

• FCBigDataDemo:自定义的服务名称。通过上面的 Type 属性表明是服务,即Aliyun::Serverless::Service
• Properties:Properties 下的属性都是该服务的各配置项。
• VpcConfig:服务的 VPC 配置,蕴含:

  1. VpcId:VPC ID。
  2. VSwitchIds:交换机 ID,这里是数组,能够配置多个交换机。
  3. SecurityGroupId:平安组 ID。

• LogConfig:服务绑定的日志服务(SLS)配置,蕴含:

  1. Project:日志服务项目。
  2. Logstore:LogStore 名称。

• dataToKafka:该服务下自定义的函数名称。通过上面的 Type 属性表明是函数,即Aliyun::Serverless::Function
• Properties:Properties 下的属性都是该函数的各配置项。
• Initializer:配置初始化函数。
• Handler:配置入口函数。
• Runtime:函数运行环境。

目录构造为:

装置第三方依赖

服务和函数的模板创立好之后,咱们来装置须要应用的第三方依赖。在这个示例的场景中,第二个函数须要应用 Kafka SDK,所以能够通过 fun 工具联合 Python 包管理工具 pip 进行装置:

fun install –runtime python3 –package-type pip kafka-python

执行命令后有如下提示信息:

此时咱们会发现在目录下会生成一个.fun 文件夹,咱们装置的依赖包就在该目录下:

部署函数

当初编写好了模板文件以及装置好了咱们须要的 Kafka SDK 后,还须要增加咱们的代码文件 index.py,代码内容如下:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
from kafka import KafkaProducer
producer = None
def my_initializer(context):    
    logger = logging.getLogger() 
    logger.info("init kafka producer")
    global producer
    producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
def handler(event, context):
    logger = logging.getLogger()   
    # 接管回传的数据
    event_str = json.loads(event)
    event_obj = json.loads(event_str)
    logger.info(event_obj["action"])
    logger.info(event_obj["articleAuthorId"])
    # 向 Kafka 发送音讯
    global producer
    producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
    producer.close()
    return 'hello world'

代码很简略,这里做以简略的解析:

my_initializer:函数实例被拉起时会先执行该函数,而后再执行 handler 函数,当函数实例在运行时,之后的申请都不会执行 my_initializer 函数。个别用于各种连贯的初始化工作,这里将初始化 Kafka Producer 的办法放在了这里,防止重复初始化 Produer。
handler:该函数只有两个逻辑,接管回传的数据和将数据发送至 Kafka 的指定 Topic。
上面通过 fun deploy 命令部署函数,该命令会做两件事:
• 依据 template.yml 中的配置创立服务和函数。
• 将 index.py.fun上传至函数中。

登录函数计算控制台,能够看到通过 fun 命令部署的服务和函数:

进入函数,也能够清晰的看到第三方依赖包的目录构造:

3. 函数之间调用

目前两个函数都创立好了,上面的工作就是由第一个函数接管到数据后拉起第二个函数发送音讯给 Kafka。咱们只须要对第一个函数做些许改变即可:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
import fc2
HELLO_WORLD = b'Hello world!n'
client = None
def my_initializer(context):    
    logger = logging.getLogger() 
    logger.info("init fc client")
    global client
    client = fc2.Client(
        endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
        accessKeyID="your_ak",
        accessKeySecret="your_sk"
    )
def handler(environ, start_response):
    logger = logging.getLogger() 
    context = environ['fc.context']
    request_uri = environ['fc.request_uri']
    for k, v in environ.items():
      if k.startswith('HTTP_'):
        # process custom request headers
        pass
    try:        
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))    
    except (ValueError):        
        request_body_size = 0   
    # 接管回传的数据
    request_body = environ['wsgi.input'].read(request_body_size)  
    request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
    request_body_obj = json.loads(request_body_str)
    logger.info(request_body_obj["action"])
    logger.info(request_body_obj["articleAuthorId"])
    global client
    client.invoke_function(
        'FCBigDataDemo',
        'dataToKafka',
        payload=json.dumps(request_body_str),
        headers = {'x-fc-invocation-type': 'Async'}
    )

    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [HELLO_WORLD]

如下面代码所示,对第一个函数的代码做了三个中央的改变:

• 导入函数计算的库:import fc2
• 增加初始化办法,用于创立函数计算 Client:

def my_initializer(context):
        logger = logging.getLogger()
        logger.info("init fc client")
        global client
        client = fc2.Client(
            endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
            accessKeyID="your_ak",
            accessKeySecret="your_sk"
)

这里须要留神的时,当咱们在代码里减少了初始化办法后,须要在函数配置中指定初始化办法的入口:

• 通过函数计算 Client 调用第二个函数:

global client
    client.invoke_function(
            'FCBigDataDemo',
            'dataToKafka',
          payload=json.dumps(request_body_str),
            headers = {'x-fc-invocation-type': 'Async'}
)

invoke_function函数有四个参数:

• 第一个参数:调用函数所在的服务名称。
• 第二个参数:调用函数的函数名称。
• 第三个参数:向调用函数传的数据。
• 第四个参数:调用第二个函数 Request Header 信息。这里次要通过 x-fc-invocation-type 这个 Key 来设置是同步调用还是异步调用。这里设置 Async 为异步调用。

如此设置,咱们便能够验证通过第一个函数提供的 HTTP 接口发动申请→采集数据→调用第二个函数→将数据作为音讯传给 Kafka 这个流程了。

应用两个函数的目标

到这里有些同学可能会有疑难,为什么须要两个函数,而不在第一个函数里间接向 Kafka 发送数据呢?咱们先来看这张图:

当咱们应用异步调用函数时,在函数外部会默认先将申请的数据放入音讯队列进行第一道削峰填谷,而后每一个队列在对应函数实例,通过函数实例的弹性拉起多个实例进行第二道削峰填谷。所以这也就是为什么这个架构能稳固承载大并发申请的外围起因之一。

4. 配置 Kafka

在游戏经营这个场景中,数据量是比拟大的,所以对 Kafka 的性能要求也是比拟高的,相比开源自建,应用云上的 Kafka 省去很多的运维操作,比方:

• 咱们不再须要再保护 Kafka 集群的各个节点。
• 不须要关怀主从节点数据同步问题。
• 能够疾速、动静扩大 Kafka 集群规格,动静减少 Topic,动静减少分区数。
• 欠缺的指标监控性能,音讯查问性能。

总的来说,就是所有 SLA 都有云上兜底,咱们只须要关注在音讯发送和音讯生产即可。

所以咱们能够关上 Kafka 开明界面,依据理论场景的需要一键开明 Kafka 实例,开明 Kafka 后登录控制台,在根本信息中能够看到 Kafka 的接入点:

• 默认接入点:走 VPC 内网场景的接入点。
• SSL 接入点:走公网场景的接入点。

将默认接入点配置到函数计算的第二个函数中即可。

....
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
....

而后点击左侧控制台Topic 治理,创立 Topic:

将创立好的 Topic 配置到函数计算的第二个函数中即可。

...
# 第一个参数为 Topic 名称
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
...

上文曾经列举过云上 Kafka 的劣势,比方动静减少 Topic 的分区数,咱们能够在 Topic 列表中,对 Topic 的分区数进行动静调整:

单 Topic 最大反对到 360 个分区,这是开源自建无奈做到的。

接下来点击控制台左侧Consumer Group 治理,创立 Consumer Group:

至此,云上的 Kafka 就算配置结束了,即 Producer 能够往刚刚创立的 Topic 中发消息了,Consumer 能够设置刚刚创立的 GID 以及订阅 Topic 进行音讯承受和生产。

Flink Kafka 消费者

在这个场景中,Kafka 前面往往会跟着 Flink,所以这里简要给大家介绍一下在 Flink 中如何创立 Kafka Consumer 并生产数据。代码片段如下:

final ParameterTool parameterTool = ParameterTool.fromArgs(args);
String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");
String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");
FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);

以上就是构建 Flink Kafka Consumer 和增加 Kafka Source 的代码片段,还是非常简单的。

压测验证

至此,整个数据采集的架构就搭建结束了,上面咱们通过压测来测验一下整个架构的性能。这里应用阿里云 PTS 来进行压测。

创立压测场景

关上 PTS 控制台,点击左侧菜单 创立压测 / 创立 PTS 场景

在场景配置中,将第一个函数计算函数裸露的 HTTP 接口作为串联链路,配置如下图所示:

接口配置完后,咱们来配置施压:

• 压力模式:
• 并发模式:指定有多少并发用户同时发申请。
• RPS 模式:指定每秒有多少申请数。
• 递增模式:在压测过程中能够通过手动调节压力,也能够主动按百分比递增压力。
• 最大并发:同时有多少个虚构用户发动申请。
• 递增百分比:如果是主动递增的话,按这里的百分比递增。
• 单量级继续时长:在未齐全达到压力全量的时候,每一级梯度的压力放弃的时长。
• 压测总时长:一共须要压测的时长。

这里因为资源老本起因,并发用户数设置为 2500 来进行验证。

从上图压测中的状况来看,TPS 达到了 2w 的封顶,549w+ 的申请,99.99% 的申请是胜利的,那 369 个异样也能够点击查看,都是压测工具申请超时导致的。

总结

至此,整个基于 Serverless 搭建的大数据采集传输的架构就搭建好了,并且进行了压测验证,整体的性能也是不错的,并且整个架构搭建起来也非常简单和容易了解。这个架构不光实用于游戏经营行业,其实任何大数据采集传输的场景都是实用的,目前也曾经有很多客户正在基于 Serverless 的架构跑在生产环境,或者正走在革新 Serverless 架构的路上。

作者:计缘,阿里云解决方案架构师
原文链接
本文为阿里云原创内容,未经容许不得转载

正文完
 0