本文是《用 Pulsar 开发多人在线小游戏》的第三篇,配套源码和全副文档参见我的 GitHub 仓库 play-with-pulsar 以及我的文章列表。

我举荐《数据密集型利用零碎设计》这本书的第四章:编码与演变(在线浏览地址)。

编码(encoding)和演变(evolution)是两个不同的概念,我举例说明一下。

编码是把内存中的数据结构转化成某种格局的字节序列,不便传输或存储

咱们最常见的编码方式就是 JSON 了,JSON 的益处就是字符串解决起来很简略,且易于人类浏览;但问题是反对的数据类型不够丰盛,且传输效率不高。为了解决 JSON 的这些问题,就呈现了 XML 格局或者 protobuf 这样的二进制编码协定。

确定了编码方式,生产者就能够欢快地把数据编码成对应的格局发送进 Pulsar,消费者只有依照依照同样的协定解码,就能失去原始数据了。

但问题是,咱们开发的程序是在一直变动的,所以数据自身的构造很可能发生变化,这也就是演变的概念

比如说一开始我把这个 User 类序列化成 JSON 字符串而后发到 Pulsar 中:

class User {    String name;    int id;}

然而随着业务倒退,我发现用 int 类型曾经无奈示意用户 ID 了,须要把 id 这个字段改为字符串类型:

class User {    String name;    String id;}

这种状况下,生产者仍然能够把这个类序列化成 JSON 而后发到 Pulsar 中,Pulsar 不关怀数据自身的内容,默认把所有数据都视为字节数组,所以会欣然接受生产者发来的音讯,但消费者那边生产数据时必定会出问题。

那你说,我同时改生产者和消费者的代码还不行吗?能够,但仍然存在很多问题,比如说:

1、在简单的业务场景中,数据处理的逻辑能够非常复杂,生产者和消费者可能横跨多个部门,协调老本很高。

2、必须先下线所有消费者,代码降级完之后能力从新上线。否则消费者忽然生产到无奈辨认的新的数据格式,会产生不可预期的谬误。

所以,让零碎可能更加灵便地适应变动,就是 schema 可能给咱们提供的价值

有了 schema 来形容数据的格局,咱们就能够设置数据结构的兼容性(compatibility),比方说咱们能够设置数据向后兼容,即新代码能够读取旧格局的数据;或者设置向前兼容,即旧代码能够读取新格局的数据。

在 Pulsar 中应用 schema

schema 相干的官网文档:

https://pulsar.apache.org/doc...

咱们能够设置 schema 的兼容性,官网文档:

https://pulsar.apache.org/doc...

咱们在创立 Pulsar 的生产者/消费者时能够指定音讯的 schema:

// 指定生产者的 schemaConsumer<User> consumer = client.newConsumer(Schema.AVRO(User.class))        .subscriptionType(SubscriptionType.Shared)        .topic(topicName)        .subscriptionName(subscriptionName)        .subscribe();// 指定消费者的 schemaProducer<User> producer = client.newProducer(Schema.AVRO(User.class))        .topic(topicName)        .create();

每个 topic 的 schema 演变信息都会存在 Pulsar 当中,这样当生产者应用新的 schema 时,Pulsar 会判断新的 schema 是否合乎以后的兼容性设置,如果合乎则更新 topic 对应的 schema,否则的话则会回绝生产者发来的音讯,这样消费者就不会收到不合乎预期的数据了。

在咱们的炸弹人游戏中,玩家能够产生很多不同类型的事件,这些事件应该以什么格局发送到 Pulsar 的 topic 中呢?

我的做法是创立了一个 EventMessage 构造,用 Type 字段标识事件的类型:

type EventMessage struct {    // Event type    Type   string `json:"type"`    Name   string `json:"name"`    Avatar string `json:"avatar"`    // Comment stores extra data    Comment string `json:"comment"`    X       int    `json:"x"`    Y       int    `json:"y"`    Alive   bool   `json:"alive"`    List    []int  `json:"list"`}

而后用 Avro 的形式定义了一个 JSONSchema:

const eventJsonSchemaDef = `{  "type": "record",  "name": "EventMessage",  "namespace": "game",  "fields": [    {      "name": "Type",      "type": "string"    },    {      "name": "Name",      "type": "string"    },    {      "name": "Avatar",      "type": "string"    },    {      "name": "Comment",      "type": "string",      "default": ""    },    {      "name": "X",      "type": "int"    },    {      "name": "Y",      "type": "int"    },    {      "name": "Alive",      "type": "boolean"    },    {      "name": "List",        "type": {            "type": "array",            "items" : {                "type":"int"            }        }    }  ]}`// player event topicNameproducer, err := client.CreateProducer(pulsar.ProducerOptions{    Topic:           topicName,    // use schema to confirm the structure of message    Schema: pulsar.NewJSONSchema(eventJsonSchemaDef, nil),})

这样一来,就能够通过 Schema 的束缚防止将来可能产生的很多问题。

更多高质量干货文章,请关注我的微信公众号 labuladong 和算法博客 labuladong 的算法秘籍