仿照Kafka,从零开始自实现 MQ,实现了 Kafka 中 80% 的根底性能。学习 Kafka 的话如果只是看文章和源码,可能不久就会忘了,还是本人实现一个「精简版」的 Kafka 吧,
实现性能概览
1、基于内存Queue实现生产和生产API
- [X] 1) 创立内存Queue, 作为底层音讯存储
- [X] 2) 定义Topic, 反对多个Topic
- [X] 3) 定义Producer, 反对Send音讯
- [X] 4) 定义Consumer, 反对Poll音讯
2、设计自定义Queue,实现音讯确认和生产offset
- [X] 1) 自定义内存Message数组模仿Queue。
- [X] 2) 应用指针记录以后音讯写入地位。
- [X] 3) 对于每个命名消费者, 用指针记录生产地位
3、拆分broker和client(包含producer和consumer)
- [X] 1) 将Queue保留到web server端
- [X] 2) 设计音讯读写API接口, 确认接口, 提交offset接口
- [X] 3) producer和consumer通过httpclient拜访Queue
- [X] 4) 实现音讯确认, offset提交
- [X] 5) 实现consumer从offset增量拉取
我的项目目录
bitkylin-mq
我的项目设计及我的项目能力
Server
一、Topic
- 保护ArrayList用于模仿长久化音讯「起因:音讯须要随机拜访」
- 设定音讯队列容量,达到容量时无奈再生产音讯
- 以后音讯的最大索引
二、ConsumerGroup
- 消费者组由消费者组名和topic名独特决定,即不同topic的消费者组互相独立,不会相互影响
- 需依据topic创立消费者组,即消费者组必须关联topic
- 消费者组创立后,默认从头残缺生产关联topic的所有音讯
- 同一个消费者组内,各个消费者总共生产一次「起码生产一次」所关联topic的所有音讯
三、broker
- 一个broker关联一个ConsumerGroup列表和一个Topic列表
- 通过broker裸露的接口,能够展现关联ConsumerGroup列表和Topic列表的概览信息
- 通过broker裸露的接口,能够向一个topic中生产音讯
- 通过broker裸露的接口,能够依据消费者组名和topic名生产音讯
注:本次仅实现单个broker,broker后实现了topic和consumerGroup「消费者组」,细节结构图如下:
client
- 客户端通过topic名生产音讯
- 客户端依据消费者组名和topic名生产音讯
- 客户端生产音讯时,能够同时取得消费者组的offset「偏移量」
- 客户端生产音讯胜利后,需手动更新消费者组的offset。若不更新,客户端默认无奈生产前面的音讯。
- 客户端生产音讯失败时,不应更新消费者组的offset。此时客户端能够反复生产当条音讯。
- 多个客户端能够应用同一个消费者组生产同一个topic;能够应用不同的消费者组生产同一个topic;能够应用不同的消费者组生产不同的topic
客户端工作示意图如下:
我的项目构造
本我的项目共提供四个module:
bitkylin-mq-serverbitkylin-mq-apibitkylin-mq-client-producerbitkylin-mq-client-consumer
各module的介绍如下:
1. bitkylin-mq-server
提供MQ服务端,提供broker以及其关联的ConsumerGroup和Topic等,次要实现如下性能:
- 展现MQ概览信息,包含topic和ConsumerGroup的详细信息
- 创立消费者组,创立消费者组后,即可应用该消费者组生产音讯
- 生产音讯,将音讯发送至指定topic
- 基于指定消费者组生产音讯,生产音讯但不更新关联消费者组的offset
- 基于指定消费者组生产音讯,生产音讯且自动更新关联消费者组的offset
- 手动更新指定消费者组的偏移量
2. bitkylin-mq-api
提供供客户端应用的api,通过feignClient模式提供,客户端可间接应用,执行RPC,以后实现如下性能:
- 发送音讯至指定topic
- 订阅指定topic的音讯。主动创立消费者组,应用观察者模式轮询音讯并生产。
3. bitkylin-mq-client-producer
音讯生产客户端,通过feign-api生产音讯,以后实现如下演示性能:
随机向topic名为「topic-1」和「topic-2」的topic中发送音讯,每隔3秒发送一次音讯。
4. bitkylin-mq-client-consumer
音讯生产客户端,通过feign-api生产音讯,以后实现如下演示性能:
- 创立消费者组「spring-group-1」订阅「topic-1」,并打印订阅的音讯。
- 创立消费者组「spring-group-2」订阅「topic-2」,并打印订阅的音讯。
代码演示
- 运行module「bitkylin-mq-server」,启动MQ的broker,启动音讯服务。
- 运行module「bitkylin-mq-client-consumer」和「bitkylin-mq-client-producer」,开启音讯订阅演示工作和音讯发送演示工作。
- 此时可通过「bitkylin-mq-client-consumer」的控制台,看到音讯一直被生产。
2021-01-24 01:55:58.008 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:12021-01-24 01:56:00.996 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:22021-01-24 01:56:04.000 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:32021-01-24 01:56:07.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:42021-01-24 01:56:10.015 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:52021-01-24 01:56:13.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:62021-01-24 01:56:16.011 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:72021-01-24 01:56:19.006 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:82021-01-24 01:56:21.997 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:92021-01-24 01:56:24.994 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:102021-01-24 01:56:28.002 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:112021-01-24 01:56:30.991 INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:122021-01-24 01:56:34.014 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:132021-01-24 01:56:37.010 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:142021-01-24 01:56:40.004 INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:15
- 关上postman,发送如下申请创立专用于postman的消费者组:
POST http://localhost:8080/mq/broker/consumer-group/create{ "groupName": "postman-group-1", "topicName": "topic-1"}
- 发送如下申请即可生产音讯,且主动确认「无需手动更新消费者组的offset」
POST http://localhost:8080/mq/broker/message/simple-pull{ "groupName": "postman-group-1", "topicName": "topic-1"}
能够发现,postman能够独立生产指定topic的音讯,不受Spring程序生产的影响。当然,postman能够间接应用Spring程序统一的消费者组,以独特生产音讯。
此时查问MQ的概览信息:
GET http://localhost:8080/mq/broker/overview
响应:
{ "groupList": [ { "groupName": "spring-group-1", "topic": { "name": "topic-1", "capacity": 1000, "maxIndex": 14 }, "offset": 15 }, { "groupName": "postman-group-1", "topic": { "name": "topic-1", "capacity": 1000, "maxIndex": 14 }, "offset": 5 }, { "groupName": "spring-group-2", "topic": { "name": "topic-2", "capacity": 1000, "maxIndex": 17 }, "offset": 18 } ]}
局限性
- 每个topic的队列容量是固定的,队列满后回绝生产音讯,暂不反对清理历史音讯。
- 音讯生产未加锁,如果一个消费者组的多个消费者高并发生产音讯,可能导致同一条音讯被生产屡次。