仿照 Kafka,从零开始自实现 MQ,实现了 Kafka 中 80% 的根底性能。学习 Kafka 的话如果只是看文章和源码,可能不久就会忘了,还是本人实现一个「精简版」的 Kafka 吧,
实现性能概览
1、基于内存 Queue 实现生产和生产 API
- [X] 1)创立内存 Queue,作为底层音讯存储
- [X] 2)定义 Topic,反对多个 Topic
- [X] 3)定义 Producer,反对 Send 音讯
- [X] 4)定义 Consumer,反对 Poll 音讯
2、设计自定义 Queue,实现音讯确认和生产 offset
- [X] 1)自定义内存 Message 数组模仿 Queue。
- [X] 2)应用指针记录以后音讯写入地位。
- [X] 3)对于每个命名消费者,用指针记录生产地位
3、拆分 broker 和 client(包含 producer 和 consumer)
- [X] 1)将 Queue 保留到 web server 端
- [X] 2)设计音讯读写 API 接口,确认接口,提交 offset 接口
- [X] 3)producer 和 consumer 通过 httpclient 拜访 Queue
- [X] 4)实现音讯确认,offset 提交
- [X] 5)实现 consumer 从 offset 增量拉取
我的项目目录
bitkylin-mq
我的项目设计及我的项目能力
Server
一、Topic
- 保护 ArrayList 用于模仿长久化音讯「起因:音讯须要随机拜访」
- 设定音讯队列容量,达到容量时无奈再生产音讯
- 以后音讯的最大索引
二、ConsumerGroup
- 消费者组由消费者组名和 topic 名独特决定,即不同 topic 的消费者组互相独立,不会相互影响
- 需依据 topic 创立消费者组,即消费者组必须关联 topic
- 消费者组创立后,默认从头残缺生产关联 topic 的所有音讯
- 同一个消费者组内,各个消费者总共生产一次「起码生产一次」所关联 topic 的所有音讯
三、broker
- 一个 broker 关联一个 ConsumerGroup 列表和一个 Topic 列表
- 通过 broker 裸露的接口,能够展现关联 ConsumerGroup 列表和 Topic 列表的概览信息
- 通过 broker 裸露的接口,能够向一个 topic 中生产音讯
- 通过 broker 裸露的接口,能够依据消费者组名和 topic 名生产音讯
注:本次仅实现单个 broker,broker 后实现了 topic 和 consumerGroup「消费者组」,细节结构图如下:
client
- 客户端通过 topic 名生产音讯
- 客户端依据消费者组名和 topic 名生产音讯
- 客户端生产音讯时,能够同时取得消费者组的 offset「偏移量」
- 客户端生产音讯胜利后,需手动更新消费者组的 offset。若不更新,客户端默认无奈生产前面的音讯。
- 客户端生产音讯失败时,不应更新消费者组的 offset。此时客户端能够反复生产当条音讯。
- 多个客户端能够应用同一个消费者组生产同一个 topic;能够应用不同的消费者组生产同一个 topic;能够应用不同的消费者组生产不同的 topic
客户端工作示意图如下:
我的项目构造
本我的项目共提供四个 module:
bitkylin-mq-server
bitkylin-mq-api
bitkylin-mq-client-producer
bitkylin-mq-client-consumer
各 module 的介绍如下:
1. bitkylin-mq-server
提供 MQ 服务端,提供 broker 以及其关联的 ConsumerGroup 和 Topic 等,次要实现如下性能:
- 展现 MQ 概览信息,包含 topic 和 ConsumerGroup 的详细信息
- 创立消费者组,创立消费者组后,即可应用该消费者组生产音讯
- 生产音讯,将音讯发送至指定 topic
- 基于指定消费者组生产音讯,生产音讯但不更新关联消费者组的 offset
- 基于指定消费者组生产音讯,生产音讯且自动更新关联消费者组的 offset
- 手动更新指定消费者组的偏移量
2. bitkylin-mq-api
提供供客户端应用的 api,通过 feignClient 模式提供,客户端可间接应用,执行 RPC,以后实现如下性能:
- 发送音讯至指定 topic
- 订阅指定 topic 的音讯。主动创立消费者组,应用观察者模式轮询音讯并生产。
3. bitkylin-mq-client-producer
音讯生产客户端,通过 feign-api 生产音讯,以后实现如下演示性能:
随机向 topic 名为「topic-1」和「topic-2」的 topic 中发送音讯,每隔 3 秒发送一次音讯。
4. bitkylin-mq-client-consumer
音讯生产客户端,通过 feign-api 生产音讯,以后实现如下演示性能:
- 创立消费者组「spring-group-1」订阅「topic-1」,并打印订阅的音讯。
- 创立消费者组「spring-group-2」订阅「topic-2」,并打印订阅的音讯。
代码演示
- 运行 module「bitkylin-mq-server」,启动 MQ 的 broker,启动音讯服务。
- 运行 module「bitkylin-mq-client-consumer」和「bitkylin-mq-client-producer」,开启音讯订阅演示工作和音讯发送演示工作。
- 此时可通过「bitkylin-mq-client-consumer」的控制台,看到音讯一直被生产。
2021-01-24 01:55:58.008 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:1
2021-01-24 01:56:00.996 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:2
2021-01-24 01:56:04.000 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:3
2021-01-24 01:56:07.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:4
2021-01-24 01:56:10.015 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:5
2021-01-24 01:56:13.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:6
2021-01-24 01:56:16.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:7
2021-01-24 01:56:19.006 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:8
2021-01-24 01:56:21.997 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:9
2021-01-24 01:56:24.994 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:10
2021-01-24 01:56:28.002 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:11
2021-01-24 01:56:30.991 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:12
2021-01-24 01:56:34.014 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:13
2021-01-24 01:56:37.010 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:14
2021-01-24 01:56:40.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:15
- 关上 postman,发送如下申请创立专用于 postman 的消费者组:
POST http://localhost:8080/mq/broker/consumer-group/create
{
"groupName": "postman-group-1",
"topicName": "topic-1"
}
- 发送如下申请即可生产音讯,且主动确认「无需手动更新消费者组的 offset」
POST http://localhost:8080/mq/broker/message/simple-pull
{
"groupName": "postman-group-1",
"topicName": "topic-1"
}
能够发现,postman 能够独立生产指定 topic 的音讯,不受 Spring 程序生产的影响。当然,postman 能够间接应用 Spring 程序统一的消费者组,以独特生产音讯。
此时查问 MQ 的概览信息:
GET http://localhost:8080/mq/broker/overview
响应:
{
"groupList": [
{
"groupName": "spring-group-1",
"topic": {
"name": "topic-1",
"capacity": 1000,
"maxIndex": 14
},
"offset": 15
},
{
"groupName": "postman-group-1",
"topic": {
"name": "topic-1",
"capacity": 1000,
"maxIndex": 14
},
"offset": 5
},
{
"groupName": "spring-group-2",
"topic": {
"name": "topic-2",
"capacity": 1000,
"maxIndex": 17
},
"offset": 18
}
]
}
局限性
- 每个 topic 的队列容量是固定的,队列满后回绝生产音讯,暂不反对清理历史音讯。
- 音讯生产未加锁,如果一个消费者组的多个消费者高并发生产音讯,可能导致同一条音讯被生产屡次。