本文是《用 Pulsar 开发多人在线小游戏》的第三篇,配套源码和全副文档参见我的 GitHub 仓库 play-with-pulsar 以及我的文章列表。
Pulsar Function 容许你编写函数对 topic 中的数据进行一些解决,函数的输出就是一个或多个 topic 中的音讯,函数的返回值能够发送到其余 topic 中。
官网的一张图就能看明确了:
比方说,发送到 topicA
中的音讯都是英文单词,我想把这些英文单词都转化成大写并转发到 topicB
中,那么就能够写一个 Pulsar function 做这个事件。
Pulsar Function 还反对 Stateful Storage,简略来说就是键值对的存储服务。
比方官网给了一个单词计数器的例子:
这个 Pulsar Function 会从一个 topic 中读取句子并切分成单词,而后统计每个单词呈现的频率。
单词频率其实是以键值对的模式存储在这个 Function 中的,能够通过 admin API 来读取键对应的值,官网文档:
https://pulsar.apache.org/doc…
Pulsar Function 能够独自部署成服务,也能够上传到 broker 上,作为 broker 的一部分。不过目前社区的倡议是部署独自的 Function 集群。
目前 Pulsar 反对应用 Python、Go、Java 来开发 Function,API 文档:
https://pulsar.apache.org/doc…
文档给出的例子比拟少,能够间接看 Pulsar Function examples,间接依据需要抉择适合的 Function 进行开发就行了。
本文就以炸弹人游戏为例,利用 Pulsar Function 开发游戏房间的计分板性能。
在咱们的炸弹人游戏中,玩家的死亡也会被形象成事件发送到 topic 中:
type UserDeadEvent struct {
// 被炸死的玩家名
playerName string
// 杀手玩家名
killerName string
}
相似单词计数器,咱们这里也能够实现一个 Pulsar Function,专门过滤玩家死亡的 UserDeadEvent
事件,而后统计 killerName
呈现的次数,就能够作为该玩家的分数了。
当然,咱们须要实时更新房间内玩家的分数,所以每个游戏房间除了 event topic 和 map topic 之外,咱们还须要一个 score topic,让 Pulsar Function 把分数更新事件输入到 score topic,并且利用 Pulsar client 的 tableview 性能做一个比拟好的展示。
那么当初须要实现的 Pulsar Function 有如下需要:
1、因为玩家产生的事件都发到了格局为 {roomName}-event-topic
的 topic 中,所以函数应该接管所有这些 topic 的音讯。
2、读取这些音讯的 Type
字段,过滤出 UserDeadEvent
事件,并读取 playerName
和 killerName
,killerName
呈现的次数就是该玩家取得的分数。
3、还须要把玩家分数输入到另一个格局为 {roomName}-score-topic
的 topic 中。
上面开始开发。
Function 的开发
先贴官网文档:
https://pulsar.apache.org/doc…
首先须要设置 Pulsar Function 开发相干的 Maven 依赖:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${pulsar.version}</version>
</dependency>
而后就能够开始开发了,残缺的源码在 function-code 目录:
public class ScoreboardFunction implements Function<GenericJsonRecord, Void> {
@Override
public Void process(GenericJsonRecord input, Context context) {String type = (String) input.getField("type");
if (type.equals("UserDeadEvent")) {String player = (String) input.getField("name");
String killer = (String) input.getField("comment");
if (player.equals(killer)) {
// kill himself
return null;
}
// get the source topic of this message
Optional<String> inputTopic = context.getCurrentRecord().getTopicName();
if (inputTopic.isEmpty()) {return null;}
// calculate the corresponding topic to send score
Optional<String> outputTopic = changeEventTopicNameToScoreTopicName(inputTopic.get());
if (outputTopic.isEmpty()) {return null;}
// roomName-playerName as the stateful key /
// store the score in stateful function
String killerKey = parseRoomName(inputTopic.get()).get() + "-" + killer;
context.incrCounter(killerKey, 1);
// send the score messages to score topic
long score = context.getCounter(killerKey);
try {
// player name as the key, score as the value
context.newOutputMessage(outputTopic.get(), Schema.STRING)
.key(killer)
.value(score + "")
.send();} catch (PulsarClientException e) {
// todo: ignore error for now
e.printStackTrace();}
}
return null;
}
}
因为咱们前文给 topic 中的音讯设置了 JSON Schema,所以这里设置 topic 中的音讯类型为 GenericJsonRecord
。
这段代码的逻辑应该不难理解,input
就是发到 event topic 的音讯,通过 Pulsar Function 的 context
能够拿到这个 event topic 的名字,因为 event topic 名字蕴含游戏房间名,所以只有批改 event topic 名称后缀即可失去 score topic 的名字。
函数的次要工作是过滤出 UserDeadEvent
,读取 killerName
。思考到不能把不同房间的击杀事件混在一起,我把 {roomName}-{killerName}
作为 Function 的键,并递增计数器记录玩家的分数,最初调用 context.newOutputMessage
把玩家的分数发送到房间对应的 score topic 中。
Function 的调试
能够参考这篇官网文档,用 localrun 模式在本地调试 Function:
https://pulsar.apache.org/doc…
localrun 模式相当于间接在本地起了一个 Function worker,可能连贯到 Pulsar,并运行咱们方才开发的 Function 代码。
残缺的源码在 function-code 目录,留神咱们要对 Function 进行正确的配置,比方 Function 类以及作为输出的 topic 名称等等:
String inputTopic = ".*-event-topic";
// enable regex support to subscribe multiple topics
HashMap<String, ConsumerConfig> inputSpecs = new HashMap<>();
ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(true).build();
inputSpecs.put(inputTopic, consumerConfig);
functionConfig.setInputSpecs(inputSpecs);
functionConfig.setClassName(ScoreboardFunction.class.getName());
配置完 functionConfig
后能够启动一个本地的 Function worker:
LocalRunner localRunner = LocalRunner.builder()
.brokerServiceUrl("pulsar://localhost:6650")
.stateStorageServiceUrl("bk://localhost:4181")
.functionConfig(functionConfig)
.build();
localRunner.start(false);
其中 brokerServiceUrl
是 Pulsar broker 的连贯地址,stateStorageServiceUrl
是提供 stateStorage 的 bookkeeper 地址,默认状况下在 4181 端口。
这样,只有启动 main 函数,就会启动 local runner,并加载咱们刚开发的 Function,把所有后缀为 -event-topic
的 topic 中的音讯输出给 Function。
计分板的开发
咱们方才开发的 Function 会把玩家名称和该玩家取得的分数作为一条音讯的键和值发送到 {roomName}-score-topic
中,那么玩家客户端如何获取这些信息呢?这就要用到之前介绍的 tableView 性能了。
能够在游戏客户端代码中看到 tableView 的应用:
tableView, err := client.CreateTableView(pulsar.TableViewOptions{
Topic: roomName + "-score-topic",
Schema: pulsar.NewStringSchema(nil),
SchemaValueType: reflect.TypeOf(""),
})
咱们在游戏数据中保护一个名为 scores
的 lru 缓存,存储最近的最多 5 名玩家的分数信息,同时利用 tableView 的 ForEachAndListen
办法更新 lru 缓存:
client.tableView.ForEachAndListen(func(playerName string, i interface{}) error {score := *i.(*string)
g.scores.Add(playerName, score)
return nil
})
这样,当玩家分数更新时,lru 缓存中的数据就会更新,咱们只有把对应的分数数据显示到游戏界面上即可。
更多高质量干货文章,请关注我的微信公众号 labuladong 和算法博客 labuladong 的算法秘籍 。