如何生产数据

咱们曾经晓得了如何发送数据到Kafka,既然有数据发送,那么必定就有数据生产,消费者也是Kafka整个体系中不可短少的一环

public class KafkaConsumerDemo {public static void main(String[] args) throws InterruptedException {    Properties props = new Properties();    // 必须设置的属性    props.put("bootstrap.servers", "192.168.239.131:9092");    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    props.put("group.id", "group1");        // 可选设置属性    props.put("enable.auto.commit", "true");    // 主动提交offset,每1s提交一次    props.put("auto.commit.interval.ms", "1000");    props.put("auto.offset.reset","earliest ");    props.put("client.id", "zy_client_id");    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);    // 订阅test1 topic    consumer.subscribe(Collections.singletonList("test1"));    while(true) {        //  从服务器开始拉取数据        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));        records.forEach(record -> {            System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),                    record.offset(), record.key(), record.value());        });    }  }}

push 还是 pull

Kafka Consumer采纳的是被动拉取broker数据进行生产的。个别消息中间件存在推送(server推送数据给consumer)和拉取(consumer被动取服务器取数据)两种形式,这两种形式各有优劣。

如果是抉择推送的形式最大的妨碍就是服务器不分明consumer的生产速度,如果consumer中执行的操作又是比拟耗时的,那么consumer可能会不堪重负,甚至会导致系统挂掉。

而采纳拉取的形式则能够解决这种状况,consumer依据本人的状态来拉取数据,能够对服务器的数据进行提早解决。然而这种形式也有一个劣势就是服务器没有数据的时候可能会始终轮询,不过还好Kafka在poll()有参数容许消费者申请在“长轮询”中阻塞,期待数据达到(并且可选地期待直到给定数量的字节可用以确保传输大小)。

必须属性

下面代码中消费者必须的属性有4个,这里着重说一下group.id这个属性,kafka Consumer和Producer不一样,Consummer中有一个Consumer group(生产组),由它来决定同一个Consumer group中的消费者具体拉取哪个partition的数据,所以这里必须指定group.id属性。

  1. bootstrap.servers
    连贯Kafka集群的地址,多个地址以逗号分隔
  2. key.deserializer
    音讯中key反序列化类,须要和Producer中key序列化类绝对应
  3. value.deserializer
    音讯中value的反序列化类,须要和Producer中Value序列化类绝对应
  4. group.id
    消费者所属生产组的惟一标识

订阅/勾销主题

  1. 应用subscribe()办法订阅主题
  2. 应用assign()办法订阅确定主题和分区
List<PartitionInfo> partitionInfoList = consumer.partitionsFor("topic1");if(null != partitionInfoList) {  for(PartitionInfo partitionInfo : partitionInfoList) {      consumer.assign(Collections.singletonList(        new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));  }}

通过subscribe()办法订阅主题具备消费者主动再平衡(reblance)的性能,存在多个消费者的状况下能够依据分区调配策略来主动调配各个消费者与分区的关系。当组内的消费者减少或者缩小时,分区关系会主动调整。实现生产负载平衡以及故障主动转移。应用assign()办法订阅则不具备该性能。

  1. 勾销主题
consumer.unsubscribe();consumer.subscribe(new ArrayList<>());consumer.assign(new ArrayList<TopicPartition>());

下面的三行代码作用雷同,都是勾销订阅,其中unsubscribe()办法即能够勾销通过subscribe()形式实现的订阅,还能够勾销通过assign()形式实现的订阅。

如何更好的生产数据

结尾处的代码展现了咱们是如何生产数据的,然而代码未免过于简略,咱们测试的时候这样写没有问题,然而理论开发过程中咱们并不会这样写,咱们会抉择更加高效的形式,这里提供两种形式供大家参考。

  1. 一个Consumer group,多个consumer,数量小于等于partition的数量

