Learning Kafka
Notes on learning and installing Kafka on Windows.
I. Downloading Kafka
Kafka is implemented in Java and the official distribution supports Windows out of the box, so you can simply download it and unzip it to a folder, e.g. D:\kafka. The version used here is 3.2.0; other versions may differ slightly. The directory layout is as follows:
- bin: ready-to-use shell scripts, with the Windows .bat equivalents under bin\windows
- config: ready-made configuration files, e.g. for the Kafka server, ZooKeeper, and the console consumer and producer
- libs: the jar packages and their dependencies
- licenses: the open-source license texts
II. Starting a single Kafka instance
Now for the exciting part. With the files downloaded and a JDK installed (you probably have one already), edit the properties files. Here are the settings worth noting; they define the ports used later:
```properties
# file: config/server.properties

# the address and port the broker listens on
listeners=PLAINTEXT://127.0.0.1:9092
# better to point this at a dedicated folder
log.dirs=E:\\kafka-logs-1
# the ZooKeeper server to connect to
zookeeper.connect=localhost:2181
```
```properties
# file: config/zookeeper.properties

dataDir=E:\\zookeeper
# the port at which the clients will connect
clientPort=2181
```
1. Start ZooKeeper
```shell
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
# under WSL or another bash-like shell, use the shell script instead
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
```
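To sanity-check that ZooKeeper is really listening on the clientPort configured above, you can poke it from code. A minimal sketch (the ZkPing class name is mine; "ruok" is ZooKeeper's four-letter health probe, which since ZooKeeper 3.5 must be enabled via 4lw.commands.whitelist, so treat this as optional):

```java
package com.lixiande.kafkaLearn;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class ZkPing {
    public static void main(String[] args) throws Exception {
        // connect to the clientPort configured in zookeeper.properties
        try (Socket socket = new Socket("127.0.0.1", 2181)) {
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes()); // ZooKeeper's four-letter health probe
            out.flush();
            socket.shutdownOutput();
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[16];
            int n = in.read(buf);
            // a healthy server answers "imok"
            System.out.println(n > 0 ? new String(buf, 0, n) : "no reply (is 4lw.commands.whitelist set?)");
        }
    }
}
```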
2. Start Kafka
```shell
.\bin\windows\kafka-server-start.bat .\config\server.properties
# under WSL or another bash-like shell
./bin/kafka-server-start.sh ./config/server.properties
```
If that worked, the broker is up, and the startup log shows which ZooKeeper it connected to and the address the broker advertises.
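You can also confirm reachability from code via the kafka-clients Admin API, which reports the cluster's current broker nodes. A minimal sketch (the ClusterCheck class name is mine; the address comes from the listeners setting above):

```java
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Properties;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // prints the broker nodes the cluster currently advertises
            System.out.println(admin.describeCluster().nodes().get());
        }
    }
}
```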
3. Create a topic
```shell
# older versions located the cluster via --zookeeper; newer versions use --bootstrap-server instead
.\bin\windows\kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --topic test
```
To check which topics the cluster currently has, use:
```shell
.\bin\windows\kafka-topics.bat --describe --bootstrap-server 127.0.0.1:9092
```
You will notice there is a built-in topic called __consumer_offsets, which stores each consumer group's offset data.
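The same operations are available programmatically through the kafka-clients Admin API. A minimal sketch (the TopicAdmin class name is mine; the partition count and replication factor are illustrative):

```java
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // create "test" with 1 partition and replication factor 1, like the CLI call above
            // (fails with a TopicExistsException wrapped in ExecutionException if it already exists)
            admin.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1))).all().get();
            // list topics, including internal ones such as __consumer_offsets
            System.out.println(admin.listTopics(new ListTopicsOptions().listInternal(true)).names().get());
        }
    }
}
```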
4. Writing data to and reading data from a topic
```shell
.\bin\windows\kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic test1
```
Type a few lines of data, then read them back:
```shell
.\bin\windows\kafka-console-consumer.bat --topic test1 --bootstrap-server 127.0.0.1:9092
# add --from-beginning to also see the messages that are still retained
```
That covers the most basic Kafka operations: creating a topic, writing data, and reading it back.
III. Writing your own code
1. Writing your own producer
According to Kafka's own documentation, kafka-clients supports three send patterns: blocking (waiting on the returned Future), asynchronous (keeping the Future for later), and a ready-made completion callback.
```java
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;

public class Producer {
    Logger logger = LoggerFactory.getLogger(Producer.class);
    private Properties kafkaProducerProps;
    private KafkaProducer<String, String> kafkaProducer;

    public static void main(String[] args) {
        Producer producer = new Producer();
        try {
            // loops forever; the finally block only runs if a send throws
            while (true) {
                producer.SendCallBack("test", "what fuck about key value", "this is nothing about kafka and send with Callback" + new Date().toString());
                producer.SendBlock("test", "what fuck about key value", "this is nothing about kafka and send by blocking" + new Date().toString());
                producer.SendAsync("test", "what fuck about key value", "this is nothing about kafka and send by async" + new Date().toString());
            }
        } finally {
            producer.kafkaProducer.close();
        }
    }

    // blocking send: get() waits for the broker's acknowledgement
    public void SendBlock(String topic, String key, String value) {
        try {
            System.out.println("block send :" + kafkaProducer.send(new ProducerRecord<String, String>(topic, key, value)).get().toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // asynchronous send: hand the Future back and let the caller decide when to wait
    public Future<RecordMetadata> SendAsync(String topic, String key, String value) {
        return kafkaProducer.send(new ProducerRecord<String, String>(topic, key, value));
    }

    // callback send: onCompletion fires once the record is acknowledged or fails
    public void SendCallBack(String topic, String key, String value) {
        kafkaProducer.send(new ProducerRecord<String, String>(topic, key, value), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println(recordMetadata.toString());
                if (e != null)
                    System.out.println(e.toString());
            }
        });
    }

    public Producer() {
        kafkaProducerProps = new Properties();
        kafkaProducerProps.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProducerProps.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProducerProps.put("bootstrap.servers", "127.0.0.1:9092");
        kafkaProducer = new KafkaProducer<String, String>(kafkaProducerProps);
    }
}
```
2. Writing your own consumer
A consumer likewise has several subscription styles: subscribing to a topic, consuming specific partitions of a topic by manual assignment, or subscribing to every topic matching a regex. A standalone sketch of the manual-assignment variant follows after the code.
```java
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class Consumer {
    private static Logger logger = LoggerFactory.getLogger(Consumer.class);
    private KafkaConsumer<String, String> kafkaConsumer;
    private Properties kafkaConsumerProps;
    private Map<String, Integer> consumeMap;

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        Thread mainThread = Thread.currentThread();
        // the shutdown hook wakes the consumer up so that poll() throws
        // WakeupException and the consumer can close cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("consumer starting exiting");
            consumer.kafkaConsumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        consumer.listen();
    }

    public Consumer() {
        kafkaConsumerProps = new Properties();
        kafkaConsumerProps.put("bootstrap.servers", "127.0.0.1:9092");
        kafkaConsumerProps.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        kafkaConsumerProps.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        kafkaConsumerProps.put("group.id", "loopConsumer");
        kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProps);
        kafkaConsumer.subscribe(Collections.singletonList("test"));
        // kafkaConsumer.subscribe(Pattern.compile("test*")); // alternatively, subscribe to every topic matching test*
        List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor("test");
        // XXX: the partition list can be used to assign a topic's partitions to
        // consumers by hand, bypassing group rebalancing:
        /*
        // List<TopicPartition> partitions = new ArrayList<>();
        // if (partitionInfoList != null) {
        //     for (PartitionInfo info : partitionInfoList) {
        //         partitions.add(new TopicPartition(info.topic(), info.partition()));
        //     }
        //     kafkaConsumer.assign(partitions);
        // }
        */
        consumeMap = new HashMap<>();
    }

    public void listen() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    logger.warn(record.toString());
                    int updatedCount = 1;
                    if (consumeMap.containsKey(record.value())) { // was containsValue, which never matched
                        updatedCount = consumeMap.get(record.value()) + 1;
                    }
                    consumeMap.put(record.value(), updatedCount);
                }
                System.out.println("\n-------------------------------------------------\n");
                System.out.println(consumeMap);
                System.out.println("\n-------------------------------------------------\n");
                consumeMap.clear();
                kafkaConsumer.commitAsync();
            }
        } catch (WakeupException e) {
            // expected on shutdown: wakeup() was called from the shutdown hook
        } finally {
            kafkaConsumer.close();
            System.out.println("Closed Consumer and we are done");
        }
    }
}
```
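The manual-assignment variant, which the commented-out block in the constructor only hints at, can also stand alone. A minimal sketch under these assumptions: the AssignedConsumer class name is mine, the topic is the test topic from earlier, and with assign() there is no consumer group, hence no rebalancing and no offset commits:

```java
package com.lixiande.kafkaLearn;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class AssignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        props.put("value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false"); // no group.id, so offsets cannot be committed
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // turn the topic's partition metadata into explicit assignments
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("test")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);          // no group membership, no rebalancing
            consumer.seekToBeginning(partitions); // start from the earliest retained offset
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + " @ " + record.offset() + " : " + record.value());
                }
            }
        }
    }
}
```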