关于云原生:深入浅出Apache-Pulsar3Pulsar-Schema

35次阅读

共计 2571 个字符,预计需要花费 7 分钟才能阅读完成。

Pulsar Schema

Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types to more complex application-specific types.

  • 类型平安 (序列化和反序列化)
  • Schema 帮忙 Pulsar 保留了数据在其余零碎中原有的含意

Schema 类型 (Schema type)

  • Primitive type
Producer<String> producer = client.newProducer(Schema.STRING).create();
producer.newMessage().value("Hello Pulsar!").send();
Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
consumer.receive();
  • Complex type

1. keyvalue key/value pair.

Schema<KeyValue<Integer, String>> schema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
// Producer
Producer<KeyValue<Integer, String>> producer = client.newProducer(schema)
    .topic(TOPIC)
    .create();
final int key = 100;
final String value = "value-100";
producer.newMessage().value(new KeyValue<>(key, value)).send();
// Consumer
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(schema)
    .topic(TOPIC).subscriptionName(SubscriptionName).subscribe();
Message<KeyValue<Integer, String>> msg = consumer.receive();

2.struct AVRO, JSON, and Protobuf.

Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();
Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
User user = consumer.receive();

Schema 工作形式 (How does schema work)

Producer

Consumer

Schema 治理 (Schema manual management)

查问 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \
get persistent://public/default/spirit-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin schemas \
get persistent://public/default/spirit-avro-topic \
--version=2

更新 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas upload \
persistent://public/default/test-topic \
--filename $PULSAR_HOME/connectors/json-schema.json

提取 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \
extract persistent://public/default/test-topic  \
--classname com.cloudwise.modal.Packet \
--jar ~/cloudwise-pulsar-1.0.0-RELEASE.jar \
--type json
public void schemaInfo() {System.out.println("AvroSchema:" + AvroSchema.of(SeedEvent.class).getSchemaInfo());
    System.out.println("Schema.AVRO:" + Schema.AVRO(SeedEvent.class).getSchemaInfo());
}

删除 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \
delete persistent://public/default/spirit-avro-topic

更多福利

云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维治理平台 OMP(Operation Management Platform),具备 纳管、部署、监控、巡检、自愈、备份、复原 等性能,可为用户提供便捷的运维能力和业务管理,在进步运维人员等工作效率的同时,极大晋升了业务的连续性和安全性。点击下方地址链接,欢送大家给 OMP 点赞送 star,理解更多相干内容~

GitHub 地址:https://github.com/CloudWise-OpenSource/OMP

Gitee 地址:https://gitee.com/CloudWise/OMP

微信扫描辨认下方二维码,备注【OMP】退出 AIOps 社区运维治理平台 OMP 开发者交换群,与更多行业大佬一起交流学习~

系列浏览

深入浅出 Apache Pulsar(1):Pulsar vs Kafka
深入浅出 Apache Pulsar(2):Pulsar 音讯机制

正文完
 0