关于存储:利用-DynamoDB-和-S3-结合-gzip-压缩最大化存储玩家数据

15次阅读

共计 6659 个字符,预计需要花费 17 分钟才能阅读完成。

前言

一些传统游戏架构中,采纳 MySQL 存储玩家存档数据,利用分库分表扩散单库单表的存储和性能压力,从而达到反对更多玩家的目标。随着数据量增长,数据表中 varchar 类型曾经无奈满足游戏中单字段的存储需要,而 blob 字段的利用对于这种架构下革新老本是最低的,因而一些游戏开始在最后设计的时候,数据库表构造就采纳了 Blob 字段作为其玩家的游戏工作、道具等数据的存储。

Blob 字段在 MySQL 5.6 / 5.7 中存在 bug(MySQL Bugs: #96466),这个 bug 有概率导致数据库集群解体,造成数据失落。即便在 MySQL 8.0 中,因为引擎自身设计的限度,在单表 20GB 以上,高频的更新就会导致数据库呈现性能受限。并且随着表增大,性能问题会越来越显著。

随着当游戏业务暴发时增长的时候,传统关系型数据库在分库分表的时候,须要进行利用革新,同时存在肯定的停机保护工夫。而且这些扩大实现后,在游戏的夕阳期进行膨胀也须要进行利用革新,这无疑对业务开发和根底运维的部门造成了很多额定的工作量。

DynamoDB 在利用到这个场景上是十分实用的。在业务倒退任意阶段,都能够实现 0 停机的扩大,主动伸缩的个性。而且这所有对于应用层是齐全通明的。同时在日常运维中也能够贴合业务负载进行动静扩缩容,从而进一步降低成本。

亚马逊云科技开发者社区为开发者们提供寰球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、流动与比赛等。帮忙中国开发者对接世界最前沿技术,观点,和我的项目,并将中国优良开发者或技术举荐给寰球云社区。如果你还没有关注 / 珍藏,看到这里请肯定不要匆匆划过,点这里让它成为你的技术宝库!

概述

本文次要讲述在游戏场景下,依据 DynamoDB 的限度(每个我的项目都必须小于 400KB),在限度下尽可能存储更多的数据和当存储量超出限度时,扩大存储的最大化利用空间。重点形容如何利用 DynamoDB+S3 保留玩家存档中的大数据量属性,防止数据存在 S3 上后,在数据写入 S3 时,产生读取到 S3 旧存档的状况。同时利用 gzip 压缩缩小数据大小,缩小 IO 的开销晋升性能。

架构图

实战编码

指标

  1. 所有数据保留前都进行 gzip 压缩,读取后都用 gzip 解压。
  2. S3 存储和 DynamoDB 的 binary 字段存储能够自适应。如果用户数据压缩后如果大于指定的值则写入 S3,否则间接保留到以后数据库我的项目中的字段。
  3. DynamoDB 我的项目读取的时候,解析解压后的字段,如果字符串以 s3:// 结尾,则持续从 S3 中获取数据
  4. 设置 S3 读锁字段,判断以后状态是否正在写入 S3,以阻塞读过程。在每个我的项目须要写入 S3 前都会设置 read_lock 为 Ture,S3 写胜利后则设置为 False。读取记录后,read_lock 是否为 True,如果是判断被阻塞,过程会期待一段时间后进行重试,直到重试次数超出指定的值。重试超时后,读过程会认为写过程可能因为某种原因导致写永远无奈胜利,于是会将 read_lock 设置成 False。

第一步:初始化环境参数

from time import sleep
import boto3
import gzip
import random
import json
import hashlib
import logging

# 写入 S3 的门槛,超过这个值数据会写入 S3,否则保留在数据库内,默认值 350KB
UPLOAD_TO_S3_THRESHOLD_BYTES = 358400
# 用户数据库保留的指标 S3 存储桶
USER_DATA_BUCKET = 'linyesh-user-data'
# 遇到 S3 有读锁,从新申请最大次数,超出次数限度锁会被主动革除
S3_READ_LOCK_RETRY_TIMES = 10
# 遇到 S3 有读锁,读申请重试间隔时间
S3_READ_RETRY_INTERVAL = 0.2

dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

参数阐明

  • UPLOAD_TO_S3_THRESHOLD_BYTES:为字段最大的数据存储长度限度。单位为:字节数。因为 DynamoDB 一个我的项目(Item)数据大小限度为 400KB。咱们除了数据存档中最大字段还必须预留一部分空间给其余字段,防止整个 Item 超出 400KB。
  • USER_DATA_BUCKET:S3 用于存储超出 400KB 后的玩家大字段数据。须要提前建好,具体步骤参考:创立存储桶
  • S3_READ_LOCK_RETRY_TIMES:限度当玩家在 S3 上的存档处在写入状态时候,读申请重试的次数。在我的项目处于读锁状态的时候,读过程会期待一段时间后重试。
  • S3_READ_RETRY_INTERVAL:读锁状态下,重试读的间隔时间,单位:秒。

留神:S3_READ_LOCK_RETRY_TIMES 乘以 S3_READ_RETRY_INTERVAL 的工夫实践上必须小于 S3 存档上传工夫的最大值,因而理论应用本文中的代码应该依据存档可能的大小来调整这 2 个参数。否则可能存档会有大概率会产生脏读的状况。

第二步:创立 DynamoDB 表

def create_tables():
    """
    创立表
    :return:
    """
    response = dynamodb.create_table(
        TableName='players',
        KeySchema=[
            {
                'AttributeName': 'username',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'username',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    response.wait_until_exists()

    # Print out some data about the table.
    logger.debug(response.item_count)

第三步:编写辅助逻辑

指数级回退函数

def run_with_backoff(function, retries=5, **function_parameters):
    base_backoff = 0.1  # base 100ms backoff
    max_backoff = 10  # sleep for maximum 10 seconds
    tries = 0
    while True:
        try:
            return function(function_parameters)
        except (ConnectionError, TimeoutError):
            if tries >= retries:
                raise
            backoff = min(max_backoff, base_backoff * (pow(2, tries) + random.random()))
            logger.debug(f"sleeping for {backoff:.2f}s")
            sleep(backoff)
            tries += 1

S3 门路判断函数

def is_s3_path(content):
    return content.startswith('s3://')

S3 文件获取

def get_s3_object(key):
    response = s3.get_object(Bucket=USER_DATA_BUCKET, Key=s3_key_generator(key))
    return response['Body']

查看大小超限

def check_threshold(current_size):
     return current_size > UPLOAD_TO_S3_THRESHOLD_BYTES

S3 Key 生成函数

这个函数能够将玩家的存档随机调配到 S3 桶下不同的 Prefix 中,这有利于进步 S3 中 IO 的性能。

def s3_key_generator(key):  
    s3_prefix = hashlib.md5((key).encode('utf-8')).hexdigest()[:8]  
    return s3_prefix + '/' + key 

文件上传到 S3

def upload_content_to_s3(obj_param):  
    s3_key = s3_key_generator(obj_param['key'])  
    try:  
        response = s3.put_object(Body=obj_param['content_bytes'],  
            Bucket=USER_DATA_BUCKET,  
            Key=s3_key)  
        return "s3://%s/%s" % (USER_DATA_BUCKET, s3_key)  
    except Exception as e:  
        logger.error(e)  
        raise e  

第四步:编写主体逻辑

写入单个我的项目到 DynamoDB 数据库

def put_item(load_data):  
    gzip_data = gzip.compress(load_data)  # 压缩数据  
    logger.debug('压缩后大小 %.2fKB,原始大小 %.2fKB,压缩率 %.2f%%' % (len(gzip_data) / 1024.0,  
        len(load_data) / 1024.0,  
        100.0 * len(gzip_data) / len(load_data)))  
  
    table = dynamodb.Table('players')  
    player_username = 'player' + str(random.randint(1, 1000))  
    if check_threshold(len(gzip_data)):  
        try:  
            # 读锁爱护  
            table.update_item(  
                Key={'username': player_username,},  
                UpdateExpression="set read_lock = :read_lock",  
                ExpressionAttributeValues={':read_lock': True,},  
            )  
  
            # 写入数据到 S3  
            s3_path = run_with_backoff(upload_content_to_s3, key=player_username, content_bytes=gzip_data)  
            # 解除读锁爱护,同时存储数据在 S3 上到门路  
            response = table.put_item(  
                Item={  
                    'username': player_username,  
                    'read_lock': False,  
                    'inventory': gzip.compress(s3_path.encode(encoding='utf-8', errors='strict')),  
                }  
            )  
            logger.debug('胜利上传大纪录到 S3,门路:%s' % s3_path)  
        except Exception as e:  
            logger.debug('存档失败')  
            logger.error(e)  
    else:  
        response = table.put_item(  
            Item={  
                'username': player_username,  
                'inventory': gzip_data,  
            }  
        )  
        logger.debug('胜利上传纪录, username=%s' % player_username) 

读取数据库中一条玩家记录

def get_player_profile(uid):  
    """ 
    读取记录 
    :param uid: 玩家 id 
    :return: 
    """table = dynamodb.Table('players')  
    player_name = 'player' + str(uid)  
  
    retry_count = 0  
    while True:  
        response = table.get_item(  
            Key={'username': player_name,}  
        )  
  
        if 'Item' not in response:  
            logger.error('Not Found')  
            return {}  
  
        item = response['Item']  
        # 查看读锁信息, 如果存在锁依据参数设置,距离一段时间从新读取记录  
        if 'read_lock' in item and item['read_lock']:  
            retry_count += 1  
            logger.info('以后第 %d 次重试' % retry_count)  
            # 如果超时无奈读取记录,则打消读锁,并从新读取记录  
            if retry_count < S3_READ_LOCK_RETRY_TIMES:  
                sleep(S3_READ_RETRY_INTERVAL)  
                continue  
            else:  
                table.update_item(  
                    Key={'username': player_name,},  
                    UpdateExpression="set read_lock = :read_lock",  
                    ExpressionAttributeValues={':read_lock': False,},  
                )  
  
        inventory_bin = gzip.decompress(item['inventory'].value)  # 解压缩数据  
        inventory_str = inventory_bin.decode("utf-8")  
        if is_s3_path(inventory_str):  
            player_data = gzip.decompress(get_s3_object(player_name).read())  
            inventory_json = json.loads(player_data)  
        else:  
            inventory_json = json.loads(inventory_str)  
  
        user_profile = {**response['Item'], **{'inventory': inventory_json}}  
        return user_profile  

最初,编写测试逻辑

筹备几个不同大小的 json 文件,察看写入数据库中的变动。

if __name__ == '__main__':  
    path_example = 'small.json'  
    # path_example = '500kb.json'  
    # path_example = '2MB.json'  
    with open(path_example, 'r') as load_f:  
        load_str = json.dumps(json.load(load_f))  
        test_data = load_str.encode(encoding='utf-8', errors='strict')  
    put_item(test_data)  
  
    # player_profile = get_player_profile(238)  
    # logger.info(player_profile)  

如果须要测试读锁,能够将数据库中单个我的项目的 read_lock 手动设置成 True,而后察看读取逻辑在这个过程中的变动。

总结

在本次测试中发现,json 格局的数据应用 gzip 后,压缩率约为 25% 左右,实践上咱们能够把单个我的项目(item)中能够存储最大约为 1.6MB 的数据项。即使有大量压缩后超过 400KB 的数据,也能够存储到 S3 上,仅在 DynamoDB 中存储元数据和大字段数据在 S3 上的门路。

gzip 会带来一些额定的计算和 IO 开销,然而这些开销次要会落在游戏服务器上,对于数据库来说反而缩小了 IO 的开销。

在大多数场景下,玩家数据即使不压缩也很少会超过 400KB。这种状况下,倡议能够尝试比照压缩启用和不启用两种场景的性能数据。以决定哪种形式更适宜本人的游戏。

限度

对于存在单用户有高并发存档需要的游戏而言,以上设计中并未蕴含在数据存储在 S3 上后,呈现并发写的场景思考。如果有此场景的需要,须要一些应用逻辑或者架构调整。

本篇作者

林业
Amazon 解决方案架构师,负责基于 Amazon 的云计算计划的征询与架构设计。领有超过 14 年研发教训,曾打造千万级用户 APP,多项 Github 开源我的项目贡献者。在游戏、IOT、智慧城市、汽车、电商等多个畛域都领有丰盛的实践经验。

文章起源:https://dev.amazoncloud.cn/column/article/630a281576658473a32…

正文完
 0