关于java:kafka-系列-31生产者客户端基本使用

3次阅读

共计 1918 个字符,预计需要花费 5 分钟才能阅读完成。

必要的参数

  • bootstrap.servers

该参数为 broker 地址,不须要全副都填,因为 kafka 会从以后 broker 中获取其余 broker 信息。不过为了某个 broker 挂掉,个别填多个 broker 地址

  • key.serializer

音讯 key 如何序列化

  • value.serializer

音讯内容如何序列化

示例代码

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

生产者拦截器

在音讯发送前,对音讯进行解决,该动作产生在 序列化器 分区器 之前。

实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口,即可自定义拦截器

介绍一下接口定义的办法

  • ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)

在音讯发送之前,能够对音讯进行解决

  • void onAcknowledgement(RecordMetadata metadata, Exception exception

音讯被应答之前或者音讯发送失败时被调用

  • void close()

producer 被敞开时,会调用

kafka 容许配置拦截器链,多个拦截器用 , 号隔开即可。

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());

序列化

序列化产生在 分区器 之前
实现 org.apache.kafka.common.serialization.StringSerializer 接口即可自定义序列化

介绍一下接口定义的办法

  • void configure(Map<String, ?> configs, boolean isKey)

StringSerializer 实现中,用于设置编码

  • byte[] serialize(String topic, String data)

定义如何序列化

  • void close()

producer 敞开时,被调用

分区器

实现 org.apache.kafka.clients.producer.Partitioner 即可自定义分区器

kafka 可按 key 进行哈希(MurmurHash2),将音讯发往同一个分区。如果未指定 key,那么将会把音讯发往随机的一个分区。

介绍一下接口定义的办法

  • int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)

定义发往哪个分区;具体的实现可参考DefaultPartitioner

  • void close()

producer 敞开时,被调用

与 RocketMQ 异同

  1. kafka 统一,rocketMQ 容许生产者将音讯发送到指定的 ‘partition’ 中
  2. rocketMQ 没有 序列化器 的概念。音讯内容由 rocketMQ 自行序列化
  3. 从集体目前的应用状况,rocketMQ 也没有提供相似 拦截器 概念
  4. rocketMQ 提供了 hock 以此在音讯发送前,和音讯发送后,对音讯进行解决

例如:

DefaultMQProducer producer = new DefaultMQProducer("default");

producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
    @Override
    public String hookName() {return null;}

    @Override
    public void sendMessageBefore(SendMessageContext context) { }

    @Override
    public void sendMessageAfter(SendMessageContext context) {}});
正文完
 0