共计 3261 个字符,预计需要花费 9 分钟才能阅读完成。
Kafka 学习笔记
Apache Kafka 是一个 分布式 音讯公布订阅零碎。它最后由 LinkedIn 公司基于独特的设计实现为一个分布式的提交日志零碎 (a distributed commit log),之后成为 Apache 我的项目的一部分。Kafka 零碎疾速、可扩大并且可长久化。它的分区个性,可复制和可容错都是其不错的个性
介绍 Kafka 前,先来回顾一下消息中间件(MQ)的常识~~
1. 消息中间件介绍
- 消息中间件相比大家应该不生疏,个别咱们也习惯称之为 MQ。它其实没有很高大上,也是一个基于语言开发出的零碎,例如像:RocketMQ 是阿里巴巴基于 Java 开发的
- 作用有三:利用解耦、异步解决、限流削峰
- 在市面上,罕用的 MQ 有:ActiveMQ,RabbitMQ,RocketMQ,Kafka
MQ 详解及四大 MQ 比拟 – 云 + 社区 – 腾讯云
2. 消息中间件的概念
-
三大外围组件
- Broker:音讯服务器,作为 server 提供音讯外围服务
- Provider:音讯提供者
- Consumer:音讯消费者
(集群模式)
-
两大通信模式
- 公布 - 订阅 模式(也是我工作中所应用的)
- 点对点 模式
Kafka 的概念
-
Partition(分区):后面咱们提到,能够把 Topic 了解为一个 数据汇合,那么一个 Topic 能够分成多个 Partition(区),其中每个区的音讯是有序的
若你须要所有音讯都是有序的,那么你最好只用一个分区。
另外 partition 反对音讯位移读取,音讯位移有消费者本身治理
-
Consumer Group(消费者组):一群消费者的汇合。向 Topic 订阅生产音讯的单位是 Consumers=ConsumerGroup,只不过 Group 能够是一个 Consumer,也能够是多个
在我的微服务项目开发中,个别把 Consumer Group 设置成 以 微服务 为单位
对于此图的具体介绍,能够参考官网翻译:https://scala.cool/2018/03/le…
Kafka 的小 Demo
-
首先第一步须要装置环境啦,我是应用 Centos + docker + Portainer(Docker UI 管理工具)去装置
Kafka 须要装置 Zookeeper,Kafka 和 Kafka Manager(UI 治理页面)
装置也比较简单,步骤可参考:docker 下装置 kafka 和 kafka-manager – 简书 -
在 Kafka Manager 里创立 Cluster(因为 Kafka 是人造分布式的,都是以集群为单位创立和部署的),而后能够去创立一个 Topic
-
配置 Yaml,再别离搭建 Provider 和 Consumer
spring: # Kafka BeanConfig kafka: bootstrap-servers: 192.XXX.XXX.X:9082 # Kafka Broker address # Kafka Producer Config producer: acks: 1 retries: 3 # retry-count key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # Kafka Consumer Config consumer: auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: cn.xxx.kafkademo.message # Kafka Consumer Listener Config listener: missing-topics-fatal: false
@Component public class DemoProducer { @Resource private KafkaTemplate<Object, Object> kafkaTemplate; public SendResult syncSend(Integer id) throws ExecutionException, InterruptedException { // 创立 DemoMessage 音讯 DemoMessage message = new DemoMessage(); message.setId(id); // 同步发送音讯 return kafkaTemplate.send(DemoMessage.TOPIC, message).get();} }
@Component public class DemoConsumer {private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = DemoMessage.TOPIC, groupId = "demo-consumer-group-" + DemoMessage.TOPIC) public void onMessage(DemoMessagemessage) {logger.info("[onMessage][线程编号:{} 音讯内容:{}]", Thread.currentThread().getId(), message); } }
-
而后再模仿 Provider 往 Topic 里发 msg,此时 Consumer 应该能够收到 msg
Provider 发送音讯 log
[testSyncSend][发送编号:[1650810788] 发送后果:[SendResult [producerRecord=ProducerRecord(topic=KAFKA_DEMO_01, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 110, 46, 105, 111, 99, 111, 100, 101, 114, 46, 115, 112, 114, 105, 110, 103,98, 111, 111, 116, 46, 108, 97, 98, 48, 51, 46, 107, 97, 102, 107, 97, 100, 101, 109, 111, 46, 109, 101, 115, 115, 97, 103, 101, 46, 68, 101, 109, 111, 48, 49, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Demo01Message{id=1650810788}, timestamp=null), recordMetadata=KAFKA_DEMO_01-0@1]]] [onMessage][线程编号:21 音讯内容:DemoMessage{id=1650810788}]
Consumer 生产音讯 log
[onMessage][线程编号:77 音讯内容:ConsumerRecord(topic = KAFKA_DEMO_01, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1650810788610, serialized key size = -1, serialized value size = 17, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Demo01Message{id=1650810788})] [onMessage][线程编号:79 音讯内容:DemoMessage{id=1650810788}]
- 最初看回 Kafka Manager,能够看到 Topic 的信息