共计 3136 个字符,预计需要花费 8 分钟才能阅读完成。
简介
kafka 是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点,并已在成千上万家公司运行。
目标
了解 kafka 的基本原理
掌握 kafka 的基本操作
kafka 的深度探究在另一篇文章。
相关概念
producer:生产者,就是它来生产“叉烧包”的饭堂阿姨。consumer:消费者,生产出来的“叉烧包”它来消费。topic:你把它理解为标签,生产者每生产出来一个叉烧包就贴上一个标签(topic),消费者可不是谁生产的“叉烧包”都吃的,这样不同的生产者生产出来的“叉烧包”,消费者就可以选择性的“吃”了。broker:就是蒸笼了。
所以整个过程可以如下形象的说明:
饭堂阿姨制作一个叉烧包,消费者就消费一个叉烧包。1. 假设消费者消费叉烧包的时候噎住了(系统宕机了),生产者还在生产叉烧包,那新生产的叉烧包就丢失了。2. 再比如生产者很强劲(大交易量的情况),生产者 1 秒钟生产 100 个叉烧包,消费者 1 秒钟只能吃 50 个叉烧包,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”叉烧包“又丢失了。3. 这个时候我们放个篮子在它们中间,生产出来的叉烧包都放到篮子里,消费者去篮子里拿叉烧包,这样叉烧包就不会丢失了,都在篮子里,而这个篮子就是”kafka“。4. 叉烧包其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是 tcp、http 什么的),也称为报文,也叫“消息”。5. 消息队列满了,其实就是篮子满了,”叉烧包“放不下了,那赶紧多放几个篮子,其实就是 kafka 的扩容。所以说 kafka == 篮子。
安装
1. 由于 kafka 需要 zookeeper 的。所以您可以参考【谈谈 zookeeper】2.kafka 安装 2.1 下载地址:http://mirror.bit.edu.cn/apac…2.2 配置:(注:KAFKA_HOME 为你配置的环境变量。hadoop01 为你配置 hosts)编辑 $KAFKA_HOME/config/ 下的 server.properties 文件 server.properties
broker.id=0
#listeners=PLAINTEXT://:9092
log.dirs=/root/app/tmp/kafkalog
num.partitions=1
zookeeper.connect=hadoop01:2181
2.3 多 broker 的 kafka 安装配置
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
常用操作命令
启动 kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
创建 topic
bin/kafka-topics.sh –create –zookeeper hadoop01:2181 –replication-factor 1 –partitions 1 –topic hello_topic
查看 topic
./kafka-topics.sh –list –zookeeper hadoop01:2181
查看指定 topic 的详细信息
kafka-topics.sh –describe –zookeeper hadoop01:2181
生产消息
./kafka-console-producer.sh –broker-list hadoop01:9092 –topic hello_topic
消费消息
./kafka-console-consumer.sh –bootstrap-server hadoop01:9092 –topic hello_topic –from-beginning
0.9.0 版本的用下面的命令
./kafka-console-consumer.sh –zookeeper hadoop01:2181 –topic hello_topic –from-beginning
解析:–from-beginning:是从 producer 开始的位置开始拿数据的。
Springboot 操作 kafka
特别注意(巨坑):kafka 有很多版本的。各版本对应使用的 springboot 或者 jar 是不一样。请参考 spring 官网的说明:http://spring.io/projects/spr…
本文使用的是 springboot1.5 系列 +0.10.0.x 的 pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
生产者代码主要是向 kafka 服务发送消息(生产消息)。
/**
* 测试 kafka 生产者
*/
@RestController
@RequestMapping(“kafka”)
public class TestKafkaProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping(“send”)
public String send(String msg){
kafkaTemplate.send(“hello_topic”, msg);
return “success”;
}
}
消费者代码从主题(topic)中获取消息进行消费。
/**
* kafka 消费者测试
*/
@Component
public class TestConsumer {
@KafkaListener(topics = “hello_topic”)
public void listen (ConsumerRecord<?, ?> record) throws Exception {
System.out.printf(“topic = %s, offset = %d, value = %s \n”, record.topic(), record.offset(), record.value());
}
}
yml 配置文件主要是配置 kafka 的服务地址。
spring:
kafka:
bootstrap-servers: 120.79.xxx.x:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
最后
本人水平有限,欢迎各位建议以及指正。顺便关注一下公众号呗,会经常更新文章的哦。