共计 1918 个字符,预计需要花费 5 分钟才能阅读完成。
必要的参数
bootstrap.servers
该参数为 broker
地址,不须要全副都填,因为 kafka
会从以后 broker
中获取其余 broker
信息。不过为了某个 broker
挂掉,个别填多个 broker
地址
key.serializer
音讯 key
如何序列化
value.serializer
音讯内容如何序列化
示例代码
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
生产者拦截器
在音讯发送前,对音讯进行解决,该动作产生在 序列化器
、 分区器
之前。
实现 org.apache.kafka.clients.producer.ProducerInterceptor
接口,即可自定义拦截器
介绍一下接口定义的办法
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
在音讯发送之前,能够对音讯进行解决
void onAcknowledgement(RecordMetadata metadata, Exception exception
音讯被应答之前或者音讯发送失败时被调用
void close()
producer
被敞开时,会调用
kafka 容许配置拦截器链,多个拦截器用 , 号隔开即可。
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());
序列化
序列化产生在 分区器
之前
实现 org.apache.kafka.common.serialization.StringSerializer
接口即可自定义序列化
介绍一下接口定义的办法
void configure(Map<String, ?> configs, boolean isKey)
在 StringSerializer
实现中,用于设置编码
byte[] serialize(String topic, String data)
定义如何序列化
void close()
producer
敞开时,被调用
分区器
实现 org.apache.kafka.clients.producer.Partitioner
即可自定义分区器
kafka 可按 key 进行哈希(MurmurHash2),将音讯发往同一个分区。如果未指定 key,那么将会把音讯发往随机的一个分区。
介绍一下接口定义的办法
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
定义发往哪个分区;具体的实现可参考DefaultPartitioner
void close()
producer
敞开时,被调用
与 RocketMQ 异同
- 与
kafka
统一,rocketMQ
容许生产者将音讯发送到指定的 ‘partition’ 中 rocketMQ
没有序列化器
的概念。音讯内容由rocketMQ
自行序列化- 从集体目前的应用状况,
rocketMQ
也没有提供相似拦截器
概念 rocketMQ
提供了hock
以此在音讯发送前,和音讯发送后,对音讯进行解决
例如:
DefaultMQProducer producer = new DefaultMQProducer("default");
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
@Override
public String hookName() {return null;}
@Override
public void sendMessageBefore(SendMessageContext context) { }
@Override
public void sendMessageAfter(SendMessageContext context) {}});
正文完