对于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级我的项目,是下一代云原生分布式音讯流平台,集音讯、存储、轻量化函数式计算为一体,采纳计算与存储拆散架构设计,反对多租户、长久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储个性。
GitHub 地址:http://github.com/apache/pulsar/
在进行 Schema 治理前需保障 Pulsar 失常收发的应用没有问题。首先明确一下什么是 Schema?
在数据库中 Schema 是数据的组织和构造,如果把 Pulsar 比做关系型数据库,那么 Topic 就存储着关系型数据库磁盘文件中的字节,而 Schema 就起着将关系型数据库磁盘文件里的字节转成有具体的类型数据库表一样的作用,属于数据表的元信息。那在音讯队列中咱们为什么须要 Schema 治理呢?上面咱们带着疑难来看 Pulsar Schema 的应用。
问题背景
以后音讯队列整体零碎可用性趋于稳定,然而在应用过程中,上下游数据的安全性还没有失去无效保障,举个栗子:
type TestCodeGenMsg struct {
- Orderid int64 `json:"orderid"`
+ Orderid string `json:"orderid"`
Uid int64 `json:"uid"`
Flowid string `json:"flowid"`
}
这种“不兼容”的格局将毁坏大多数上游服务,因为他们冀望数字类型但当初失去一个字符串。咱们不可能提前晓得会造成多少侵害。例子中,人们很容易将责任归咎于“沟通不畅”或“不足适当的流程”。
首先在开发中 API 被视为微服务架构中的一等公民,因为 API 是一种契约,有较强的约束性,任何协定改变都能提前很快感知,然而音讯队列的事件生产往往并不能疾速做出响应和测试,当大规模修模型时,尤其是波及到写数据库时,很可能造成和 API 失败一样的负面后果。这里我举荐 Gwen Shapira 之前写了一篇文章,介绍了数据契约和 schema 治理,咱们冀望基于简略的兼容策略管理 Schema 的变动,让数据安全地演进,解耦团队并容许他们独立疾速口头开发。这就是咱们为什么须要 Schema 治理。
冀望达到的指标
基于兼容策略,治理起 schema,让数据安全地演进,如:
type TestCodeGenMsg struct {
Orderid int64 `json:"orderid"`
Uid int64 `json:"uid"`
Flowid string `json:"flowid"`
+ Username string `json:"username"`
}
如下则不通过:
// 校验不通过
type TestCodeGenMsg struct {
- Orderid int64 `json:"orderid"`
+ Orderid string `json:"orderid"`
Uid int64 `json:"uid"`
Flowid string `json:"flowid"`
}
咱们如何用
音讯模型和 API 之间的次要区别在于,事件及其模型的存储工夫很长。一旦您降级完所有调用此 API 的应用程序从 v1 –> v2,您就能够释怀地假如应用 v1 的服务曾经隐没了。这可能须要一些工夫,但通常以周而不是年来掂量。然而对于能够永远存储旧版本音讯队列的事件,状况并非如此。须要思考以下问题:咱们首先降级谁——消费者还是生产者?新的消费者是否解决依然存储在 Pulsar 中的旧事件?在降级消费者之前,咱们须要期待吗?老消费者是否解决新生产者编写的事件?
Pulsar Schema 定义了一些兼容性规定,这些规定波及咱们能够在不毁坏消费者的状况下对 Schema 进行哪些更改,以及如何解决不同类型 Schema 更改的降级。具体怎么做呢?咱们须要首先在 broker 上确认咱们是否反对主动演进和以后 namespace 下的 schema 兼容策略,其中兼容策略有:点击详情,或参考如下表格:
咱们通过 CLI 进行操作
// 查问以后 namespace 是否反对 schema 主动演进
./pulsar-admin namespaces get-is-allow-auto-update-schema tenant/namespace
// 如果不反对则关上
./pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
// 查问以后 namespace 的 schema 演进策略
./pulsar-admin namespaces get-schema-compatibility-strategy tenant/namespace
// 这么多策略,总有一款适宜你
./pulsar-admin namespaces set-schema-compatibility-strategy -c FORWARD_TRANSITIVE tenant/namespace
生产者
而后接入生产者,首先看上面示例:
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
)
type TestSchema struct {
Age int `json:"age"`
Name string `json:"name"`
Addr string `json:"addr"`
}
const AvroSchemaDef = "{"type":"record","name":"test","namespace":"CodeGenTest","fields":[{"name":"age","type":"int"},{"name":"name","type":"string"},{"name":"addr","type":"string"}]}"
var client *pulsar.Client
func main() {
// 创立 client
cp := pulsar.ClientOptions{
URL: "pulsar://xxx.xxx.xxx.xxx:6650",
OperationTimeout: 30 * time.Second,
}
var err error
client, err = pulsar.NewClient(cp)
if err != nil {fmt.Println("NewClient error:", err.Error())
return
}
defer client.Close()
if err := Produce(); err != nil{fmt.Println("Produce error:", err.Error())
return
}
if err := Consume(); err != nil{fmt.Println("Consume error:", err.Error())
return
}
}
func Produce() error {
// 创立 schema
properties := make(map[string]string)
pas := pulsar.NewAvroSchema(AvroSchemaDef, properties)
po := pulsar.ProducerOptions{
Topic: "persistent://test/schema/topic",
Name: "test_group",
SendTimeout: 30 * time.Second,
Schema: pas,
}
// 创立生产者
producer, err := client.CreateProducer(po)
if err != nil {fmt.Println("CreateProducer error:", err.Error())
return err
}
defer producer.Close()
// 写音讯
t := TestSchema{
Age: 10,
Name: "test",
Addr: "test_addr",
}
id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
Key: t.Age,
Value: t,
EventTime: time.Now(),})
if err != nil {fmt.Println("Send error:", err.Error())
return err
}
fmt.Println("msgId:", id)
}
以上 demo 实现了一个带有 schema 的生产者,咱们翻阅生产者 ProducerOptions 类 (struct),发现有 Schema 成员,于是晓得须要传入一个 Schema 对象进去。咱们接着到 new 一个 Schema 对象进去,通过:
properties := make(map[string]string)
jas := pulsar.NewAvroSchema(jsonAvroSchemaDef, properties)
咱们创立除了一个 Avro 类型的 schema,除此之外还有很多,例如:json、pb 等,可依据需要本人抉择如果您有趣味浏览更多相干内容,Martin Kleppmann 写了一篇很好的博客文章,比拟不同数据格式中的模型演变。而后来看一下是什么把数据结构进行了限度。其中有一个常量如下:
const jsonAvroSchemaDef = "{"type":"record","name":"test","namespace":"CodeGenTest","fields":[{"name":"age","type":"int"},{"name":"name","type":"string"},{"name":"addr","type":"string"}]}"
开展来看:
{
"type":"record",
"name":"test",
"namespace":"Test",
"fields":[
{
"name":"age",
"type":"int
},
{
"name":"name",
"type":["null","string"] // 示意可选字段
},
{
"name":"addr",
"type":"string"
"default":"beijing", // 示意默认字段
}
]
}
这是一个 avro schema(所有的校验类型都用这个写法),其中 fields 示意须要的字段名以及类型,同时要设置 schema 的 name 并指定 namespace,能力应用兼容性策略。对于 avro 的语法介绍可参考专栏 [[4]](#),以及如下表类型:
消费者
首先看代码:
func Consume(ctx context.Context) error {cas := pulsar.NewAvroSchema(AvroSchemaDef, properties)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://base/test/topic",
SubscriptionName: "test",
Type: pulsar.Failover,
Schema: cas,
})
if err != nil {return err}
defer consumer.Close()
for {msg, err := consumer.Receive(ctx)
if err != nil {return err}
t := TestSchema{}
if err := msg.GetSchemaValue(&t);err != nil{continue}
consumer.Ack(msg)
fmt.Println("msgId:", msg.ID(), "Payload:", string(msg.Payload()), "t:", t)
}
}
咱们能够看到,如果应用 schema 的话,咱们最初要用 GetSchemaValue() 办法反序列化音讯,能力真正保障平安,整个生产生产的框架大体如此。之后咱们就波及到一个概念,即 schema 演进:schema 原理 Schema 的工作流程,如图:
Confluent 公司在 Kafka 中开发了一个独立于 broker 协调的 schema registry server。它的工作流程是:
- 咱们向 Kafka 发送数据时,须要先向 Schema Registry 注册 schema,而后序列化发送到 Kafka 里;
- Schema Registry server 为每个注册的 schema 提供一个全局惟一 ID,调配的 ID 保障枯燥递增,但不肯定是间断的;
- 当咱们须要从 Kafka 生产数据时,消费者在反序列化前,会先判断 schema 是否在本地内存中,如果不在本地内存中,则须要从 Schema Registry 中获取 schema,否则,无需获取。
Pulsar 不同的是:
- Pulsar 自带的 schema 演进的治理,并把相干 schema 信息存储在 bookie 上;
- schema 信息不在 Pulsar 的音讯协定里;
- 生产端须要本人传入 schema。
尽管其原理也是和 Kafka 相似,然而 Pulsar 采纳 schema server 和 broker 不拆散的设计,schema 的信息存储在 bookie 上,这样解决了 schema server 高可用的问题,其中 schema 演进的兼容检测是在 broker 侧进行的(这里不是说的序列化和反序列化)。
那客户端做了什么?依据如上咱们得悉,最初保障 schema 平安的实际上是一次相应类型的 decode 和 encode 的查看,从源码看,在创立生产者和消费者过程中,都会对传入的 schema 进行检测,这里是一个独立的音讯构造。
而消费者用到的办法实际上就是咱们方才说的 decode() 办法。
相应类型只须要实现 schema 接口:
type Schema interface {Encode(v interface{}) ([]byte, error)
Decode(data []byte, v interface{}) error
Validate(message []byte) error
GetSchemaInfo() *SchemaInfo}
具体实现可参考 Pulsar Go Client 相干文件,其中有多种序列化数据类型的实现。
补充
Schema 作为 Pulsar Topic 的元数据能够提供给 Pulsar SQL 进行应用,Pulsar SQL 存储层实现了 Presto connector 的接口,Schema 会作为 Presto payload 的元数据在 SQL 层进行展现,大大不便了咱们查看音讯,数据分析等工作,以上同我补充中所说的就是咱们须要 Schema 治理的理由。感激浏览。
作者简介
我叫侯盛鑫,也能够我叫大云,目前就任于伴鱼基础架构,负责音讯队列的保护与相干开发,Rust 日报小组中的菜鸡成员,喜爱钻研存储,服务治理等方向。首次接触 Pulsar 就对存储和计算拆散的构造所吸引,顺滑的生产者消费者接入和高吞吐让我好奇这个我的项目的实现,冀望之后能在 Pulsar 的相干性能中做些奉献。
举荐浏览
- 博文举荐|深度解读 Pulsar Schema
- Pulsar IO 中 Schema 的调用流程
点击链接,查看更多 Pulsar 集锦