Learning Kafka

Notes on installing and learning Kafka on Windows.

I. Download Kafka

Kafka is written in Java and the official distribution supports Windows, so you can simply download it and extract it into a folder, e.g. D:\kafka. The version used here is 3.2.0; other versions may differ slightly. The overall layout is as follows:

  • bin: ready-made shell scripts, plus the .bat equivalents for Windows under bin\windows
  • config: ready-made configuration files, such as the Kafka server config, the ZooKeeper config, and the consumer and producer configs
  • libs: jar packages and other dependencies
  • licenses: open-source license texts

II. Start a Single Kafka Instance

The exciting moment has arrived. We have downloaded the files and installed a JDK (chances are you already have one), so the next step is to set up the properties files. Here are the entries to pay attention to; they define the ports used later:

```
# file: config/server.properties
# the port the broker listens on
listeners=PLAINTEXT://127.0.0.1:9092
# I think it's better to point this at a dedicated folder
log.dirs=E:\\kafka-logs-1
# the ZooKeeper server to connect to
zookeeper.connect=localhost:2181
```

```
# file: config/zookeeper.properties
dataDir=E:\\zookeeper
# the port at which the clients will connect
clientPort=2181
```

1. Start ZooKeeper

```
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
# under WSL or bash:
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
```

2. Start Kafka

```
.\bin\windows\kafka-server-start.bat .\config\server.properties
# under WSL or bash:
./bin/kafka-server-start.sh ./config/server.properties
```

If this succeeds, the broker is up, and the startup log shows the ZooKeeper instance it connected to and the IP the broker serves on.

3. Create a Topic

```
# Older versions addressed the Kafka cluster via --zookeeper;
# newer versions use --bootstrap-server to identify the cluster.
.\bin\windows\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --topic test
```

We can inspect the topics that currently exist in the cluster with the following command:

```
.\bin\windows\kafka-topics.bat --describe --bootstrap-server 127.0.0.1:9092
```

You can see that a topic named __consumer_offsets already exists; Kafka uses it internally to store each consumer's offset data.
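Topics can also be managed programmatically. Below is a minimal sketch using the AdminClient API from kafka-clients that does the same create-and-describe as the commands above; the class name TopicAdmin is made up, and the partition count and replication factor of 1 are assumptions that suit this single-broker setup:

```
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- enough for a single broker
            // (this throws TopicExistsException if the topic already exists)
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            // print the partition layout, mirroring kafka-topics --describe
            System.out.println(admin.describeTopics(Collections.singletonList("test")).all().get());
        }
    }
}
```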

4. Write Data to a Topic and Read It Back

```
.\bin\windows\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic test1
```

Type in some log lines, then read them back out:

```
.\bin\windows\kafka-console-consumer.bat --topic test1 --bootstrap-server 127.0.0.1:9092
# add --from-beginning to see the messages still retained in the topic
```

With that we have completed the most basic Kafka operations: creating a topic, writing data, and reading it back.

III. Writing Your Own Code

1. Writing Your Own Producer

According to Kafka's own documentation, kafka-clients provides three send modes: blocking, non-blocking (fire-and-forget), and an asynchronous send with a ready-made callback invoked when the Future completes. All three appear in the class below.

```
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;

public class Producer {
    Logger logger = LoggerFactory.getLogger(Producer.class);
    private Properties kafkaProducerProps;
    private KafkaProducer<String, String> kafkaProducer;

    public static void main(String[] args) {
        Producer producer = new Producer();
        try {
            while (true) {
                producer.SendCallBack("test", "what fuck about key value",
                        "this is nothing about kafka and send with Callback" + new Date().toString());
                producer.SendBlock("test", "what fuck about key value",
                        "this is nothing about kafka and send by blocking" + new Date().toString());
                producer.SendAsync("test", "what fuck about key value",
                        "this is nothing about kafka and send by async" + new Date().toString());
            }
        } finally {
            producer.kafkaProducer.close();
        }
    }

    // Blocking send: wait on the returned Future until the broker acknowledges.
    public void SendBlock(String topic, String key, String value) {
        try {
            System.out.println("block send :" +
                    kafkaProducer.send(new ProducerRecord<String, String>(topic, key, value)).get().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Fire-and-forget: hand back the Future without waiting on it.
    public Future<RecordMetadata> SendAsync(String topic, String key, String value) {
        return kafkaProducer.send(new ProducerRecord<String, String>(topic, key, value));
    }

    // Asynchronous send with a callback invoked once the broker responds.
    public void SendCallBack(String topic, String key, String value) {
        kafkaProducer.send(new ProducerRecord<String, String>(topic, key, value), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println(recordMetadata.toString());
                if (e != null)
                    System.out.println(e.toString());
            }
        });
    }

    public Producer() {
        kafkaProducerProps = new Properties();
        kafkaProducerProps.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProducerProps.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProducerProps.put("bootstrap.servers", "127.0.0.1:9092");
        kafkaProducer = new KafkaProducer<String, String>(kafkaProducerProps);
    }
}
```
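A note on choosing between the three: the blocking send confirms every record but costs a broker round-trip per message; fire-and-forget is the fastest but errors surface only if you later inspect the Future; the callback variant keeps asynchronous throughput while still giving you a hook to log or retry failures, which is why it is generally the most common choice.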

2. Writing Your Own Consumer

Similarly, the consumer can be used in several ways: subscribing to a topic, subscribing to specific partitions of a topic, or subscribing to all topics matching a regex. The class below uses a plain subscription; a manual partition-assignment sketch follows it.

```
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class Consumer {
    private static Logger logger = LoggerFactory.getLogger(Consumer.class);
    private KafkaConsumer<String, String> kafkaConsumer;
    private Properties kafkaConsumerProps;
    private Map<String, Integer> consumeMap;

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        Thread mainThread = Thread.currentThread();
        // On Ctrl+C, wake the consumer out of poll() so it can close cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("consumer starting exiting");
            consumer.kafkaConsumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        consumer.listen();
    }

    public Consumer() {
        kafkaConsumerProps = new Properties();
        kafkaConsumerProps.put("bootstrap.servers", "127.0.0.1:9092");
        kafkaConsumerProps.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        kafkaConsumerProps.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        kafkaConsumerProps.put("group.id", "loopConsumer");
        kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProps);
        kafkaConsumer.subscribe(Collections.singletonList("test"));
        // kafkaConsumer.subscribe(Pattern.compile("test*")); // or subscribe to every topic matching test*
        List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor("test");
        // XXX: this can be used to fetch a topic's partitions and assign them to
        // consumers by hand, instead of going through the group rebalance:
        /*
        // if (partitionInfoList != null) {
        //     for (PartitionInfo info : partitionInfoList) {
        //         partitions.add(new TopicPartition(info.topic(), info.partition()));
        //     }
        //     kafkaConsumer.assign(partitions);
        // }
        */
        consumeMap = new HashMap<>();
    }

    public void listen() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    logger.warn(record.toString());
                    // count how many times each message value was seen in this batch
                    int updatedCount = 1;
                    if (consumeMap.containsKey(record.value())) {
                        updatedCount = consumeMap.get(record.value()) + 1;
                    }
                    consumeMap.put(record.value(), updatedCount);
                }
                System.out.println("\n-------------------------------------------------\n");
                System.out.println(consumeMap);
                System.out.println("\n-------------------------------------------------\n");
                consumeMap.clear();
                kafkaConsumer.commitAsync();
            }
        } catch (WakeupException e) {
            // expected: thrown when the shutdown hook calls wakeup()
        } finally {
            kafkaConsumer.close();
            System.out.println("Closed Consumer and we are done");
        }
    }
}
```
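The commented-out XXX block above only hints at manual assignment. Here is a minimal, self-contained sketch of that mode, assuming the same single broker at 127.0.0.1:9092 and the test topic from earlier; the class name AssignedConsumer is made up for illustration. With assign() there is no consumer group and therefore no rebalance:

```
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class AssignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        props.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        // no group.id is set: with assign() there is no group, so disable auto-commit
        props.put("enable.auto.commit", "false");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // look up the topic's partitions and take all of them for ourselves
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("test"))
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            consumer.assign(partitions);          // manual assignment: no rebalance
            consumer.seekToBeginning(partitions); // replay from the earliest retained offset
            for (int i = 0; i < 10; i++)
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500)))
                    System.out.println(record.partition() + "@" + record.offset() + ": " + record.value());
        }
    }
}
```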