共计 6710 个字符,预计需要花费 17 分钟才能阅读完成。
如何生产数据
咱们曾经晓得了如何发送数据到 Kafka, 既然有数据发送, 那么必定就有数据生产, 消费者也是 Kafka 整个体系中不可短少的一环
public class KafkaConsumerDemo {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();
// 必须设置的属性
props.put("bootstrap.servers", "192.168.239.131:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "group1");
// 可选设置属性
props.put("enable.auto.commit", "true");
// 主动提交 offset, 每 1s 提交一次
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset","earliest");
props.put("client.id", "zy_client_id");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅 test1 topic
consumer.subscribe(Collections.singletonList("test1"));
while(true) {
// 从服务器开始拉取数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
});
}
}
}
push 还是 pull
Kafka Consumer 采纳的是被动拉取 broker 数据进行生产的。个别消息中间件存在推送 (server 推送数据给 consumer) 和拉取 (consumer 被动取服务器取数据) 两种形式,这两种形式各有优劣。
如果是抉择推送的形式最大的妨碍就是服务器不分明 consumer 的生产速度,如果 consumer 中执行的操作又是比拟耗时的,那么 consumer 可能会不堪重负, 甚至会导致系统挂掉。
而采纳拉取的形式则能够解决这种状况,consumer 依据本人的状态来拉取数据, 能够对服务器的数据进行提早解决。然而这种形式也有一个劣势就是服务器没有数据的时候可能会始终轮询,不过还好 Kafka 在 poll()有参数容许消费者申请在“长轮询”中阻塞,期待数据达到(并且可选地期待直到给定数量的字节可用以确保传输大小)。
必须属性
下面代码中消费者必须的属性有 4 个, 这里着重说一下 group.id 这个属性,kafka Consumer 和 Producer 不一样,Consummer 中有一个 Consumer group(生产组),由它来决定同一个 Consumer group 中的消费者具体拉取哪个 partition 的数据, 所以这里必须指定 group.id 属性。
- bootstrap.servers
连贯 Kafka 集群的地址,多个地址以逗号分隔 - key.deserializer
音讯中 key 反序列化类, 须要和 Producer 中 key 序列化类绝对应 - value.deserializer
音讯中 value 的反序列化类, 须要和 Producer 中 Value 序列化类绝对应 - group.id
消费者所属生产组的惟一标识
订阅 / 勾销主题
- 应用 subscribe()办法订阅主题
- 应用 assign()办法订阅确定主题和分区
List<PartitionInfo> partitionInfoList = consumer.partitionsFor("topic1");
if(null != partitionInfoList) {for(PartitionInfo partitionInfo : partitionInfoList) {
consumer.assign(Collections.singletonList(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
}
}
通过 subscribe()办法订阅主题具备消费者主动再平衡 (reblance) 的性能,存在多个消费者的状况下能够依据分区调配策略来主动调配各个消费者与分区的关系。当组内的消费者减少或者缩小时,分区关系会主动调整。实现生产负载平衡以及故障主动转移。应用 assign()办法订阅则不具备该性能。
- 勾销主题
consumer.unsubscribe();
consumer.subscribe(new ArrayList<>());
consumer.assign(new ArrayList<TopicPartition>());
下面的三行代码作用雷同,都是勾销订阅,其中 unsubscribe()办法即能够勾销通过 subscribe()形式实现的订阅,还能够勾销通过 assign()形式实现的订阅。
如何更好的生产数据
结尾处的代码展现了咱们是如何生产数据的, 然而代码未免过于简略, 咱们测试的时候这样写没有问题, 然而理论开发过程中咱们并不会这样写,咱们会抉择更加高效的形式, 这里提供两种形式供大家参考。
- 一个 Consumer group, 多个 consumer, 数量小于等于 partition 的数量
- 一个 consumer, 多线程处理事件
第一种形式每个 consumer 都要保护一个独立的 TCP 连贯,如果分区数和创立 consumer 线程的数量过多,会造成不小零碎开销。然而如果解决音讯足够疾速,生产性能也会晋升, 如果慢的话就会导致生产性能升高。
第二种形式是采纳一个 consumer,多个音讯解决线程来解决音讯,其实在生产中,瓶颈个别是集中在音讯解决上的(可能会插入数据到数据库,或者申请第三方 API),所以咱们采纳多个线程来解决这些音讯。
当然能够联合第一二种形式,采纳多 consumer+ 多个音讯解决线程来生产 Kafka 中的数据, 外围代码如下:
for (int i = 0; i < consumerNum; i++) {
// 依据属性创立 Consumer
final Consumer<String, byte[]> consumer = consumerFactory.getConsumer(getServers(), groupId);
consumerList.add(consumer);
// 订阅主题
consumer.subscribe(Arrays.asList(this.getTopic()));
//consumer.poll()拉取数据
BufferedConsumerRecords bufferedConsumerRecords = new BufferedConsumerRecords(consumer);
getExecutor().scheduleWithFixedDelay(() -> {long startTime = System.currentTimeMillis();
// 进行音讯解决
consumeEvents(bufferedConsumerRecords);
long sleepTime = intervalMillis - (System.currentTimeMillis() - startTime);
if (sleepTime > 0) {Thread.sleep(sleepTime);
}
}, 0, 1000, TimeUnit.MILLISECONDS);
}
不过这种形式不能程序解决数据,如果你的业务是程序解决,那么第一种形式可能更适宜你。所以理论生产中请依据业务抉择最适宜本人的形式。
生产数据思考哪些问题?
在 Kafka 中无论是 producer 往 topic 中写数据, 还是 consumer 从 topic 中读数据, 都防止不了和 offset 打交道, 对于 offset 次要有以下几个概念。
- Last Committed Offset:consumer group 最新一次 commit 的 offset,示意这个 group 曾经把 Last Committed Offset 之前的数据都生产胜利了。
- Current Position:consumer group 以后生产数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据曾经拉取胜利,可能正在解决,然而还未 commit。
- Log End Offset(LEO):记录底层日志 (log) 中的 下一条音讯的 offset。, 对 producer 来说,就是行将插入下一条音讯的 offset。
- High Watermark(HW):曾经胜利备份到其余 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据曾经写入到该 partition 的 leader 中,然而还未齐全备份到其余的 replicas 中,consumer 是无奈生产这部分音讯(未提交音讯)。
每个 Kafka 正本对象都有两个重要的属性:LEO 和 HW。留神是所有的正本,而不只是 leader 正本。对于这两者更具体解释,倡议参考这篇文章。
对于消费者而言,咱们更多时候关注的是生产实现之后如何和服务器进行生产确认,通知服务器这部分数据我曾经生产过了。
这里就波及到了 2 个 offset,一个是 current position, 一个是处理完毕向服务器确认的 committed offset。显然, 异步模式下 committed offset 是落后于 current position 的。如果 consumer 挂掉了, 那么下一次生产数据又只会从 committed offset 的地位拉取数据,就会导致数据被反复生产。
提交策略如何抉择
Kafka 提供了 3 种提交 offset 的形式
- 主动提交
// 主动提交, 默认 true
props.put("enable.auto.commit", "true");
// 设置主动每 1s 提交一次
props.put("auto.commit.interval.ms", "1000");
- 手动同步提交 offset
consumer.commitSync();
- 手动异步提交 offset
consumer.commitAsync();
下面说了既然异步提交 offset 可能会反复生产, 那么我应用同步提交是否就能够表明这个问题呢?我只能说 too young,too sample。
while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {insertIntoDB(record);
consumer.commitSync();});
}
很显著不行, 因为 insertIntoDB 和 commitSync()做不到原子操作, 如果 insertIntoDB()胜利了,然而提交 offset 的时候 consumer 挂掉了,而后服务器重启,依然会导致反复生产问题。
是否须要做到不反复生产?
只有保障解决音讯和提交 offset 得操作是原子操作,就能够做到不反复生产。咱们能够本人治理 committed offset, 而不让 kafka 来进行治理。
比方如下应用形式:
- 如果生产的数据刚好须要存储在数据库,那么能够把 offset 也存在数据库,就能够就能够在一个事物中提交这两个后果,保障原子操作。
- 借助搜索引擎,把 offset 和数据一起放到索引外面,比方 Elasticsearch
每条记录都有本人的 offset, 所以如果要治理本人的 offset 还得要做上面事件
- 设置 enable.auto.commit=false
- 应用每个 ConsumerRecord 提供的 offset 来保留生产的地位。
- 在重新启动时应用 seek(TopicPartition, long)复原上次生产的地位。
通过下面的形式就能够在生产端实现 ”Exactly Once” 的语义, 即保障只生产一次。然而是否真的须要保障不反复生产呢?这个得看具体业务, 反复生产数据对整体有什么影响在来决定是否须要做到不反复生产。
再平衡 (reblance) 怎么办?
再平衡是指分区的所属权从一个消费者转移到另一个消费者的行为,再平衡期间,生产组内的生产组无奈读取音讯。为了更准确的管制音讯的生产,咱们能够再订阅主题的时候,通过指定监听器的形式来设定产生再平衡动作前后的一些筹备或者收尾的动作。
consumer.subscribe(Collections.singletonList("test3"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 再平衡之前和消费者进行读取音讯之后被调用}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 重新分配分区之后和消费者开始生产之前被调用}
});
具体如何做得依据具体的业务逻辑来实现, 如果音讯比拟重要,你能够在再平衡的时候解决 offset, 如果不够重要,你能够什么都不做。
无奈生产的数据怎么办?
可能因为你的业务逻辑有些数据没法生产这个时候怎么办?同样的还是的看你认为这个数据有多重要或者多不重要,如果重要能够记录日志, 把它存入文件或者数据库,以便于稍候进行重试或者定向剖析。如果不重要就当做什么事件都没有产生好了。
理论开发中我的解决形式
我开发的我的项目中, 用到 kafka 的其中一个中央是音讯告诉(谁给你发了音讯, 点赞, 评论等), 大略的流程就是用户在 client 端做了某些操作,就会发送数据到 kafka, 而后把这些数据进行肯定的解决之后插入到 HBase 中。
其中采纳了 N consumer thread + N Event Handler 的形式来生产数据, 并采纳主动提交 offset。对于无奈生产的数据往往只是简略解决下,打印下日志以及音讯体(无奈生产的状况十分非常少)。
得益于 HBase 的多 version 管制, 即便是反复生产了数据也无关紧要。这样做没有去防止反复生产的问题次要是基于以下几点思考
- 反复生产的概率较低,服务器整体性能稳固
- 即使是反复生产了数据, 入库了 HBase, 获取数据也是只有一条, 不影响后果的正确性
- 有更高的吞吐量
- 编程简略,不必独自去解决以及保留 offset
几个重要的消费者参数
- fetch.min.bytes
配置 poll()拉取申请过程种能从 Kafka 拉取的最小数据量,如果可用数据量小于它指定的大小会等到有足够可用数据时才会返回给消费者,其默认值时 1B
- fetch.max.wait.ms
和 fetch.min.bytes 无关, 用于指定 Kafka 的等待时间,默认工夫 500ms。如果 fetch.min.bytes 设置为 1MB,fetch.max.wait.ms 设置为 100ms,Kafka 收到消费者申请后, 要么返回 1MB 数据, 要么在 100ms 后返回所有可用数据, 就看哪个提交失去满足。
- max.poll.records
用于管制单次调用 poll()能返回的最大记录数量,默认为 500 条数据
- partition.assignment.stragety
分区会被调配给群组的消费者, 这个参数用于指定分区调配策略。默认是 RangeAssignore, 可选的还有 RoundRobinAssignor。同样它还反对自定义
其余更多参数请参考官网文档。