关于后端:用-Pulsar-开发多人小游戏六用-Pulsar-Function-制作房间计分板

6次阅读

共计 4681 个字符,预计需要花费 12 分钟才能阅读完成。

本文是《用 Pulsar 开发多人在线小游戏》的第三篇,配套源码和全副文档参见我的 GitHub 仓库 play-with-pulsar 以及我的文章列表。

Pulsar Function 容许你编写函数对 topic 中的数据进行一些解决,函数的输出就是一个或多个 topic 中的音讯,函数的返回值能够发送到其余 topic 中。

官网的一张图就能看明确了:

比方说,发送到 topicA 中的音讯都是英文单词,我想把这些英文单词都转化成大写并转发到 topicB 中,那么就能够写一个 Pulsar function 做这个事件。

Pulsar Function 还反对 Stateful Storage,简略来说就是键值对的存储服务。

比方官网给了一个单词计数器的例子:

这个 Pulsar Function 会从一个 topic 中读取句子并切分成单词,而后统计每个单词呈现的频率。

单词频率其实是以键值对的模式存储在这个 Function 中的,能够通过 admin API 来读取键对应的值,官网文档:

https://pulsar.apache.org/doc…

Pulsar Function 能够独自部署成服务,也能够上传到 broker 上,作为 broker 的一部分。不过目前社区的倡议是部署独自的 Function 集群。

目前 Pulsar 反对应用 Python、Go、Java 来开发 Function,API 文档:

https://pulsar.apache.org/doc…

文档给出的例子比拟少,能够间接看 Pulsar Function examples,间接依据需要抉择适合的 Function 进行开发就行了。

本文就以炸弹人游戏为例,利用 Pulsar Function 开发游戏房间的计分板性能。

在咱们的炸弹人游戏中,玩家的死亡也会被形象成事件发送到 topic 中:

type UserDeadEvent struct {
    // 被炸死的玩家名
    playerName string
    // 杀手玩家名
    killerName string
}

相似单词计数器,咱们这里也能够实现一个 Pulsar Function,专门过滤玩家死亡的 UserDeadEvent 事件,而后统计 killerName 呈现的次数,就能够作为该玩家的分数了。

当然,咱们须要实时更新房间内玩家的分数,所以每个游戏房间除了 event topic 和 map topic 之外,咱们还须要一个 score topic,让 Pulsar Function 把分数更新事件输入到 score topic,并且利用 Pulsar client 的 tableview 性能做一个比拟好的展示。

那么当初须要实现的 Pulsar Function 有如下需要:

1、因为玩家产生的事件都发到了格局为 {roomName}-event-topic 的 topic 中,所以函数应该接管所有这些 topic 的音讯。

2、读取这些音讯的 Type 字段,过滤出 UserDeadEvent 事件,并读取 playerNamekillerNamekillerName 呈现的次数就是该玩家取得的分数。

3、还须要把玩家分数输入到另一个格局为 {roomName}-score-topic 的 topic 中。

上面开始开发。

Function 的开发

先贴官网文档:

https://pulsar.apache.org/doc…

首先须要设置 Pulsar Function 开发相干的 Maven 依赖:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>${pulsar.version}</version>
</dependency>

而后就能够开始开发了,残缺的源码在 function-code 目录:

public class ScoreboardFunction implements Function<GenericJsonRecord, Void> {

    @Override
    public Void process(GenericJsonRecord input, Context context) {String type = (String) input.getField("type");
        if (type.equals("UserDeadEvent")) {String player = (String) input.getField("name");
            String killer = (String) input.getField("comment");
           if (player.equals(killer)) {
               // kill himself
               return null;
           }

            // get the source topic of this message
            Optional<String> inputTopic = context.getCurrentRecord().getTopicName();
            if (inputTopic.isEmpty()) {return null;}
            // calculate the corresponding topic to send score
            Optional<String> outputTopic = changeEventTopicNameToScoreTopicName(inputTopic.get());
            if (outputTopic.isEmpty()) {return null;}
            // roomName-playerName as the stateful key  /
            // store the score in stateful function
            String killerKey = parseRoomName(inputTopic.get()).get() + "-" + killer;
            context.incrCounter(killerKey, 1);

            // send the score messages to score topic
            long score = context.getCounter(killerKey);
            try {
                // player name as the key, score as the value
                context.newOutputMessage(outputTopic.get(), Schema.STRING)
                        .key(killer)
                        .value(score + "")
                        .send();} catch (PulsarClientException e) {
                // todo: ignore error for now
                e.printStackTrace();}
        }

        return null;
    }
}

