

共计 4681 个字符,预计需要花费 12 分钟才能阅读完成。

本文是《用 Pulsar 开发多人在线小游戏》的第三篇,配套源码和全副文档参见我的 GitHub 仓库 play-with-pulsar 以及我的文章列表。

Pulsar Function 容许你编写函数对 topic 中的数据进行一些解决,函数的输出就是一个或多个 topic 中的音讯,函数的返回值能够发送到其余 topic 中。


比方说,发送到 topicA 中的音讯都是英文单词,我想把这些英文单词都转化成大写并转发到 topicB 中,那么就能够写一个 Pulsar function 做这个事件。

Pulsar Function 还反对 Stateful Storage,简略来说就是键值对的存储服务。


这个 Pulsar Function 会从一个 topic 中读取句子并切分成单词,而后统计每个单词呈现的频率。

单词频率其实是以键值对的模式存储在这个 Function 中的,能够通过 admin API 来读取键对应的值,官网文档:


Pulsar Function 能够独自部署成服务,也能够上传到 broker 上,作为 broker 的一部分。不过目前社区的倡议是部署独自的 Function 集群。

目前 Pulsar 反对应用 Python、Go、Java 来开发 Function,API 文档:


文档给出的例子比拟少,能够间接看 Pulsar Function examples,间接依据需要抉择适合的 Function 进行开发就行了。

本文就以炸弹人游戏为例,利用 Pulsar Function 开发游戏房间的计分板性能。

在咱们的炸弹人游戏中,玩家的死亡也会被形象成事件发送到 topic 中:

type UserDeadEvent struct {
    // 被炸死的玩家名
    playerName string
    // 杀手玩家名
    killerName string

相似单词计数器,咱们这里也能够实现一个 Pulsar Function,专门过滤玩家死亡的 UserDeadEvent 事件,而后统计 killerName 呈现的次数,就能够作为该玩家的分数了。

当然,咱们须要实时更新房间内玩家的分数,所以每个游戏房间除了 event topic 和 map topic 之外,咱们还须要一个 score topic,让 Pulsar Function 把分数更新事件输入到 score topic,并且利用 Pulsar client 的 tableview 性能做一个比拟好的展示。

那么当初须要实现的 Pulsar Function 有如下需要:

1、因为玩家产生的事件都发到了格局为 {roomName}-event-topic 的 topic 中,所以函数应该接管所有这些 topic 的音讯。

2、读取这些音讯的 Type 字段,过滤出 UserDeadEvent 事件,并读取 playerNamekillerNamekillerName 呈现的次数就是该玩家取得的分数。

3、还须要把玩家分数输入到另一个格局为 {roomName}-score-topic 的 topic 中。


Function 的开发



首先须要设置 Pulsar Function 开发相干的 Maven 依赖:


而后就能够开始开发了,残缺的源码在 function-code 目录:

public class ScoreboardFunction implements Function<GenericJsonRecord, Void> {

    public Void process(GenericJsonRecord input, Context context) {String type = (String) input.getField("type");
        if (type.equals("UserDeadEvent")) {String player = (String) input.getField("name");
            String killer = (String) input.getField("comment");
           if (player.equals(killer)) {
               // kill himself
               return null;

            // get the source topic of this message
            Optional<String> inputTopic = context.getCurrentRecord().getTopicName();
            if (inputTopic.isEmpty()) {return null;}
            // calculate the corresponding topic to send score
            Optional<String> outputTopic = changeEventTopicNameToScoreTopicName(inputTopic.get());
            if (outputTopic.isEmpty()) {return null;}
            // roomName-playerName as the stateful key  /
            // store the score in stateful function
            String killerKey = parseRoomName(inputTopic.get()).get() + "-" + killer;
            context.incrCounter(killerKey, 1);

            // send the score messages to score topic
            long score = context.getCounter(killerKey);
            try {
                // player name as the key, score as the value
                context.newOutputMessage(outputTopic.get(), Schema.STRING)
                        .value(score + "")
                        .send();} catch (PulsarClientException e) {
                // todo: ignore error for now

        return null;

因为咱们前文给 topic 中的音讯设置了 JSON Schema,所以这里设置 topic 中的音讯类型为 GenericJsonRecord

这段代码的逻辑应该不难理解,input 就是发到 event topic 的音讯,通过 Pulsar Function 的 context 能够拿到这个 event topic 的名字,因为 event topic 名字蕴含游戏房间名,所以只有批改 event topic 名称后缀即可失去 score topic 的名字。

函数的次要工作是过滤出 UserDeadEvent,读取 killerName。思考到不能把不同房间的击杀事件混在一起,我把 {roomName}-{killerName} 作为 Function 的键,并递增计数器记录玩家的分数,最初调用 context.newOutputMessage 把玩家的分数发送到房间对应的 score topic 中。

Function 的调试

能够参考这篇官网文档,用 localrun 模式在本地调试 Function:


localrun 模式相当于间接在本地起了一个 Function worker,可能连贯到 Pulsar,并运行咱们方才开发的 Function 代码。

残缺的源码在 function-code 目录,留神咱们要对 Function 进行正确的配置,比方 Function 类以及作为输出的 topic 名称等等:

 String inputTopic = ".*-event-topic";
// enable regex support to subscribe multiple topics
HashMap<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(true).build();
inputSpecs.put(inputTopic, consumerConfig);


配置完 functionConfig 后能够启动一个本地的 Function worker:

LocalRunner localRunner = LocalRunner.builder()


其中 brokerServiceUrl 是 Pulsar broker 的连贯地址,stateStorageServiceUrl 是提供 stateStorage 的 bookkeeper 地址,默认状况下在 4181 端口。

这样,只有启动 main 函数,就会启动 local runner,并加载咱们刚开发的 Function,把所有后缀为 -event-topic 的 topic 中的音讯输出给 Function。


咱们方才开发的 Function 会把玩家名称和该玩家取得的分数作为一条音讯的键和值发送到 {roomName}-score-topic 中,那么玩家客户端如何获取这些信息呢?这就要用到之前介绍的 tableView 性能了。

能够在游戏客户端代码中看到 tableView 的应用:

tableView, err := client.CreateTableView(pulsar.TableViewOptions{
    Topic:           roomName + "-score-topic",
    Schema:          pulsar.NewStringSchema(nil),
    SchemaValueType: reflect.TypeOf(""),

咱们在游戏数据中保护一个名为 scores 的 lru 缓存,存储最近的最多 5 名玩家的分数信息,同时利用 tableView 的 ForEachAndListen 办法更新 lru 缓存:

client.tableView.ForEachAndListen(func(playerName string, i interface{}) error {score := *i.(*string)
    g.scores.Add(playerName, score)
    return nil

这样,当玩家分数更新时,lru 缓存中的数据就会更新,咱们只有把对应的分数数据显示到游戏界面上即可。

更多高质量干货文章,请关注我的微信公众号 labuladong 和算法博客 labuladong 的算法秘籍
