This article describes best practices for coding against the Pulsar client and provides production-ready sample code for you to reference during development, to speed up your Pulsar integration. In production, the Pulsar broker addresses are usually obtained from a configuration center or through Kubernetes DNS discovery; that topic is out of scope here, so the address is represented by PulsarConstant.SERVICE_HTTP_URL throughout. All examples in this article have been uploaded to GitHub.
Upfront Client Initialization and Configuration

Initializing the Client: Demo Grade
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInit {

    private static final DemoPulsarClientInit INSTANCE = new DemoPulsarClientInit();

    private PulsarClient pulsarClient;

    public static DemoPulsarClientInit getInstance() {
        return INSTANCE;
    }

    public void init() throws Exception {
        pulsarClient = PulsarClient.builder()
                .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                .build();
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
```
The demo-grade Pulsar client is initialized without any custom parameters, and initialization does not account for failure: init throws the exception directly.

Initializing the Client: Deployable Grade
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInitRetry {

    private static final DemoPulsarClientInitRetry INSTANCE = new DemoPulsarClientInitRetry();

    private volatile PulsarClient pulsarClient;

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-cli-init"));

    public static DemoPulsarClientInitRetry getInstance() {
        return INSTANCE;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            pulsarClient = PulsarClient.builder()
                    .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                    .build();
            log.info("pulsar client init success");
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar error, exception is", e);
        }
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
```
In real environments, we usually want a failed Pulsar client initialization not to block microservice startup: the service starts first, and then keeps retrying to create the Pulsar client. The code above achieves this with a volatile field plus a retry loop, and destroys the timer thread once the client has been created successfully.

Initializing the Client: Commercial Grade
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInitUltimate {

    private static final DemoPulsarClientInitUltimate INSTANCE = new DemoPulsarClientInitUltimate();

    private volatile PulsarClient pulsarClient;

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-cli-init"));

    public static DemoPulsarClientInitUltimate getInstance() {
        return INSTANCE;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            pulsarClient = PulsarClient.builder()
                    .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                    .ioThreads(4)
                    .listenerThreads(10)
                    .memoryLimit(64, SizeUnit.MEGA_BYTES)
                    .operationTimeout(5, TimeUnit.SECONDS)
                    .connectionTimeout(15, TimeUnit.SECONDS)
                    .build();
            log.info("pulsar client init success");
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar error, exception is", e);
        }
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
```
The commercial-grade Pulsar client adds five configuration parameters:

- ioThreads: the number of Netty IO threads, which handle network IO; increase it if business traffic is heavy;
- listenerThreads: the threads that invoke the callbacks of consumers running in listener mode; it is recommended to configure more than the number of partitions this client is responsible for;
- memoryLimit: caps the memory available to Pulsar producers, which effectively prevents messages from piling up on the producer side and OOMing the Java process during network outages, Pulsar failures, and similar scenarios;
- operationTimeout: the timeout for certain metadata operations; Pulsar's default of 30s is rather conservative, and you can lower it based on your network conditions and processing capacity;
- connectionTimeout: the timeout for connecting to Pulsar; same tuning principle as above.
Advanced Client Parameters (Memory Allocation)

We can also control the Pulsar client's memory-allocation behavior through Java properties. A few important ones:

- pulsar.allocator.pooled: if true, use the pooled off-heap allocator; if false, allocate unpooled on the heap. Defaults to the efficient pooled off-heap allocator;
- pulsar.allocator.exit_on_oom: whether to shut down the JVM on memory exhaustion; defaults to false;
- pulsar.allocator.out_of_memory_policy: introduced in https://github.com/apache/pul… and not yet in an official release; configures the behavior when off-heap memory runs out. The options are FallbackToHeap and ThrowException, with FallbackToHeap as the default. If you do not want message serialization to eat into heap allocation, configure ThrowException.
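Assuming a plain JVM launch, these properties can be passed as -D flags; the jar name and the chosen values below are placeholders, not recommendations:

```shell
java \
  -Dpulsar.allocator.pooled=true \
  -Dpulsar.allocator.exit_on_oom=false \
  -jar your-app.jar
```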
Producer

Important Producer Initialization Parameters

maxPendingMessages

The size of the producer's pending-message queue. Configure it sensibly according to the topic's actual traffic to avoid OOM during network outages or Pulsar failures. It is recommended to pick either this or the client-level memoryLimit and configure just one of the two.

messageRoutingMode

The message routing mode, RoundRobinPartition by default. Choose according to business needs; if ordering is required, usually pick SinglePartition so that messages with the same key go to the same partition.

autoUpdatePartition

Automatically refreshes partition information. If the topic's partition information never changes, this is unnecessary, and turning it off reduces load on the cluster.
Batching Parameters

Batching is implemented with timer tasks under the hood, so enabling it is not recommended on topics that carry few messages; in particular, a large number of short-interval timer tasks will drive Netty thread CPU usage up.

- enableBatching: whether to enable batch sending;
- batchingMaxMessages: the maximum number of messages per batch;
- batchingMaxPublishDelay: the interval of the batch-sending timer task.
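Putting the parameters above together on the builder, a producer configuration might look like the following sketch; the topic name and the concrete values are placeholders, not recommendations, and the DemoPulsarClientInit singleton from earlier is assumed:

```java
Producer<byte[]> producer = DemoPulsarClientInit.getInstance().getPulsarClient()
        .newProducer()
        .topic("persistent://public/default/demo-topic")
        .maxPendingMessages(1000)
        .messageRoutingMode(MessageRoutingMode.SinglePartition)
        .autoUpdatePartitions(false)
        .enableBatching(true)
        .batchingMaxMessages(500)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .create();
```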
Static Producer Initialization

A static producer is one that is not started or closed as business conditions change, so it is initialized right after the microservice has started and the client has been initialized. Examples follow:

One thread per producer, suitable for scenarios with few producers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarStaticProducerInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-producer-init"));

    private final String topic;

    private volatile Producer<byte[]> producer;

    public DemoPulsarStaticProducerInit(String topic) {
        this.topic = topic;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        if (producer != null) {
            // already created; stop the retry task
            executorService.shutdown();
            return;
        }
        try {
            final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
            producer = instance.getPulsarClient().newProducer().topic(topic).create();
        } catch (Exception e) {
            log.error("init pulsar producer error, exception is", e);
        }
    }

    public Producer<byte[]> getProducer() {
        return producer;
    }
}
```
One thread shared by multiple producers, suitable for scenarios with many producers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarStaticProducersInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-producer-init"));

    private final CopyOnWriteArrayList<Producer<byte[]>> producers = new CopyOnWriteArrayList<>();

    private int initIndex;

    private final List<String> topics;

    public DemoPulsarStaticProducersInit(List<String> topics) {
        this.topics = topics;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        if (initIndex == topics.size()) {
            return;
        }
        for (; initIndex < topics.size(); initIndex++) {
            try {
                final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
                final Producer<byte[]> producer = instance.getPulsarClient().newProducer().topic(topics.get(initIndex)).create();
                producers.add(producer);
            } catch (Exception e) {
                log.error("init pulsar producer error, exception is", e);
                break;
            }
        }
    }

    public CopyOnWriteArrayList<Producer<byte[]>> getProducers() {
        return producers;
    }
}
```
Example: Dynamically Created and Destroyed Producers

In some businesses, producers may be started or destroyed dynamically, for example when receiving data from vehicles on the road and sending it to designated topics. We should not keep every producer resident in memory, which would consume a great deal of it; instead, we can manage producer lifecycles with an LRU-cache-like approach.
```java
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarDynamicProducerInit {

    /**
     * topic -- producer
     */
    private final AsyncLoadingCache<String, Producer<byte[]>> producerCache;

    public DemoPulsarDynamicProducerInit() {
        this.producerCache = Caffeine.newBuilder()
                .expireAfterAccess(600, TimeUnit.SECONDS)
                .maximumSize(3000)
                .removalListener((RemovalListener<String, Producer<byte[]>>) (topic, value, cause) -> {
                    log.info("topic {} cache removed, because of {}", topic, cause);
                    try {
                        value.close();
                    } catch (Exception e) {
                        log.error("close failed,", e);
                    }
                })
                .buildAsync(new AsyncCacheLoader<>() {
                    @Override
                    public CompletableFuture<Producer<byte[]>> asyncLoad(String topic, Executor executor) {
                        return acquireFuture(topic);
                    }

                    @Override
                    public CompletableFuture<Producer<byte[]>> asyncReload(String topic, Producer<byte[]> oldValue, Executor executor) {
                        return acquireFuture(topic);
                    }
                });
    }

    private CompletableFuture<Producer<byte[]>> acquireFuture(String topic) {
        CompletableFuture<Producer<byte[]>> future = new CompletableFuture<>();
        try {
            ProducerBuilder<byte[]> builder = DemoPulsarClientInit.getInstance().getPulsarClient().newProducer().enableBatching(true);
            final Producer<byte[]> producer = builder.topic(topic).create();
            future.complete(producer);
        } catch (Exception e) {
            log.error("create producer exception", e);
            future.completeExceptionally(e);
        }
        return future;
    }
}
```
In this pattern, the returned CompletableFuture<Producer<byte[]>> lends itself to elegant stream-style handling.

Sending When Message Loss Is Acceptable
```java
final CompletableFuture<Producer<byte[]>> cacheFuture = producerCache.get(topic);
cacheFuture.whenComplete((producer, e) -> {
    if (e != null) {
        log.error("create pulsar client exception", e);
        return;
    }
    try {
        producer.sendAsync(msg).whenComplete(((messageId, throwable) -> {
            if (throwable != null) {
                log.error("send producer msg error", throwable);
                return;
            }
            log.info("topic {} send success, msg id is {}", topic, messageId);
        }));
    } catch (Exception ex) {
        log.error("send async failed", ex);
    }
});
```
The code above correctly handles the callbacks for both client-creation failure and send failure. In production, however, Pulsar is not continuously available: sends can fail because of VM faults, Pulsar service upgrades, and so on. If messages must still be delivered at such times, the send has to be retried.

Sending That Tolerates Loss Only in Extreme Scenarios
```java
final Timer timer = new HashedWheelTimer();

private static final int MAX_RETRY_TIMES = 7;

private void sendMsgWithRetry(String topic, byte[] msg, int retryTimes) {
    final CompletableFuture<Producer<byte[]>> cacheFuture = producerCache.get(topic);
    cacheFuture.whenComplete((producer, e) -> {
        if (e != null) {
            log.error("create pulsar client exception", e);
            return;
        }
        try {
            producer.sendAsync(msg).whenComplete(((messageId, throwable) -> {
                if (throwable == null) {
                    log.info("topic {} send success, msg id is {}", topic, messageId);
                    return;
                }
                log.error("send producer msg error", throwable);
                if (retryTimes < MAX_RETRY_TIMES) {
                    // back off exponentially: 1s, 2s, 4s ... before the next attempt
                    timer.newTimeout(timeout -> DemoPulsarDynamicProducerInit.this.sendMsgWithRetry(topic, msg, retryTimes + 1), 1L << retryTimes, TimeUnit.SECONDS);
                }
            }));
        } catch (Exception ex) {
            log.error("send async failed", ex);
        }
    });
}
```
Here a failed send is retried with backoff, which tolerates a Pulsar outage for some time. With 7 retries and a 1s initial interval, for example, an outage of 1+2+4+8+16+32+64=127s can be tolerated, which is enough for most production environments. Since outages longer than 127s are theoretically possible, a failure should still be returned upstream in extreme scenarios.
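The arithmetic behind that tolerance window can be checked in a few lines; totalBackoffSeconds is a helper written purely for this illustration:

```java
public class BackoffDemo {

    // Sum of exponential backoff intervals: 1 + 2 + 4 + ... + 2^(retries-1) seconds
    static long totalBackoffSeconds(int retries) {
        long total = 0;
        for (int i = 0; i < retries; i++) {
            total += 1L << i;
        }
        return total;
    }

    public static void main(String[] args) {
        // 7 retries starting at a 1s interval tolerate a 127s outage
        System.out.println(totalBackoffSeconds(7));
    }
}
```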
Strict Per-Partition Ordering on the Producer Side

The key to strict producer ordering: send only one message at a time, and start the next send only after the previous one is confirmed successful. Either a synchronous or an asynchronous implementation works:

- the synchronous approach loops on the send until the previous message succeeds, then starts the next send;
- the asynchronous approach observes the future of the previous send, retries it on failure, and starts the next send on success.

Notably, different partitions can still proceed in parallel under this pattern, for example with an OrderedExecutor or one thread per partition.

A synchronous example:
```java
/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarProducerSyncStrictlyOrdered {

    Producer<byte[]> producer;

    public void sendMsg(byte[] msg) {
        while (true) {
            try {
                final MessageId messageId = producer.send(msg);
                log.info("topic {} send success, msg id is {}", producer.getTopic(), messageId);
                break;
            } catch (Exception e) {
                log.error("exception is", e);
            }
        }
    }
}
```
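For completeness, the asynchronous approach described above might be sketched as follows. This is an illustration rather than code from the original repository, and where the next message of the partition comes from is left abstract:

```java
public void sendMsgAsync(byte[] msg) {
    producer.sendAsync(msg).whenComplete((messageId, e) -> {
        if (e != null) {
            // keep retrying this message; nothing else is sent for this partition meanwhile
            log.error("exception is", e);
            sendMsgAsync(msg);
        } else {
            log.info("topic {} send success, msg id is {}", producer.getTopic(), messageId);
            // only now fetch and send the next message of this partition
        }
    });
}
```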
Consumer

Important Consumer Initialization Parameters

receiverQueueSize

Note: when the consumer cannot keep up, the receive queue piles up in memory; configure the size sensibly to avoid OOM.

autoUpdatePartition

Automatically refreshes partition information. If the topic's partition information never changes, this is unnecessary, and turning it off reduces load on the cluster.

subscriptionType

The subscription type; decide according to business needs.

subscriptionInitialPosition

Where the subscription starts consuming, earliest or latest, according to business needs.

messageListener

Consume in listener mode, where you only supply a callback function instead of actively calling receive() to pull. Listener mode is recommended unless there are special requirements.

ackTimeout

When the server pushes a message but the consumer does not ack in time, the message is pushed to the consumer again after ackTimeout; this is the redeliver mechanism. When relying on redelivery, be sure to retry only recoverable errors. For example, if the code decodes the message and decoding fails, redelivery is a poor fit: the client would just retry forever. If unsure, you can also configure a dead-letter queue through deadLetterPolicy below to keep messages from retrying endlessly.

negativeAckRedeliveryDelay

The delay after which redelivery is triggered when the client calls negativeAcknowledge. The same caveats as for ackTimeout apply. Note that ackTimeout and negativeAckRedeliveryDelay should not be used together; negativeAck is generally recommended because it gives the user more flexible control. A poorly chosen ackTimeout can cause unnecessary redelivery when processing time is unpredictable.

deadLetterPolicy

Configures the maximum number of redeliveries and the dead-letter topic.
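Putting these together, a consumer configuration might look like the following sketch; the topic, subscription name, and concrete values are placeholders, and the DemoPulsarClientInit singleton from earlier is assumed:

```java
Consumer<byte[]> consumer = DemoPulsarClientInit.getInstance().getPulsarClient()
        .newConsumer()
        .topic("persistent://public/default/demo-topic")
        .subscriptionName("demo-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .receiverQueueSize(1000)
        .autoUpdatePartitions(false)
        .negativeAckRedeliveryDelay(30, TimeUnit.SECONDS)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .maxRedeliverCount(3)
                .deadLetterTopic("persistent://public/default/demo-topic-dlq")
                .build())
        .messageListener(new DemoMessageListener<>())
        .subscribe();
```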
Principles of Consumer Initialization

A consumer can only work once it has been created successfully, and unlike a producer it cannot return a failure upstream, so consumer creation must be retried indefinitely. Sample code follows. Note: consumers and topics can have a one-to-many relationship; a consumer can subscribe to multiple topics.

One thread per consumer, suitable for scenarios with few consumers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarConsumerInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-consumer-init"));

    private final String topic;

    private volatile Consumer<byte[]> consumer;

    public DemoPulsarConsumerInit(String topic) {
        this.topic = topic;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        if (consumer != null) {
            // already created; stop the retry task
            executorService.shutdown();
            return;
        }
        try {
            final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
            consumer = instance.getPulsarClient().newConsumer().topic(topic).messageListener(new DemoMessageListener<>()).subscribe();
        } catch (Exception e) {
            log.error("init pulsar consumer error, exception is", e);
        }
    }

    public Consumer<byte[]> getConsumer() {
        return consumer;
    }
}
```
One thread shared by multiple consumers, suitable for scenarios with many consumers
```java
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarConsumersInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-consumer-init"));

    private final CopyOnWriteArrayList<Consumer<byte[]>> consumers = new CopyOnWriteArrayList<>();

    private int initIndex;

    private final List<String> topics;

    public DemoPulsarConsumersInit(List<String> topics) {
        this.topics = topics;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        if (initIndex == topics.size()) {
            return;
        }
        for (; initIndex < topics.size(); initIndex++) {
            try {
                final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
                final Consumer<byte[]> consumer = instance.getPulsarClient().newConsumer().topic(topics.get(initIndex)).messageListener(new DemoMessageListener<>()).subscribe();
                consumers.add(consumer);
            } catch (Exception e) {
                log.error("init pulsar consumer error, exception is", e);
                break;
            }
        }
    }

    public CopyOnWriteArrayList<Consumer<byte[]>> getConsumers() {
        return consumers;
    }
}
```
Achieving At-Least-Once Consumption

Use manual acks: acknowledge only after processing succeeds. If processing fails, you can retry it yourself or rely on the negativeAck mechanism.

A synchronous example

Note here that if message-processing durations vary widely, synchronous processing can deprive quickly processable messages of the chance to be handled.
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerSyncAtLeastOnce<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            final boolean result = syncPayload(msg.getData());
            if (result) {
                consumer.acknowledgeAsync(msg);
            } else {
                consumer.negativeAcknowledge(msg);
            }
        } catch (Exception e) {
            // the business method may throw
            log.error("exception is", e);
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * Simulates a synchronous business method
     * @param msg message payload
     * @return whether processing succeeded
     */
    private boolean syncPayload(byte[] msg) {
        return System.currentTimeMillis() % 2 == 0;
    }
}
```
An asynchronous example

With asynchronous processing you must consider memory limits: the asynchronous style consumes from the broker very quickly and is not blocked by business work, so the number of in-flight messages can become very large. In Shared or KeyShared mode, this can be limited via maxUnAckedMessage. In Failover mode, you can block processing based on the in-flight message count, as in the busy-consumer blocking approach below.
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerAsyncAtLeastOnce<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            asyncPayload(msg.getData(), new DemoSendCallback() {
                @Override
                public void callback(Exception e) {
                    if (e == null) {
                        consumer.acknowledgeAsync(msg);
                    } else {
                        log.error("exception is", e);
                        consumer.negativeAcknowledge(msg);
                    }
                }
            });
        } catch (Exception e) {
            // the business method may throw
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * Simulates an asynchronous business method
     * @param msg message payload
     * @param demoSendCallback callback of the asynchronous method
     */
    private void asyncPayload(byte[] msg, DemoSendCallback demoSendCallback) {
        if (System.currentTimeMillis() % 2 == 0) {
            demoSendCallback.callback(null);
        } else {
            demoSendCallback.callback(new Exception("exception"));
        }
    }
}
```
Blocking Message Pulls While the Consumer Is Busy

When the consumer cannot keep up, block inside the listener method so that no further business processing takes place. This prevents the microservice from accumulating too many messages and OOMing; the throttling can be controlled with a RateLimiter or a Semaphore.
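A minimal sketch of the Semaphore approach, reusing the DemoSendCallback interface from the asynchronous example (assumed here to be a functional interface with a single callback(Exception) method); the permit count of 500 is an arbitrary placeholder:

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

import java.util.concurrent.Semaphore;

/**
 * Caps in-flight messages with a Semaphore; acquire() blocks the listener
 * thread once the cap is reached, which stops further pulls and business
 * processing until permits are released.
 */
@Slf4j
public class DemoMessageListenerBlockWhenBusy<T> implements MessageListener<T> {

    private final Semaphore semaphore = new Semaphore(500);

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            // blocks while 500 messages are already being processed
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            consumer.negativeAcknowledge(msg);
            return;
        }
        try {
            asyncPayload(msg.getData(), e -> {
                semaphore.release();
                if (e == null) {
                    consumer.acknowledgeAsync(msg);
                } else {
                    log.error("exception is", e);
                    consumer.negativeAcknowledge(msg);
                }
            });
        } catch (Exception e) {
            // the business method may throw before registering the callback
            semaphore.release();
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * Simulates an asynchronous business method
     */
    private void asyncPayload(byte[] msg, DemoSendCallback demoSendCallback) {
        if (System.currentTimeMillis() % 2 == 0) {
            demoSendCallback.callback(null);
        } else {
            demoSendCallback.callback(new Exception("exception"));
        }
    }
}
```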
Strict Per-Partition Ordering on the Consumer Side

To consume a partition in strictly ordered fashion, once a message from that partition fails, no other message from the partition may be processed until that message has been retried successfully. Example:
```java
/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerSyncAtLeastOnceStrictlyOrdered<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        retryUntilSuccess(msg.getData());
        consumer.acknowledgeAsync(msg);
    }

    private void retryUntilSuccess(byte[] msg) {
        while (true) {
            try {
                final boolean result = syncPayload(msg);
                if (result) {
                    break;
                }
            } catch (Exception e) {
                log.error("exception is", e);
            }
        }
    }

    /**
     * Simulates a synchronous business method
     *
     * @param msg message payload
     * @return whether processing succeeded
     */
    private boolean syncPayload(byte[] msg) {
        return System.currentTimeMillis() % 2 == 0;
    }
}
```
Acknowledgements

Thanks to Penghui and Luo Tian for reviewing this article.

About the Author

He Zhangjian, Apache Pulsar Contributor, graduated from Xidian University and is a senior engineer at Huawei Cloud IoT, where Pulsar is already in large-scale commercial use. To learn more, visit his Jianshu blog.
Related Links

- Best Practice | Apache Pulsar's Journey in Huawei Cloud IoT