1、消费者食用 DEMO
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("test"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {String key = record.key();
String value = record.value();
System.err.println(record.toString());
}
}
2、消费者基本概念
kafka
消费者是以 组为根本单位 进行生产的。生产的模型如下
1 个 topic
容许被多个 生产组
生产。再次强调,kafka
生产是以组为单位。
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
以上这行代码设置了生产组。
2.1、partition
调配
topic
为逻辑上的概念,partition
才是物理上的概念。那么看完这个以上的生产模型图。你可能会很纳闷。当一个组下有多个消费者时,每个消费者是如何生产的?
先阐明:partition
的调配为平均分配
假如一:topic1
上面有 3 个分区。别离如下:p1 – p3。那么 groupA
下的三个消费者生产的对应 partition
为如下
instance1: p1
instance2: p2
instance3: p3
假如二:topic1
上面有 8 个分区。别离为 p1 – p8。那么 groupA
中每个消费者调配到的 partition
就如下
instance1: p1,p2,p3
instance2: p4,p5,p6
instance3: p7,p8
2.2、partition
重调配
假如三:topic1
上面有 8 个分区:P1 – P8。groupA
有三个消费者:c1,c2,c3。此时调配的 partition
如下
c1: p1,p2,p3
c2: p4,p5,p6
c3: p7,p8
如果此时,又有一个新的消费者退出到 groupA
会产生什么呢?partition
会被重新分配
c1: p1,p2
c2: p3,p4
c3: p5,p6
c4: p7,p8
3、消费者端 API
介绍
3.1、订阅主题
void subscribe(Collection<String> topics);
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
从办法上看 kafka
容许一个消费者订阅多个 topic
。
void subscribe(Pattern pattern);
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
入参 Pattern
则示意,能够应用正则表达式匹配多个 topic
. 实例代码如下
Pattern pattern = Pattern.compile("test?");
consumer.subscribe(pattern);
能够订阅主题,那么天然也能够勾销订阅主题
consumer.unsubscribe();
当然,也能够间接获取到生产组订阅的主题
Set<String> topics = consumer.subscription();
一个主题上面有多个 partition
, 那么是否能够指定要生产的队列呢?答案是能够的
TopicPartition p1 = new TopicPartition("test1", 0);
TopicPartition p2 = new TopicPartition("test1", 1);
consumer.assign(Arrays.asList(p1, p2));
不过须要留神的是,如果指定了生产的分区,那么是消费者是无奈主动 rebanlance
的。
3.2、音讯生产
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
从消费者端的这行代码,咱们能够看出,kafka
音讯生产采纳的是拉取模式。当未拉取到音讯时,会阻塞线程。
poll
办法返回的 ConsumerRecords
实现 Iterable
接口,是 ConsumerRecord
的迭代器。ConsumerRecord
属性绝对简略
public class ConsumerRecord<K, V> {
private final String topic; // 主题
private final int partition; // 分区
private final long offset; // 音讯所属分区偏移量
private final long timestamp; // 工夫戳
private final TimestampType timestampType; // 两者类型,音讯创立工夫戳及音讯追加到日志的工夫戳
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers; // 发送的 header
private final K key; // 发送的 key
private final V value; // 发送的内容
private volatile Long checksum; // CRC32 校验值
}
3.3、位移提交
对于分区而言,音讯会有一个惟一 offset
, 示意音讯在分区中的地位,称之为 偏移量
。对于音讯生产而言,也有生产进度的 offset
,称之为 位移
。kafka
将音讯的生产进度存储在 kafka
外部主题 __onsumer_offset
中。kafka
中默认每隔 5s
保留音讯的生产进度。可通过 auto.commit.interval.ms
进行配置。
kafka
提供手动提交的 API
,上面演示一下。
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("test"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {System.out.println("生产:" + record.toString());
}
consumer.commitSync();}
须要留神的是,须要将 enable.auto.commit
设置为 true
.
3.4、设置新生产组从哪个地位开始生产
kafka
设置 新生产组 从哪个地位开始生产的配置为:auto.offset.reset
该配置有以下 3 个配置项
latest
(默认配置)
默认从最新的地位,开始生产。
earliest
从最早的地位开始生产。当配置为该参数时,kafka
会打印如下日志:Resetting offset for partition
none
当生产组,没有对应生产进度时,会间接抛 NoOffsetForPartitionException
异样
kafka
还提供了 seek(TopicPartition partition, long offset)
办法,容许新的消费者,设置从哪个地位开始生产。
// 因为调配 分区的动作,产生在 pool 中,因而在设置生产偏移量时,须要先拉取音讯
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();}
for (TopicPartition tp : assignment) {consumer.seek(tp, 50);
}
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {System.err.println("生产:" + record.toString());
}
}
更多状况下,咱们可能会指定生产组从指定的工夫点开始生产
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
// 指定从一天前开始生产
timestampToSearch.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {OffsetAndTimestamp timestamp = offsets.get(tp);
if (null != timestamp) {consumer.seek(tp, timestamp.offset());
}
}
3.5、分区再平衡
在分区再平衡期间,生产组内的消费者是无奈读取音讯的。并且如果之前的消费者没有及时提交生产进度,那么会造成反复生产。
kafka
在 subscribe
的时候,提供了回调函数,容许咱们在触发再平衡时,做管制
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
看一下 ConsumerRebalanceListener
定义的接口
// 再平衡开始之前和消费者进行读取音讯之前被调用,可利用该会掉,提交生产位移
void onPartitionsRevoked(Collection<TopicPartition> partitions);
// 从新分区后,消费者开始读取音讯之前被调用
void onPartitionsAssigned(Collection<TopicPartition> partitions);
上面演示,如何在再平衡之前,提交生产偏移
consumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {
// 在再平衡开始之前和消费者进行读取音讯之前被调用,可利用该会掉,提交生产位移
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 提交生产偏移
consumer.commitSync();}
// 从新分区后,消费者开始读取音讯之前被调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}});
3.6、消费者拦截器
消费者,容许在 生产之前 , 生产偏移提交之后 , 敞开之前 ,进行管制,多个拦截器则组成拦截器链, 且多个拦截器之前须要用 ‘,’ 号隔开。
先看拦截器定义的接口
public interface ConsumerInterceptor<K, V> extends Configurable {
// 音讯生产之前
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
// 提交之后调用
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
// 敞开之前调用
void close();}
Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName() + "," + MyConsumerInterceptor2.class.getName());
3.7、重要的消费者参数
fetch.min.bytes
默认 1B
,poll
时,拉取的最小数据量。
fetch.max.bytes
默认 5242880B
,50MB,poll
时,拉取的最大数据量。
fetch.max.wait.ms
默认 500ms
,如果 kafka
始终没有触发 poll
动作,那么最多期待 fetch.max.wait.ms
。
max.partition.fetch.bytes
默认 1048576B
,1MB,分区拉取时的最大数据量
max.poll.records
默认 500 条
,拉取的最大音讯条数
connections.max.idle.ms
默认 540000ms
, 9 分钟,多久敞开闲置的连贯
receive.buffer.bytes
默认 65536B
,64KB
,SOCKET
承受音讯的缓冲区(SO_RECBUF
)
request.timeout.ms
默认 30000ms
,配置 consumer
期待申请响应的最长工夫
metadata.max.age.ms
默认 300000ms
,5 分钟,配置元数据过期工夫。元数据在限定的工夫内,没有更新,会被强制更新
reconnect.backoff.ms
默认 50ms
,配置尝试连贯指定主机之前的等待时间,防止频繁连贯主机
retry.backoff.ms
默认 100ms
,发送失败时,2 次的间隔时间
4、总结
kafka
生产以组为单位,且容许一个生产组订阅多个topic
partition
重调配算法,为均匀算法KafkaConsumer
为线程不平安。因而poll()
只有以后线程在拉取音讯。kafka
要实现多线程拉取绝对麻烦kafka
消费者端,提供的API
非常灵活,容许从指定的地位生产,容许手动提交某个分区的生产偏移kafka
提供消费者拦截器链,容许在 生产之前,提交生产偏移之后 管制。
5、与 RocketMQ 异同
RocketMQ
倡议 1 个生产组只生产一个topic
, 且在理论开发中,如果消费者订阅多个topic
会无奈失常工作。kafka
中 1 个消费者能够订阅多个topic
。RocketMQ
能够确保生产时,音讯不失落,kafka
无奈保障。RocketMQ
在消费者端,实现了多线程生产,kafka
则没有kafka
默认每5s
长久化生产进度,RocketMQ
也是。不过RocketMQ
会提交偏移量最小的音讯。比方,线程 A 生产了 20 的音讯。线程 B 生产了 10 的音讯。当线程 A 提交生产进度的时候,会提交 10,而不会提交 20。这也是RocketMQ
能够确保音讯生产时不丢的起因。RocketMQ
产生rebalance
,即kafka
的再调配。默认和kafka
统一,采纳的是平均分配算法
。不过RocketMQ
容许自定义再调配算法,且提供了丰盛算法反对。RocketMQ
与kafka
统一,都存在反复生产问题。- 从裸露进去的
API
来看,kafka
客户端会比RocketMQ
更加灵便。 kafka
设置 新的生产组 从哪个地位开始生产,没有额定的条件限度;RocketMQ
只有当旧音讯沉积十分多时,才会无效。