相干常识

1.Kafka提供了Producer类作为Java producer的api,该类有sync和async两种发送形式。默认是sync形式,即producer的调用类在音讯真正发送到队列中去当前才返回。
(1)Kafka提供的java api中的Producer,底层只是保护该topic到每个broker的连贯,并不是一个传统意义上的连接池。在应用sync形式时,咱们应该本人实现一个连接池,外面蕴含若干Producer对象,以实现最大化写入效率。
(2)在写入的数据频率不高或要求取得写入后果时,应应用sync形式,否则会因async的等待时间引入额定的提早。
(3)在写入的数据频率很高时,应应用async形式,以batch的模式写入,取得最大效率。
async形式与sync形式的不同在于,在初始化scala的producer时,会创立一个ProducerSendThread对象。而后,在调用send时,它并不是间接调用eventHandler.handle办法,而是把音讯放入一个长度由queue.buffering.max.messages参数定义的队列(默认10000),当队列满足以下两种条件时,会由ProducerSendThread触发eventHandler.handle办法,把队列中的音讯作为一个batch发送
①工夫超过queue.buffering.max.ms定义的值,默认5000ms
②队列中以后音讯个数超过batch.num.messages定义的值,默认200

2.Kafka的Consumer有两种Consumer的高层API、简略API–SimpleConsumer
(1)Consumer的高层API
次要是Consumer和ConsumerConnector,这里的Consumer是ConsumerConnector的动态工厂类
class Consumer {

public static kafka.javaapi.consumer.ConsumerConnector

createJavaConsumerConnector(config: ConsumerConfig);

}

具体的音讯的生产都是在ConsumerConnector中

创立一个音讯解决的流,蕴含所有的topic,并依据指定的Decoder
public Map>>createMessageStreams(Map topicCountMap, Decoder keyDecoder, Decoder valueDecoder);

创立一个音讯解决的流,蕴含所有的topic,应用默认的Decoder
public Map>> createMessageStreams(Map topicCountMap);

获取指定音讯的topic,并依据指定的Decoder
public List>>createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder keyDecoder, Decoder valueDecoder);

获取指定音讯的topic,应用默认的Decoder
public List> createMessageStreamsByFilter(TopicFilter topicFilter);

提交偏移量到这个消费者连贯的topic
public void commitOffsets();

敞开消费者
public void shutdown();

高层的API中比拟罕用的就是public List> createMessageStreamsByFilter(TopicFilter topicFilter);和public void commitOffsets();

(2)Consumer的简略API–SimpleConsumer

批量获取音讯
public FetchResponse fetch(request: kafka.javaapi.FetchRequest);

获取topic的元信息
public kafka.javaapi.TopicMetadataResponse send(request:kafka.javaapi.TopicMetadataRequest);

获取目前可用的偏移量
public kafka.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);

敞开连贯
public void close();

对于大部分利用来说,高层API就曾经足够应用了,然而若是想做更进一步的管制的话,能够应用简略的API,例如消费者重启的状况下,心愿失去最新的offset,就该应用SimpleConsumer。

零碎环境

Linux Ubuntu 20.04
OpenJDK-11.0.11
kafka_2.13-2.8.0
zookeeper-3.6.3
IntelliJ IDEA 2021.1 (Ultimate Edition)

工作内容
本试验是应用简略Java API来模仿Kafka的producer和consumer,其中producer是通过一个while循环生成内容,而后将内容传递给Kafka,consumer从Kafka中读取内容,并在Console界面中输入。


工作步骤

1.关上Idea,新建一个Java我的项目,将hadoop中的配置文件加到resources中。
2.增加maven依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->    <dependency>        <groupId>org.apache.kafka</groupId>        <artifactId>kafka-clients</artifactId>        <version>1.1.1</version>    </dependency>

3.启动ZooKeeper。切换到/apps/zookeeper/bin目录下,执行ZooKeeper的启动脚本。

cd /apps/zookeeper/bin  ./zkServer.sh start  

查看ZooKeeper的运行状态。

./zkServer.sh status  

4.切换目录到/apps/kafka目录下,启动kafka的server。

cd /apps/kafka  bin/kafka-server-start.sh config/server.properties &  

5.另起一窗口,切换到/apps/kafka下,在kafka中创立topic,命名为dblab01。

cd /apps/kafka  bin/kafka-topics.sh \  --create \  --zookeeper localhost:2181 \  --replication-factor 1 \  --topic dblab01 \  --partitions 1  

查看topic

cd /apps/kafkabin/kafka-topics.sh  --list  --zookeeper  localhost:2181  

6.创立kafka的producer,用于生产数据。在包下,创立Class,命名为MyProducer。

package my.study.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties;public class MyProducer {    public void produce(){        Properties props = new Properties();        //设置kafka集群的地址        props.put("bootstrap.servers", "localhost:9092");        //ack模式,all是最慢但最平安的        props.put("acks", "all");        //失败重试次数        props.put("retries", 0);        //每个分区未发送音讯总字节大小(单位:字节),超过设置的值就会提交数据到服务端        props.put("batch.size", 16384);        props.put("linger.ms", 1);        //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端        //buffer.memory要大于batch.size,否则会报申请内存不足的谬误        props.put("buffer.memory", 33554432);        //序列化器        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        Producer<String, String> producer = new KafkaProducer<>(props);        for (int i = 0; i < 100; i++)            producer.send(new ProducerRecord<String, String>("dblab01", Integer.toString(i), Integer.toString(i)));        producer.close();    }    public static void main(String[] args) {        new MyProducer().produce();    }}

producer端的代码:首先定义一个topic的名称,而后创立一个properties实例,用来设置produce的参数。接着创立一个producer的实例并将参数配置props作为参数上传进去。在produce办法中定义一个key与data,创立KeyedMessage实例,并将key,data和topic作为参数上传进去,而后把KeyedMessage实例上传给producer。在主函数中间接调用MyProduce的produce()办法,用来实现音讯的上传。