消费者和消费者群组
1. 一个消费者 从一个 Topic 中生产数据:
2. 消费者群组 :
当生产者向 Topic 写入音讯的速度超过了现有消费者的处理速度,此时须要对消费者进行横向伸缩,用多个消费者从同一个主题读取音讯,对音讯进行分流。同一个分区不能被一个组中的多个 consumer 生产。
Kafka 消费者代码样例
读取 Kafka 音讯只须要创立一个 KafkaConsumer,除此之外还须要应用四个根本属性,bootstrap.servers、key.deserializer、value.deserializer 和 group.id。
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
/**
* @Author Natasha
* @Description
* @Date 2020/11/3 14:14
**/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();
//bootstrap.servers 是 broker 服务器列表
properties.put("bootstrap.servers", "120.27.233.226:9092");
//group.id 是指是消费者的生产组
properties.put("group.id", "test");
//key.deserializer 和 value.deserializer 是用来做反序列化的,也就是将字节数组转换成对象
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
//kafkaConsumer.subscribe(Collections.singletonList("test"));
kafkaConsumer.assign(Arrays.asList(new TopicPartition("test",0)));
try{
// 有限循环消解决数据
while (true) {
// 一直调用 poll 拉取数据,如果进行拉取,那么 Kafka 会认为此消费者曾经死亡并进行重均衡
// 参数值 100 是一个超时工夫,指明线程如果没有数据会期待多长时间,0 示意不期待立刻返回
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
// 每条记录蕴含 key/value 以及主题、分区、位移信息
for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, value = %s\n", record.offset(), record.value());
}
}
} catch (WakeupException e) {// ignore for shutdown} finally {
// 此办法会提交位移,同时发送一个退出生产组的音讯到 Kafka 的组协调者,组协调者收到音讯后会立刻进行重均衡而无需期待此消费者会话过期。kafkaConsumer.close();}
}
}
提交偏移量
1. 主动提交
最简略的提交形式是让消费者主动提交偏移量,如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会主动把从 poll() 办法接管到的最大偏移量提交下来。
可能造成的问题:数据反复读
假如咱们依然应用默认的 5s 提交工夫距离,在最近一次提交之后的 3s 产生了再平衡,再平衡之后,消费者从最初一次提交的偏移量地位开始读取音讯。这个时候偏移量曾经落后了 3s,所以在这 3s 内达到的音讯会被反复解决。能够通过批改提交工夫距离来更频繁地提交偏移量,减小可能呈现反复音讯的工夫窗,不过这种状况是无奈完全避免的。
properties.put("enable.auto.commit", "true");
2. 手动提交
2.1 同步提交
public static void main(String[] args) {Properties properties = new Properties();
properties.put("bootstrap.servers", "120.27.233.226:9092");
properties.put("group.id", "test");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量
properties.put("auto.commit.offset", false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList("test"));
try{while(true) {ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {System.out.println("value =" + record.value() + ", topic =" + record.topic() + ", partition =" + record.partition() + ", offset =" + record.offset());
}
try{// 只有没有产生不可复原的谬误,commitSync() 办法会始终尝试直至提交胜利
consumer.commitSync();}catch(CommitFailedException e) {
// 如果提交失败,咱们也只能把异样记录到谬误日志里
System.err.println("commit failed!" + e.getMessage());
}
}
}finally {consumer.close();
}
}
2.2 异步提交
手动提交有一个不足之处,在 broker 对提交申请作出回应之前,应用程序会始终阻塞,这样会限度应用程序的吞吐量。咱们能够通过升高提交频率来晋升吞吐量,但如果产生了再平衡,会减少反复音讯的数量。
这时能够应用异步提交,只管发送提交申请,无需期待 broker 的响应。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量曾经提交胜利。
假如咱们收回一个申请用于提交偏移量 2000,这个时候产生了短暂的通信问题,服务器收不到申请,天然也不会作出任何响应。与此同时,咱们解决了另外一批音讯,并胜利提交了偏移量 3000。如果 commitAsync()从新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交胜利。这个时候如果产生再平衡,就会呈现反复音讯。
try{while(true) {ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {System.out.println("value =" + record.value() + ", topic =" + record.topic() + ", partition =" + record.partition() + ", offset =" + record.offset());
}
// 提交最初一个偏移量,而后持续做其余事件。consumer.commitAsync();}
}finally {consumer.close();
}
commitAsync()也反对回调,在 broker 作出响应时会执行回调,回调常常被用于记录提交谬误或生成度量指标;如果要用它来进行重试,肯定要留神提交的程序。
try{while(true) {ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {System.out.println("value =" + record.value() + ", topic =" + record.topic() + ", partition =" + record.partition() + ", offset =" + record.offset());
}
// 反对回调函数,用来记录提交谬误等
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if(exception != null) {log.error("kafka send msg err, exception = {}, offsets = {}", exception, offsets);
}
}
});
}
}finally {consumer.close();
}
}
能够在回调中重试失败的提交的思路:
应用一个枯燥递增的序列号来保护异步提交的程序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先查看回调的序列号和行将提交的偏移量是否相等,如果相等,阐明没有新的提交,那么能够平安地进行重试。如果序列号比拟大,阐明有一个新的提交曾经发送进来了,应该进行重试。
2.3 混合同步提交与异步提交
个别状况下,针对偶然呈现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为长期问题导致的,那么后续的提交总会有胜利的。但如果这是产生在敞开消费者或再平衡前的最初一次提交,就要确保可能提交胜利。
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {System.out.println("value =" + record.value() + ", topic =" + record.topic() + ", partition =" + record.partition() + ", offset =" + record.offset());
}
// 如果一切正常,咱们应用 commitAsync() 办法来提交
// 这样速度更快,而且即便这次提交失败,下一次提交很可能会胜利
consumer.commitAsync();}
} catch (Exception e) {e.printStackTrace();
} finally {
try {// 应用 commitSync() 办法会始终重试,直到提交胜利或产生无奈复原的谬误
// 确保敞开消费者之前胜利提交了偏移量
consumer.commitSync();}finally {consumer.close();
}
}
从特定偏移量开始解决记录
不论是主动提交还是应用 commitAsync()或者 commitSync()来提交偏移量,提交的都是 poll() 办法返回的那批数据的最大偏移量。KafkaConsumer API 容许在调用 commitSync()
和 commitAsync()
办法容许咱们指定特定的位移参数,参数为提交的分区与偏移量的 map。因为消费者可能不只读取一个分区,须要跟踪所有分区的偏移量,所以在这个层面上管制偏移量的提交会让代码变简单。
while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {System.out.println("value =" + record.value() + ", topic =" + record.topic() + ", partition =" + record.partition() + ", offset =" + record.offset());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
if (count % 1000 == 0) {// 这里调用的是 commitAsync(),不过调用 commitSync() 也是齐全能够的
// 当然,在提交特定偏移量时,依然要解决可能产生的谬误
consumer.commitAsync(currentOffsets, null);
}
count++;
}
}
数据库的 Exactly Once 语义的实现思路
当解决 Kafka 中的数据波及到数据库时:假如把数据存储到数据库后,如果没有来得及提交偏移量程序就因某种原因挂掉了,那么程序再次启动后就会反复解决数据,数据库中会有反复的数据。
如果把存储到数据库和提交偏移量在一个原子操作里实现,就能够防止这样的问题,但数据存到数据库,偏移量保留到 kafka 是无奈实现原子操作的,而如果把数据存储到数据库中,偏移量也存储到数据库中,这样就能够利用数据库的事务来把这两个操作设为一个原子操作,同时联合再平衡监听器就能够实现 Exactly Once 语义,以下为伪代码:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections<String> topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 产生分区再平衡之前,提交事务
commitDBTransaction();}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 再平衡之后,从数据库取得生产偏移量
for(TopicPartition topicPartition : partitions) {consumer.seek(topicPartition, getOffsetFromDB(topicPartition));
}
}
});
/**
* 生产之前调用一次 poll(),让消费者退出到生产组中,并获取调配的分区
* 而后马上调用 seek() 办法定位分区的偏移量
* seek() 设置生产偏移量,设置的偏移量是从数据库读出来的,阐明本次设置的偏移量曾经被解决过
* 下一次调用 poll() 就会在本次设置的偏移量上加 1,开始解决没有解决过的数据
* 如果 seek()产生谬误,比方偏移量不存在,则会抛出异样
*/
consumer.poll(0);
for(TopicPartition topicPartition : consumer.assignment()) {consumer.seek(topicPartition, getOffsetFromDB(topicPartition));
}
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1);
// 解决数据
processRecord(record);
// 把数据存储到数据库中
storeRecordInDB(record);
// 提交偏移量
consumer.commitAsync(currentOffsets, null);
}
}
} finally {consumer.close();
}
把偏移量和记录保留到用一个内部零碎来实现 Exactly Once 有很多办法,但核心思想都是:联合 ConsumerRebalanceListener 和 seek() 办法来确保可能及时保留偏移量,并保障消费者总是可能从正确的地位开始读取音讯。
再平衡
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再平衡(Rebalance)。再平衡十分重要,为消费者组带来了高可用性和伸缩性,能够释怀的减少或移除消费者。以下是触发再平衡的三种行为:
- 当一个 消费者 退出组时,读取了本来由其余消费者读取的分区,会触发再平衡。
- 当一个 消费者 来到组时(被敞开或产生解体),本来由它读取的分区将被组里的其余 消费者 来读取,会触发再平衡。
- 当 Topic 发生变化时,比方增加了新的分区,会产生分区重调配,会触发再平衡。
消费者通过向作为组协调器的 broker 发送心跳来维持 和群组以及分区 的关系。心跳的意思是表明消费者在读取分区里的音讯。消费者会在轮询音讯或提交偏移量时发送心跳。如果消费者超过肯定工夫没有发送心跳,会话就会过期,组协调器认为该消费者宕机,会触发再平衡。能够看到,从消费者会话过期到宕机是有肯定工夫的,这段时间内该消费者的分区都不能进行音讯生产。
在 Kafka 0.10.1 版本,Kafka 对心跳机制进行了批改,将发送心跳与拉取音讯进行拆散,这样使得发送心跳的频率不受拉取的频率影响
再平衡监听器
在分区重均衡前,如果消费者晓得它行将不再负责某个分区,那么它须要将它曾经解决过的音讯位移提交。Kafka 的 API 容许咱们在消费者新增分区或者失去分区时进行解决,咱们只须要在调用 subscribe()办法时传入 ConsumerRebalanceListener 对象,该对象有两个办法:
public void onPartitionRevoked(Collection partitions):此办法会在消费者进行生产后,在重均衡开始前调用。public void onPartitionAssigned(Collection partitions):此办法在分区调配给消费者后,在消费者开始读取音讯前调用。
相干的实战我的项目具体代码能够看 (四)Kafka 再平衡监听器 实战小例子,能够把代码拷下来在本地运行。
序列化和反序列化
Kafka 生产者将对象序列化成字节数组并发送到服务器,消费者须要将字节数组转换成对象(反序列化)。序列化与反序列化须要匹配,与生产者相似,举荐应用 Avro 序列化形式。(对于生产者的序列化能够参考 (三)Kafka 的生产者原理及应用详解)
Properties props = new Properties();
props.put("bootstrap.servers", "120.27.233.226:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "test"
KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));
while (true) {
// 这里应用之前生产者应用的 Avro 生成的 Customer 类
ConsumerRecords<String, Customer> records = consumer.poll(1000);
for (ConsumerRecord<String, Customer> record: records) {System.out.println("Current customer name is:" + record.value().getName());
}
consumer.commitSync();}
Kafka 的分区调配过程
- 确定
群组协调器(GroupCoordinator)
,每当咱们创立一个生产组,kafka 会为咱们调配一个 broker 作为该生产组的 coordinator。 - 注册消费者 并选出
leader consumer
,当咱们的有了 coordinator 之后,消费者将会开始往该 coordinator 上进行注册,第一个注册的消费者将成为该生产组的 leader,其余的为 follower. - 当 leader 选出来后,他会从 coordinator 那里实时获取分区 和 consumer 信息,并依据分区策略给每个 consumer 调配分区,并将调配后果通知 coordinator。
- follower 消费者将从 coordinator 那里获取到本人相干的分区信息进行生产,对于所有的 follower 消费者而言,他们只晓得本人生产的分区,并不知道其余消费者的存在。
- 至此,消费者都晓得本人的生产的分区,分区过程完结,当产生 分区再平衡 的时候,leader 将会反复调配过程。
消费者分区调配策略
1. Range
将 partitions 的个数除于消费者线程的总数来决定每个消费者线程生产几个分区。如果除不尽,那么后面几个消费者线程将会多生产一个分区。
咱们有 10 个分区,3 个消费者线程,10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多生产一个分区:
- C1-0 将生产 0, 1, 2, 3 分区
- C2-0 将生产 4, 5, 6 分区
- C2-1 将生产 7, 8, 9 分区
2. RoundRobin
RoundRobin 策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,而后对 TopicAndPartition 列表依照 hashCode 进行排序,而后通过轮询形式一一将分区以此调配给每个消费者。
如依照 hashCode 排序完的 topic-partitions 组顺次为 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,咱们的消费者线程排序为 C1-0, C1-1, C2-0, C2-1,最初分区调配的后果为:
- C1-0 将生产 T1-5, T1-2, T1-6 分区;
- C1-1 将生产 T1-3, T1-1, T1-9 分区;
- C2-0 将生产 T1-0, T1-4 分区;
- C2-1 将生产 T1-8, T1-7 分区;
消费者拦截器
消费者拦截器次要在生产到音讯或在提交生产位移时进行一些定制化的操作,只须要实现 ConsumerInterceptor 类中的办法就能够:
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
它会在 poll 办法返回之前调用拦截器的 onConsume()办法来对音讯进行相应的定制化操作,比方批改返回的音讯内容,依照某些规定进行过滤数据。
它会在提交完生产位移之后调用拦截器的 onCommit()办法,能够应用这个办法来记录跟踪所提交的位移信息。
properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
package interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Author Natasha
* @Description
* @Date 2020/11/4 14:32
**/
public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> recs = records.records(partition);
List<ConsumerRecord<String, String>> newRecs = new ArrayList<>();
for (ConsumerRecord<String, String> rec : recs) {String newValue = "interceptor-" + rec.value();
ConsumerRecord<String, String> newRec = new ConsumerRecord<>(rec.topic(), rec.partition(), rec.offset(), rec.key(), newValue);
newRecs.add(newRec);
}
newRecords.put(partition, newRecs);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp, offsetAndMetadata) -> {System.out.println(tp + ":" + offsetAndMetadata.offset());
});
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}}
独立的消费者
个别状况下咱们都是应用生产组(即使只有一个消费者)来生产音讯的,因为这样能够在减少或缩小消费者时主动进行分区重均衡。这种形式是举荐的形式。
如果在晓得主题和分区的状况下,咱们也能够应用单个消费者来进行生产。对于这种状况,咱们须要的是给消费者调配生产分区,而不是让消费者订阅(成为生产组)主题。
List<PartitionInfo> partitionInfos = null;
List<TopicPartition> partitions = new ArrayList<>();
// 被动获取主题下所有的分区。如果你晓得所指定的分区,能够跳过这一步
partitionInfos = kafkaConsumer.partitionsFor("rebalance-topic-three-part");
if (partitionInfos != null) {for (PartitionInfo partition : partitionInfos){partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
// 为消费者指定分区
kafkaConsumer.assign(partitions);
while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record: records) {System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
kafkaConsumer.commitSync();}
}
除了须要被动获取分区以及没有分区重均衡,其余的解决逻辑都是一样的。须要留神的是,如果增加了新的分区,这个消费者是感知不到的,须要通过 consumer.partitionsFor()来从新获取分区。
消费者配置
Kafka 与消费者相干的配置大部分参数都有正当的默认值,个别不须要批改,不过有一些参数与消费者的性能和可用性有很大关系。接下来介绍这些重要的属性。
fetch.min.bytes
指定消费者从服务器获取记录的最小字节数。如果服务器在收到消费者的数据小于 fetch.min.bytes
,那么会等到有足够的可用数据时才返回给消费者。
正当的设置能够升高消费者和 broker 的工作负载,在 Topic 音讯生产不沉闷时,缩小解决音讯次数。
如果没有很多可用数据,但消费者的 CPU 使用率却很高,须要调高该属性的值。
如果消费者的数量比拟多,调高该属性的值也能够升高 broker 的工作负载。
fetch.max.wait.ms
指定在 broker 中的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,即数据量没有达到 fetch.min.bytes
,500ms 后会返回数据给消费者。
fetch.max.wait.ms
和 fetch.min.bytes
有一个满足条件就会返回数据。
max.parition.fetch.bytes
指定了服务器从每个分区里的数据返回给消费者的最大字节数,默认值是 1MB。
如果一个主题有 20 个分区和 5 个消费者(同一个组内),那么每个消费者须要至多 4MB 的可用内存(每个消费者读取 4 个分区)来接管记录。如果组内有消费者产生解体,剩下的消费者须要解决更多的分区。
max.parition.fetch.bytes
必须比 broker 可能接管的最大音讯的字节数(max.message.size
)大,否则消费者可能无奈读取这些音讯,导致消费者始终重试。
另一个须要思考的因素是消费者解决数据的工夫。消费者须要频繁调用
poll()
办法来防止会话过期和产生分区再平衡,如果单次调用poll()
返回的数据太多,消费者须要更多的工夫来解决,可能无奈及时进行下一个轮询来防止会话过期。如果呈现这种状况,能够把max.parition.fetch.bytes
值改小或者缩短会话过期工夫。
session.timeout.ms
指定了消费者与服务器断开连接的最大工夫,默认是 3s。如果消费者没有在指定的工夫内发送心跳给 GroupCoordinator,就被认为曾经死亡,会触发再平衡,把它的分区调配给其余消费者。
该属性与
heartbeat.interval.ms
严密相干,heartbeat.interval.ms
指定了poll()
办法向协调器发送心跳的频率,session.timeout.ms
指定了消费者最长多久不发送心跳。所以,个别须要同时批改这两个属性,heartbeat.interval.ms
必须比session.timeout.ms
小,个别是session.timeout.ms
的三分之一,如果session.timeout.ms
是 3s,那么heartbeat.interval.ms
应该是 1s。
auto.offset.reset
指定了消费者在读取一个没有偏移量的分区或者偏移量有效的状况下默认是 latest
,另一个值是 earliest
,消费者将从起始地位读取分区的记录。
enable.auto.commit
指定了消费者是否主动提交偏移量,默认值是 true
,主动提交。如果设为 true
,须要通过配置 auto.commit.interval.ms
属性来管制提交的频率。设为 false
能够程序本人管制何时提交偏移量。
partition.assignment.strategy
决定哪些分区应该被调配给哪个消费者。Kafka 有两个默认的调配策略:
- Range,把 Topic 的若干个间断的分区调配给消费者。
- RoundRobin,把所有分区一一调配给消费者。
默认值是 org.apache.kafka.clients.consumer.RangeAssignor
,这个类实现了 Range
策略。
receive.buffer.bytes
和send.buffer.bytes
别离指定了 TCP socket 接管和发送数据包的缓冲区大小。如果设为 - 1 就应用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么能够适当增大这些值,因为跨数据中心的网络个别都有比拟高的提早和比拟低的带宽。
优雅退出
package graceexit;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
/**
* @Author Natasha
* @Description
* @Date 2020/11/4 14:21
**/
public class QuitConsumer {public static void main(String[] args) {Properties props = new Properties();
props.put("bootstrap.servers", "120.27.233.226:9092");
props.put("group.id", "test");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
final Thread mainThread = Thread.currentThread();
/*
* 注册 JVM 敞开时的回调,当 JVM 敞开时调用
* 退出循环须要通过另一个线程调用 consumer.wakeup()办法
* 调用 consumer.wakeup()能够退出 poll(), 并抛出 WakeupException 异样
* 咱们不须要解决 WakeupException, 因为它只是用于跳出循环的一种形式
* consumer.wakeup()是消费者惟一一个能够从其余线程里平安调用的办法
*/
Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {System.out.println("Starting exit...");
// 调用消费者的 wakeup 办法告诉主线程退出
consumer.wakeup();
try {
// 主线程继续执行,以便能够敞开 consumer,提交偏移量
mainThread.join();} catch (InterruptedException e) {e.printStackTrace();
}
}
});
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {System.out.println("topic =" + record.topic() + ", partition =" + record.partition() + ", offset =" + record.offset());
}
consumer.commitAsync();}
}catch (WakeupException e) {// 不解决异样} finally {// 在退出线程之前调用 consumer.close()是很有必要的,它会提交任何还没有提交的货色,并向组协调器发送音讯,告知本人要来到群组。// 接下来就会触发再平衡,而不须要期待会话超时。consumer.commitSync();
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
}
github
本文章中相干代码样例已上传 github:https://github.com/ShawnVanorGit/hello_kafka