因为咱们前文给 topic 中的音讯设置了 JSON Schema,所以这里设置 topic 中的音讯类型为 GenericJsonRecord

这段代码的逻辑应该不难理解,input 就是发到 event topic 的音讯,通过 Pulsar Function 的 context 能够拿到这个 event topic 的名字,因为 event topic 名字蕴含游戏房间名,所以只有批改 event topic 名称后缀即可失去 score topic 的名字。

函数的次要工作是过滤出 UserDeadEvent,读取 killerName。思考到不能把不同房间的击杀事件混在一起,我把 {roomName}-{killerName} 作为 Function 的键,并递增计数器记录玩家的分数,最初调用 context.newOutputMessage 把玩家的分数发送到房间对应的 score topic 中。

Function 的调试

能够参考这篇官网文档,用 localrun 模式在本地调试 Function:

https://pulsar.apache.org/doc…

localrun 模式相当于间接在本地起了一个 Function worker,可能连贯到 Pulsar,并运行咱们方才开发的 Function 代码。

残缺的源码在 function-code 目录,留神咱们要对 Function 进行正确的配置,比方 Function 类以及作为输出的 topic 名称等等:

 String inputTopic = ".*-event-topic";
// enable regex support to subscribe multiple topics
HashMap<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(true).build();
inputSpecs.put(inputTopic, consumerConfig);
functionConfig.setInputSpecs(inputSpecs);

functionConfig.setClassName(ScoreboardFunction.class.getName());

配置完 functionConfig 后能够启动一个本地的 Function worker:

LocalRunner localRunner = LocalRunner.builder()
        .brokerServiceUrl("pulsar://localhost:6650")
        .stateStorageServiceUrl("bk://localhost:4181")
        .functionConfig(functionConfig)
        .build();

localRunner.start(false);

其中 brokerServiceUrl 是 Pulsar broker 的连贯地址,stateStorageServiceUrl 是提供 stateStorage 的 bookkeeper 地址,默认状况下在 4181 端口。

这样,只有启动 main 函数,就会启动 local runner,并加载咱们刚开发的 Function,把所有后缀为 -event-topic 的 topic 中的音讯输出给 Function。

计分板的开发

咱们方才开发的 Function 会把玩家名称和该玩家取得的分数作为一条音讯的键和值发送到 {roomName}-score-topic 中,那么玩家客户端如何获取这些信息呢?这就要用到之前介绍的 tableView 性能了。

能够在游戏客户端代码中看到 tableView 的应用:

tableView, err := client.CreateTableView(pulsar.TableViewOptions{
    Topic:           roomName + "-score-topic",
    Schema:          pulsar.NewStringSchema(nil),
    SchemaValueType: reflect.TypeOf(""),
})

咱们在游戏数据中保护一个名为 scores 的 lru 缓存,存储最近的最多 5 名玩家的分数信息,同时利用 tableView 的 ForEachAndListen 办法更新 lru 缓存:

client.tableView.ForEachAndListen(func(playerName string, i interface{}) error {score := *i.(*string)
    g.scores.Add(playerName, score)
    return nil
})

这样,当玩家分数更新时,lru 缓存中的数据就会更新,咱们只有把对应的分数数据显示到游戏界面上即可。

更多高质量干货文章,请关注我的微信公众号 labuladong 和算法博客 labuladong 的算法秘籍

正文完
 0