  1. 一个consumer,多线程处理事件

第一种形式每个consumer都要保护一个独立的TCP连贯,如果分区数和创立consumer线程的数量过多,会造成不小零碎开销。然而如果解决音讯足够疾速,生产性能也会晋升,如果慢的话就会导致生产性能升高。

第二种形式是采纳一个consumer,多个音讯解决线程来解决音讯,其实在生产中,瓶颈个别是集中在音讯解决上的(可能会插入数据到数据库,或者申请第三方API),所以咱们采纳多个线程来解决这些音讯。

当然能够联合第一二种形式,采纳多consumer+多个音讯解决线程来生产Kafka中的数据,外围代码如下:

for (int i = 0; i < consumerNum; i++) {  //依据属性创立Consumer  final Consumer<String, byte[]> consumer = consumerFactory.getConsumer(getServers(), groupId);  consumerList.add(consumer);  //订阅主题  consumer.subscribe(Arrays.asList(this.getTopic()));  //consumer.poll()拉取数据  BufferedConsumerRecords bufferedConsumerRecords = new BufferedConsumerRecords(consumer);  getExecutor().scheduleWithFixedDelay(() -> {      long startTime = System.currentTimeMillis();      //进行音讯解决      consumeEvents(bufferedConsumerRecords);      long sleepTime = intervalMillis - (System.currentTimeMillis() - startTime);      if (sleepTime > 0) {        Thread.sleep(sleepTime);      }  }, 0, 1000, TimeUnit.MILLISECONDS);}

不过这种形式不能程序解决数据,如果你的业务是程序解决,那么第一种形式可能更适宜你。所以理论生产中请依据业务抉择最适宜本人的形式。

生产数据思考哪些问题?

在Kafka中无论是producer往topic中写数据,还是consumer从topic中读数据,都防止不了和offset打交道,对于offset次要有以下几个概念。

  • Last Committed Offset:consumer group最新一次 commit 的 offset,示意这个 group 曾经把 Last Committed Offset 之前的数据都生产胜利了。
  • Current Position:consumer group 以后生产数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据曾经拉取胜利,可能正在解决,然而还未 commit。
  • Log End Offset(LEO):记录底层日志(log)中的下一条音讯的 offset。,对producer来说,就是行将插入下一条音讯的offset。
  • High Watermark(HW):曾经胜利备份到其余 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据曾经写入到该 partition 的 leader 中,然而还未齐全备份到其余的 replicas 中,consumer是无奈生产这部分音讯(未提交音讯)。

每个Kafka正本对象都有两个重要的属性:LEO和HW。留神是所有的正本,而不只是leader正本。对于这两者更具体解释,倡议参考这篇文章。

对于消费者而言,咱们更多时候关注的是生产实现之后如何和服务器进行生产确认,通知服务器这部分数据我曾经生产过了。

这里就波及到了2个offset,一个是current position,一个是处理完毕向服务器确认的committed offset。显然,异步模式下committed offset是落后于current position的。如果consumer挂掉了,那么下一次生产数据又只会从committed offset的地位拉取数据,就会导致数据被反复生产。

提交策略如何抉择

Kafka提供了3种提交offset的形式

  1. 主动提交
// 主动提交,默认trueprops.put("enable.auto.commit", "true");// 设置主动每1s提交一次props.put("auto.commit.interval.ms", "1000");
  1. 手动同步提交offset
consumer.commitSync();
  1. 手动异步提交offset
consumer.commitAsync();

下面说了既然异步提交offset可能会反复生产,那么我应用同步提交是否就能够表明这个问题呢?我只能说too young,too sample。

while(true) {  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  records.forEach(record -> {      insertIntoDB(record);      consumer.commitSync();  });}

很显著不行,因为insertIntoDB和commitSync()做不到原子操作,如果insertIntoDB()胜利了,然而提交offset的时候consumer挂掉了,而后服务器重启,依然会导致反复生产问题。

是否须要做到不反复生产?

只有保障解决音讯和提交offset得操作是原子操作,就能够做到不反复生产。咱们能够本人治理committed offset,而不让kafka来进行治理。

比方如下应用形式:

  1. 如果生产的数据刚好须要存储在数据库,那么能够把offset也存在数据库,就能够就能够在一个事物中提交这两个后果,保障原子操作。
  2. 借助搜索引擎,把offset和数据一起放到索引外面,比方Elasticsearch

每条记录都有本人的offset,所以如果要治理本人的offset还得要做上面事件

  1. 设置enable.auto.commit=false
  2. 应用每个ConsumerRecord提供的offset来保留生产的地位。
  3. 在重新启动时应用seek(TopicPartition, long)复原上次生产的地位。

通过下面的形式就能够在生产端实现"Exactly Once"的语义,即保障只生产一次。然而是否真的须要保障不反复生产呢?这个得看具体业务,反复生产数据对整体有什么影响在来决定是否须要做到不反复生产。

再平衡(reblance)怎么办?

再平衡是指分区的所属权从一个消费者转移到另一个消费者的行为,再平衡期间,生产组内的生产组无奈读取音讯。为了更准确的管制音讯的生产,咱们能够再订阅主题的时候,通过指定监听器的形式来设定产生再平衡动作前后的一些筹备或者收尾的动作。

consumer.subscribe(Collections.singletonList("test3"), new ConsumerRebalanceListener() {  @Override  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {      //再平衡之前和消费者进行读取音讯之后被调用  }  @Override  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {      //重新分配分区之后和消费者开始生产之前被调用  }});

具体如何做得依据具体的业务逻辑来实现,如果音讯比拟重要,你能够在再平衡的时候解决offset,如果不够重要,你能够什么都不做。

无奈生产的数据怎么办?

可能因为你的业务逻辑有些数据没法生产这个时候怎么办?同样的还是的看你认为这个数据有多重要或者多不重要,如果重要能够记录日志,把它存入文件或者数据库,以便于稍候进行重试或者定向剖析。如果不重要就当做什么事件都没有产生好了。

理论开发中我的解决形式

我开发的我的项目中,用到kafka的其中一个中央是音讯告诉(谁给你发了音讯,点赞,评论等),大略的流程就是用户在client端做了某些操作,就会发送数据到kafka,而后把这些数据进行肯定的解决之后插入到HBase中。

其中采纳了 N consumer thread + N Event Handler的形式来生产数据,并采纳主动提交offset。对于无奈生产的数据往往只是简略解决下,打印下日志以及音讯体(无奈生产的状况十分非常少)。

得益于HBase的多version管制,即便是反复生产了数据也无关紧要。这样做没有去防止反复生产的问题次要是基于以下几点思考

  1. 反复生产的概率较低,服务器整体性能稳固
  2. 即使是反复生产了数据,入库了HBase,获取数据也是只有一条,不影响后果的正确性
  3. 有更高的吞吐量
  4. 编程简略,不必独自去解决以及保留offset

几个重要的消费者参数

  • fetch.min.bytes

    配置poll()拉取申请过程种能从Kafka拉取的最小数据量,如果可用数据量小于它指定的大小会等到有足够可用数据时才会返回给消费者,其默认值时1B

  • fetch.max.wait.ms

    和fetch.min.bytes无关,用于指定Kafka的等待时间,默认工夫500ms。如果fetch.min.bytes设置为1MB,fetch.max.wait.ms设置为100ms,Kafka收到消费者申请后,要么返回1MB数据,要么在100ms后返回所有可用数据,就看哪个提交失去满足。

  • max.poll.records

    用于管制单次调用poll()能返回的最大记录数量,默认为500条数据

  • partition.assignment.stragety

    分区会被调配给群组的消费者,这个参数用于指定分区调配策略。默认是RangeAssignore,可选的还有RoundRobinAssignor。同样它还反对自定义

其余更多参数请参考官网文档。