共计 11220 个字符,预计需要花费 29 分钟才能阅读完成。
一、创立 maven 工程并增加 jar 包
创立 maven 工程并增加以下依赖 jar 包的坐标到 pom.xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
二、生产者代码
1、应用生产者,生产数据
/**
* 订单的生产者代码,*/
public class OrderProducer {public static void main(String[] args) throws InterruptedException {
/* 1、连贯集群,通过配置文件的形式
* 2、发送数据 -topic:order,value
*/
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
(props);
for (int i = 0; i < 1000; i++) {
// 发送数据 , 须要一个 producerRecord 对象, 起码参数 String topic, V value kafkaProducer.send(new ProducerRecord<String, String>("order", " 订单信
息!"+i));
Thread.sleep(100);
}
}
}
2、kafka 当中的数据分区
kafka 生产者发送的音讯,都是保留在 broker 当中,咱们能够自定义分区规定,决定音讯发送到哪个 partition 外面去进行保留
查看 ProducerRecord 这个类的源码,就能够看到 kafka 的各种不同分区策略
kafka 当中反对以下四种数据的分区形式:
第一种分区策略,如果既没有指定分区号,也没有指定数据 key,那么就会应用轮询的形式将数据平均的发送到不同的分区外面去
//ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("mypartition", "mymessage" + i);
//kafkaProducer.send(producerRecord1);
第二种分区策略 如果没有指定分区号,指定了数据 key,通过 key.hashCode % numPartitions 来计算数据到底会保留在哪一个分区外面
// 留神:如果数据 key,没有变动 key.hashCode % numPartitions = 固定值 所有的数据都会写入到某一个分区外面去
//ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("mypartition", "mykey", "mymessage" + i);
//kafkaProducer.send(producerRecord2);
第三种分区策略:如果指定了分区号,那么就会将数据间接写入到对应的分区外面去
// ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);
// kafkaProducer.send(producerRecord3);
第四种分区策略:自定义分区策略。如果不自定义分区规定,那么会将数据应用轮询的形式平均的发送到各个分区外面去
kafkaProducer.send(new ProducerRecord<String, String>("mypartition","mymessage"+i));
自定义分区策略
public class KafkaCustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) { }
@Override
public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionNum = partitions.size();
Random random = new Random();
int partition = random.nextInt(partitionNum);
return partition;
}
@Override
public void close() {}
}
主代码中增加配置
@Test
public void kafkaProducer() throws Exception {
//1、筹备配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("partitioner.class", "cn.itcast.kafka.partitioner.KafkaCustomPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、创立 KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i=0;i<100;i++){
//3、发送数据
kafkaProducer.send(new ProducerRecord<String, String>("testpart","0","value"+i));
}
kafkaProducer.close();}
三、消费者代码
生产必要条件
消费者要从 kafka Cluster 进行生产数据,必要条件有以下四个
1、地址
bootstrap.servers=node01:9092
2、序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
3、主题(topic)须要制订具体的某个 topic(order)即可。
4、消费者组 group.id=test
1、主动提交 offset
生产实现之后,主动提交 offset
/**
* 生产订单数据 --- javaben.tojson
*/
public class OrderConsumer {public static void main(String[] args) {
// 1\ 连贯集群
Properties props = new Properties(); props.put("bootstrap.servers", "hadoop-01:9092"); props.put("group.id", "test");
// 以下两行代码 --- 消费者主动提交 offset 值
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
(props);
// 2、发送数据 发送数据须要,订阅下要生产的 topic。order kafkaConsumer.subscribe(Arrays.asList("order"));
while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer 插入、poll 获取元素。blockingqueue put 插入原生,take 获取元素
for (ConsumerRecord<String, String> record : consumerRecords) {System.out.println("生产的数据为:" + record.value());
}
}
}
}
2、手动提交 offset
如果 Consumer 在获取数据后,须要退出解决,数据结束后才确认 offset,须要程序来管制 offset 的确认?敞开主动提交确认选项
props.put("enable.auto.commit", "false");
手动提交 o?set 值
kafkaConsumer.commitSync();
残缺代码如下所示:Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 敞开主动提交确认选项
props.put("enable.auto.commit", "false");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {buffer.add(record);
}
if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);
// 手动提交 offset 值
consumer.commitSync();
buffer.clear();}
}
3、生产完每个分区之后手动提交 offset
下面的示例应用 commitSync 将所有已接管的记录标记为已提交。大数据培训在某些状况下,您可能心愿通过明确指定偏移量 来更好地管制已提交的记录。在上面的示例中,在实现解决每个分区中的记录后提交偏移量。
try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ":" + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
finally {consumer.close();
注意事项:
提交的偏移量应始终是应用程序将读取的下一条音讯的偏移量。因而,在调用 commitSync(偏移量)时,应该 在最初解决的音讯的偏移量中增加一个
4、指定分区数据进行生产
1、如果过程正在保护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上 保护的分区的记录。
2、如果过程自身具备高可用性,并且如果失败则将重新启动(可能应用 YARN,Mesos 或 AWS 工具等集群治理框 架,或作为流解决框架的一部分)。在这种状况下,Kafka 不须要检测故障并重新分配分区,因为耗费过程将在另 一台机器上重新启动。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList("foo", "bar"));
// 手动指定生产指定分区的数据 ---start
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
// 手动指定生产指定分区的数据 ---end
while (true) {ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
注意事项:
1、要应用此模式,您只需应用要应用的分区的残缺列表调用 assign(Collection),而不是应用 subscribe 订阅 主题。
2、主题与分区订阅只能二选一
5、反复生产与数据失落
曾经生产的数据对于 kafka 来说,会将生产组外面的 offset 值进行批改,那什么时候进行批改了?是在数据生产 实现之后,比方在控制台打印完后主动提交;
提交过程:是通过 kafka 将 offset 进行挪动到下个 message 所处的 offset 的地位。
拿到数据后,存储到 hbase 中或者 mysql 中,如果 hbase 或者 mysql 在这个时候连贯不上,就会抛出异样,如果在解决数据的时候曾经进行了提交,那么 kafka 伤的 offset 值曾经进行了批改了,然而 hbase 或者 mysql 中没有数据,这个时候就会呈现数据失落。
什么时候提交 offset 值?在 Consumer 将数据处理实现之后,再来进行 offset 的批改提交。默认状况下 offset 是 主动提交,须要批改为手动提交 offset 值。
如果在解决代码中失常解决了,然而在提交 offset 申请的时候,没有连贯到 kafka 或者呈现了故障,那么该次修 改 offset 的申请是失败的,那么下次在进行读取同一个分区中的数据时,会从曾经解决掉的 offset 值再进行解决一 次,那么在 hbase 中或者 mysql 中就会产生两条一样的数据,也就是数据反复
6、consumer 消费者生产数据流程
流程形容
Consumer 连贯指定的 Topic partition 所在 leader broker,采纳 pull 形式从 kafkalogs 中获取音讯。对于不同的生产模式,会将 offset 保留在不同的中央
官网对于 high level API 以及 low level API 的简介
http://kafka.apache.org/0100/…
高阶 API(High Level API)
kafka 消费者高阶 API 简略;暗藏 Consumer 与 Broker 细节;相干信息保留在 zookeeper 中。
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
This method is used to get a list of KafkaStreams, which are iterators over
MessageAndMetadata objects from which you can obtain messages and their
associated metadata (currently only topic).
Input: a map of <topic, #streams>
Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
You can also obtain a list of KafkaStreams, that iterate over messages
from topics that match a TopicFilter. (A TopicFilter encapsulates a
whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
/* Commit the offsets of all messages consumed so far. */ public commitOffsets()
/* Shut down the connector */ public shutdown()}
阐明:大部分的操作都曾经封装好了,比方:以后生产到哪个地位下了,然而不够灵便(工作过程举荐应用)
低级 API(Low Level API)
kafka 消费者低级 API 非常灵活;须要本人负责保护连贯 Controller Broker。保留 offset,Consumer Partition 对应 关系。
class SimpleConsumer {
/* Send fetch request to a broker and get back a set of messages. */
public ByteBufferMessageSet fetch(FetchRequest request);
/* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
Get a list of valid offsets (up to maxSize) before the given time.
The result is a list of offsets, in descending order.
@param time: time in millisecs,
if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest
available. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
* offset
*/
阐明:没有进行包装,所有的操作有用户决定,如本人的保留某一个分区下的记录,你以后生产到哪个地位。
四、kafka Streams API 开发
需要:应用 StreamAPI 获取 test 这个 topic 当中的数据,而后将数据全副转为大写,写入到 test2 这个 topic 当中去
第一步:创立一个 topic
node01 服务器应用以下命令来常见一个 topic 名称为 test2
cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
第二步:开发 StreamAPI
public class StreamAPI {public static void main(String[] args) {Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();}
}
第三步:生产数据
node01 执行以下命令,向 test 这个 topic 当中生产数据
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
第四步:生产数据
node02 执行一下命令生产 test2 这个 topic 当中的数据
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh –from-beginning –topic test2 –zookeeper node01:2181,node02:2181,node03:2181