音讯机制
Pulsar 采纳公布 - 订阅 (pub-sub) 的设计模式。该设计模式中,producer 公布音讯到 topic,Consumer 订阅 topic、解决公布的音讯,并在解决实现后发送确认。
一旦创立订阅,即便 consumer 断开连接,Pulsar 依然能够保留所有音讯。在 consumer 确认音讯已解决胜利后,才会删除音讯。
主题(Topic)
逻辑上一个 Topic 是日志构造,每个音讯都在这个日志构造中有一个偏移量。Apache Pulsar 应用游标来跟踪偏移量(Cursor Tracking)。
Pulsar 反对两种根本的 topic 类型:长久 topic 与非长久 topic。
{persistent|non-persistent}://tenant/namespace/topic
- Non-Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \
list public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
create persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
lookup persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
delete persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
stats persistent://public/default/input-seed-avro-topic
$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool
Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \
create-partitioned-topic persistent://public/default/output-seed-avro-topic \
--partitions 2
$ $PULSAR_HOME/bin/pulsar-admin topics \
list-partitioned-topics public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
delete-partitioned-topic persistent://public/default/output-seed-avro-topic
音讯(Message)
Messages are the basic “unit” of Pulsar.
public interface Message<T> {Map<String, String> getProperties();
boolean hasProperty(String var1);
String getProperty(String var1);
byte[] getData();
T getValue();
MessageId getMessageId();
long getPublishTime();
long getEventTime();
long getSequenceId();
String getProducerName();
boolean hasKey();
String getKey();
boolean hasBase64EncodedKey();
byte[] getKeyBytes();
boolean hasOrderingKey();
byte[] getOrderingKey();
String getTopicName();
Optional<EncryptionContext> getEncryptionCtx();
int getRedeliveryCount();
byte[] getSchemaVersion();
boolean isReplicated();
String getReplicatedFrom();}
生产者(Producer)
public void send() throws PulsarClientException {
final String serviceUrl = "pulsar://server-100:6650";
// final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
// http://pulsar.apache.org/docs/en/client-libraries-java/#client
final PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.connectionTimeout(10000, TimeUnit.MILLISECONDS)
.build();
final String topic = "persistent://public/default/topic-sensor-temp";
// http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
final Producer<byte[]> producer = client.newProducer()
.producerName("sensor-temp")
.topic(topic)
.compressionType(CompressionType.LZ4)
.enableChunking(true)
.enableBatching(true)
.batchingMaxBytes(1024)
.batchingMaxMessages(10)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.blockIfQueueFull(true)
.maxPendingMessages(512)
.sendTimeout(1, TimeUnit.SECONDS)
.create();
MessageId mid = producer.send("sensor-temp".getBytes());
System.out.printf("\nmessage with ID %s successfully sent", mid);
mid = producer.newMessage()
.key("sensor-temp-key")
.value("sensor-temp-key".getBytes())
.property("my-key", "my-value")
.property("my-other-key", "my-other-value")
.send();
System.out.printf("message-key with ID %s successfully sent", mid);
producer.close();
client.close();}
消费者(Consumer)
public void consume() throws PulsarClientException {
final String serviceUrl = "pulsar://server-101:6650";
final String topic = "input-seed-avro-topic";
final PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.enableTcpNoDelay(true)
.build();
final Consumer<byte[]> consumer = client
.newConsumer()
.consumerName("seed-avro-consumer")
.subscriptionName("seed-avro-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.topic(topic)
.receiverQueueSize(10)
.subscribe();
final AvroSchema<SeedEvent> schema = AvroSchema.of(SeedEvent.class);
while (true) {
try {final Message<byte[]> msg = consumer.receive();
LOG.info("接管音讯:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
Thread.currentThread().getId(),
msg.getTopicName(),
msg.getMessageId(),
msg.getSequenceId(),
msg.getEventTime(),
msg.getPublishTime(),
msg.getProducerName(),
msg.getKey(), schema.decode(msg.getValue()));
try {consumer.acknowledge(msg);
} catch (final PulsarClientException e) {consumer.negativeAcknowledge(msg);
LOG.error("acknowledge:" + e.getLocalizedMessage(), e);
}
} catch (final PulsarClientException e) {LOG.error("receive:" + e.getLocalizedMessage(), e);
}
}
}
订阅(Subscriptions)
消费者通过订阅来生产 Topic 中的音讯。订阅是游标 (跟踪偏移量) 的逻辑实体,一个 Topic 能够增加多个订阅。订阅不蕴含音讯的数据,只蕴含元数据和游标。
每个 Subscription 都存储一个 Cursor。Cursor 是日志中的以后偏移量。Subscription 将其 Cursor 存储至 BookKeeper 的 Ledger 中。这使 Cursor 跟踪能够像 Topic 一样进行扩大。
订阅类型(subscription-type)
-
Exclusive 独享
一个订阅只能有一个音讯者生产音讯。
-
Failover 灾备
一个订阅同时只有一个消费者,能够有多个备份消费者。一旦主消费者故障则备份消费者接管。不会呈现同时有两个沉闷的消费者。
-
Shared 共享
一个订阅中同时能够有多个消费者,多个消费者共享 Topic 中的音讯。
- Key_Shared
有序性保障(Ordering guarantee)
如果对程序性有要求,能够应用 Exclusive 和 Failover 的订阅模式,这样同一个 Topic 只有一个 Consumer 在生产,能够保障程序性。
如果应用 Shared 订阅模式,多个 Consumer 能够并发生产同一个 Topic。通过动静减少 Consumer 的数量,能够减速 Topic 的生产,缩小音讯在服务端的沉积。
KeyShared 模式保障在 Shared 模式下同一个 Key 的音讯也会发送到同一个 Consumer,在并发的同时也保障了程序性。
多主题订阅(Multi-topic subscriptions)
Pattern:
- persistent://public/default/.*
- persistent://public/default/foo.*
Reader
public void read() throws IOException {
final String serviceUrl = "pulsar://server-101:6650";
final PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
// http://pulsar.apache.org/docs/en/client-libraries-java/#reader
final Reader<byte[]> reader = client.newReader()
.topic("my-topic")
.startMessageId(MessageId.earliest()) // MessageId.latest
.create();
while (true)
final Message<byte[]> message = reader.readNext();
System.out.println(new String(message.getData()));
}
}
分片主题(Partitioned topics)
音讯保留和过期(Message retention and expiry)
如果没有对 Topic 设置数据保留策略,一旦一个 Topic 的所有订阅的游标都曾经胜利生产到一个偏移量时,此偏移量后面的音讯就会被主动删除。
如果 Topic 设置了数据保留策略,曾经生产确认的音讯超过保留策略阈值 (Topic 的音讯存储大小、Topic 中音讯保留的工夫) 后会被删除。
conf/broker.conf
# Default message retention time
# 默认 0, 批改为 3 天 =60*24*3
defaultRetentionTimeInMinutes=4320
# Default retention size
# 默认为 0, 批改为 10G
defaultRetentionSizeInMB=10240
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0
retention policy (for a namespace)
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
get-retention public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
set-retention public/default \
--size 1024M \
--time 5m
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \
--header "Content-Type:application/json" \
--data '{"retentionTimeInMinutes": 5,"retentionSizeInMB" : 1024}'
message expiry / message-ttl
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
get-message-ttl public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
set-message-ttl public/default \
--messageTTL 1800
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \
--header "Content-Type:application/json" \
--data '1800'
更多福利
云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维治理平台 OMP(Operation Management Platform),具备 纳管、部署、监控、巡检、自愈、备份、复原 等性能,可为用户提供便捷的运维能力和业务管理,在进步运维人员等工作效率的同时,极大晋升了业务的连续性和安全性。点击下方地址链接,欢送大家给 OMP 点赞送 star,理解更多相干内容~
GitHub 地址:https://github.com/CloudWise-OpenSource/OMP
Gitee 地址:https://gitee.com/CloudWise/OMP
微信扫描辨认下方二维码,备注【OMP】退出 AIOps 社区运维治理平台 OMP 开发者交换群,与更多行业大佬一起交流学习~
系列浏览
深入浅出 Apache Pulsar(1):Pulsar vs Kafka