共计 4670 个字符,预计需要花费 12 分钟才能阅读完成。
一、Kafka 的 Producer
Producer 就是用于向 Kafka 发送数据。如下:
1、增加依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
2、发送音讯
2.1 启动 kafka(Kafka shell 命令)
[root@hadoop001 ~]# kafka-server-start.sh -daemon /opt/lns/server.properties
2.2 创立 topic:kb09two,3 个分区,每个分区有 1 个正本
[root@hadoop001 ~]# kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic kb09two --partitions 3 --replication-factor 1
[root@hadoop001 ~]# kafka-topics.sh --zookeeper 192.168.247.201:2181 --list
2.3 创立生产者(Java 代码)
public class MyProducer {public static void main(String[] args) {Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.247.201:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
prop.put(ProducerConfig.ACKS_CONFIG,"-1");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
for (int i = 0; i < 200; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kb09two", "hello world" + i);
producer.send(producerRecord);
try {Thread.sleep(100);
} catch (Exception e) {e.printStackTrace();
}
}
System.out.println("game over");
}
}
执行 Java 代码,发送音讯
2.4 查看音讯队列中每个分区中的数量(Kafka shell 命令)
[root@hadoop001 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.247.201:9092 --topic kb09two --time -1 --offsets 1
2.5 生产音讯(Kafka shell 命令)
[root@hadoop001 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kb09two --from-beginning
补充内容
kafka 客户端 producer 配置参数
二、Consumer 概要
consumer 中的要害术语:
消费者 (consumer):从 kafka 中拉取数据并进行解决
消费者组 (consumer group):一个消费者组由一个或者多个 consumer 实例组成
位移 (offset):记录以后分区生产数据的地位
位移提交 (offset commit):将生产实现的音讯的最大 offset 提交确认
位移 topic(_consumer_offset):保留生产位移的 topic
1. 生产形式:(poll/push)
Kafka Consumer 采纳的是被动拉取 broker 数据进行生产的。个别消息中间件存在推送 (server 推送数据给 consumer) 和拉取 (consumer 被动取服务器取数据) 两种形式,这两种形式各有优劣。
如果是抉择推送的形式最大的妨碍就是服务器不分明 consumer 的生产速度,如果 consumer 中执行的操作又是比拟耗时的,那么 consumer 可能会不堪重负, 甚至会导致系统挂掉。
而采纳拉取的形式则能够解决这种状况,consumer 依据本人的状态来拉取数据, 能够对服务器的数据进行提早解决。然而这种形式也有一个劣势就是服务器没有数据的时候可能会始终轮询,不过还好 Kafka 在 poll()有参数容许消费者申请在“长轮询”中阻塞,期待数据达到(并且可选地期待直到给定数量的字节可用以确保传输大小)。
2.Consumer 罕用参数阐明
3.Consumer 程序开发
构建 Consumer
Consumer 有三种生产交付语义
1、至多一次:音讯不会失落,但可能被反复解决(实现简略)
2、最多一次:音讯可能失落可能会被解决,但最多只会被解决一次(实现简略)
3、准确一次:音讯被解决并且只会被解决一次(比拟难实现)
一个消费者组 G1 里只有一个消费者(单线程)
public class MyConsumer {public static void main(String[] args) {Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.247.201:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 音讯 key 反序列化器
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);// 音讯 value 反序列化器
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); // 消费者和群组协调器的最大心跳工夫
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 设置手动提交形式
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 主动提交的工夫
// 当消费者读取偏移量有效的状况下,须要重置生产起始地位,默认为 latest(从消费者启动后生成的记录),另外一个选项值是 earliest,将从无效的最小位移地位开始生产
//(1)earliest,会从该分区以后最开始的 offset 音讯开始生产(即从头生产),如果最开始的音讯 offset 是 0,那么消费者的 offset 就会被更新为 0.
//(2)latest,只生产以后消费者启动实现后生产者新生产的数据。旧数据不会再生产。offset 被重置为分区的 HW。//(3)none,启动消费者时,该消费者所生产的主题的分区没有被生产过,就会抛异样。prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1"); // 标识一个 consumer 组的名称
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("kb09two"));
// 一个消费者组 G1 里只有一个消费者(单线程)while (true){ConsumerRecords<String, String> poll = consumer.poll(100);
for (ConsumerRecord<String, String> record : poll) {System.out.println(record.offset()+"t"+record.key()+"t"+record.value());
}
}
模仿多消费者在同一个生产组 G2(多线程)
public class MyConsumer {public static void main(String[] args) {Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.247.201:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 音讯 key 反序列化器
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);// 音讯 value 反序列化器
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); // 消费者和群组协调器的最大心跳工夫
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 设置手动提交形式
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 主动提交的工夫
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G2");
// 模仿多消费者在同一个生产组 G2(多线程)for (int i = 0; i < 4; i++) {new Thread(new Runnable() {
@Override
public void run() {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("kb09two"));
while (true){ConsumerRecords<String, String> poll = consumer.poll(100);
for (ConsumerRecord<String, String> record : poll) {System.out.println(Thread.currentThread().getName()+"t"+
record.offset()+"t"+ record.key()+"t"+ record.value());
}
}
}
}).start();}
}
}