共计 4131 个字符,预计需要花费 11 分钟才能阅读完成。
相干常识
1.Kafka 提供了 Producer 类作为 Java producer 的 api,该类有 sync 和 async 两种发送形式。默认是 sync 形式,即 producer 的调用类在音讯真正发送到队列中去当前才返回。
(1)Kafka 提供的 java api 中的 Producer,底层只是保护该 topic 到每个 broker 的连贯,并不是一个传统意义上的连接池。在应用 sync 形式时,咱们应该本人实现一个连接池,外面蕴含若干 Producer 对象,以实现最大化写入效率。
(2)在写入的数据频率不高或要求取得写入后果时,应应用 sync 形式,否则会因 async 的等待时间引入额定的提早。
(3)在写入的数据频率很高时,应应用 async 形式,以 batch 的模式写入,取得最大效率。
async 形式与 sync 形式的不同在于,在初始化 scala 的 producer 时,会创立一个 ProducerSendThread 对象。而后,在调用 send 时,它并不是间接调用 eventHandler.handle 办法,而是把音讯放入一个长度由 queue.buffering.max.messages 参数定义的队列(默认 10000),当队列满足以下两种条件时,会由 ProducerSendThread 触发 eventHandler.handle 办法,把队列中的音讯作为一个 batch 发送
①工夫超过 queue.buffering.max.ms 定义的值,默认 5000ms
②队列中以后音讯个数超过 batch.num.messages 定义的值,默认 200
2.Kafka 的 Consumer 有两种 Consumer 的高层 API、简略 API–SimpleConsumer
(1)Consumer 的高层 API
次要是 Consumer 和 ConsumerConnector,这里的 Consumer 是 ConsumerConnector 的动态工厂类
class Consumer {
public static kafka.javaapi.consumer.ConsumerConnector
createJavaConsumerConnector(config: ConsumerConfig);
}
具体的音讯的生产都是在 ConsumerConnector 中
创立一个音讯解决的流,蕴含所有的 topic,并依据指定的 Decoder
public Map>>createMessageStreams(Map topicCountMap, Decoder keyDecoder, Decoder valueDecoder);
创立一个音讯解决的流,蕴含所有的 topic,应用默认的 Decoder
public Map>> createMessageStreams(Map topicCountMap);
获取指定音讯的 topic, 并依据指定的 Decoder
public List>>createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder keyDecoder, Decoder valueDecoder);
获取指定音讯的 topic, 应用默认的 Decoder
public List> createMessageStreamsByFilter(TopicFilter topicFilter);
提交偏移量到这个消费者连贯的 topic
public void commitOffsets();
敞开消费者
public void shutdown();
高层的 API 中比拟罕用的就是 public List> createMessageStreamsByFilter(TopicFilter topicFilter); 和 public void commitOffsets();
(2)Consumer 的简略 API–SimpleConsumer
批量获取音讯
public FetchResponse fetch(request: kafka.javaapi.FetchRequest);
获取 topic 的元信息
public kafka.javaapi.TopicMetadataResponse send(request:kafka.javaapi.TopicMetadataRequest);
获取目前可用的偏移量
public kafka.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);
敞开连贯
public void close();
对于大部分利用来说,高层 API 就曾经足够应用了,然而若是想做更进一步的管制的话,能够应用简略的 API,例如消费者重启的状况下,心愿失去最新的 offset,就该应用 SimpleConsumer。
零碎环境
Linux Ubuntu 20.04
OpenJDK-11.0.11
kafka_2.13-2.8.0
zookeeper-3.6.3
IntelliJ IDEA 2021.1 (Ultimate Edition)
工作内容
本试验是应用简略 Java API 来模仿 Kafka 的 producer 和 consumer,其中 producer 是通过一个 while 循环生成内容,而后将内容传递给 Kafka,consumer 从 Kafka 中读取内容,并在 Console 界面中输入。
工作步骤
1. 关上 Idea,新建一个 Java 我的项目,将 hadoop 中的配置文件加到 resources 中。
2. 增加 maven 依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
3. 启动 ZooKeeper。切换到 /apps/zookeeper/bin 目录下,执行 ZooKeeper 的启动脚本。
cd /apps/zookeeper/bin
./zkServer.sh start
查看 ZooKeeper 的运行状态。
./zkServer.sh status
4. 切换目录到 /apps/kafka 目录下,启动 kafka 的 server。
cd /apps/kafka
bin/kafka-server-start.sh config/server.properties &
5. 另起一窗口,切换到 /apps/kafka 下,在 kafka 中创立 topic,命名为 dblab01。
cd /apps/kafka
bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--topic dblab01 \
--partitions 1
查看 topic
cd /apps/kafka
bin/kafka-topics.sh --list --zookeeper localhost:2181
6. 创立 kafka 的 producer,用于生产数据。在包下,创立 Class,命名为 MyProducer。
package my.study.kafka;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyProducer {public void produce(){Properties props = new Properties();
// 设置 kafka 集群的地址
props.put("bootstrap.servers", "localhost:9092");
//ack 模式,all 是最慢但最平安的
props.put("acks", "all");
// 失败重试次数
props.put("retries", 0);
// 每个分区未发送音讯总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put("batch.size", 16384);
props.put("linger.ms", 1);
// 整个 Producer 用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory 要大于 batch.size,否则会报申请内存不足的谬误
props.put("buffer.memory", 33554432);
// 序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("dblab01", Integer.toString(i), Integer.toString(i)));
producer.close();}
public static void main(String[] args) {new MyProducer().produce();}
}
producer 端的代码:首先定义一个 topic 的名称,而后创立一个 properties 实例,用来设置 produce 的参数。接着创立一个 producer 的实例并将参数配置 props 作为参数上传进去。在 produce 办法中定义一个 key 与 data, 创立 KeyedMessage 实例,并将 key,data 和 topic 作为参数上传进去,而后把 KeyedMessage 实例上传给 producer。在主函数中间接调用 MyProduce 的 produce()办法,用来实现音讯的上传。