仿照Kafka,从零开始自实现 MQ,实现了 Kafka 中 80% 的根底性能。学习 Kafka 的话如果只是看文章和源码,可能不久就会忘了,还是本人实现一个「精简版」的 Kafka 吧,

实现性能概览

1、基于内存Queue实现生产和生产API

  • [X] 1) 创立内存Queue, 作为底层音讯存储
  • [X] 2) 定义Topic, 反对多个Topic
  • [X] 3) 定义Producer, 反对Send音讯
  • [X] 4) 定义Consumer, 反对Poll音讯

2、设计自定义Queue,实现音讯确认和生产offset

  • [X] 1) 自定义内存Message数组模仿Queue。
  • [X] 2) 应用指针记录以后音讯写入地位。
  • [X] 3) 对于每个命名消费者, 用指针记录生产地位

3、拆分broker和client(包含producer和consumer)

  • [X] 1) 将Queue保留到web server端
  • [X] 2) 设计音讯读写API接口, 确认接口, 提交offset接口
  • [X] 3) producer和consumer通过httpclient拜访Queue
  • [X] 4) 实现音讯确认, offset提交
  • [X] 5) 实现consumer从offset增量拉取

我的项目目录

bitkylin-mq

我的项目设计及我的项目能力

Server

一、Topic

  1. 保护ArrayList用于模仿长久化音讯「起因:音讯须要随机拜访」
  2. 设定音讯队列容量,达到容量时无奈再生产音讯
  3. 以后音讯的最大索引

二、ConsumerGroup

  1. 消费者组由消费者组名和topic名独特决定,即不同topic的消费者组互相独立,不会相互影响
  2. 需依据topic创立消费者组,即消费者组必须关联topic
  3. 消费者组创立后,默认从头残缺生产关联topic的所有音讯
  4. 同一个消费者组内,各个消费者总共生产一次「起码生产一次」所关联topic的所有音讯

三、broker

  1. 一个broker关联一个ConsumerGroup列表和一个Topic列表
  2. 通过broker裸露的接口,能够展现关联ConsumerGroup列表和Topic列表的概览信息
  3. 通过broker裸露的接口,能够向一个topic中生产音讯
  4. 通过broker裸露的接口,能够依据消费者组名和topic名生产音讯

注:本次仅实现单个broker,broker后实现了topic和consumerGroup「消费者组」,细节结构图如下:

client

  1. 客户端通过topic名生产音讯
  2. 客户端依据消费者组名和topic名生产音讯
  3. 客户端生产音讯时,能够同时取得消费者组的offset「偏移量」
  4. 客户端生产音讯胜利后,需手动更新消费者组的offset。若不更新,客户端默认无奈生产前面的音讯。
  5. 客户端生产音讯失败时,不应更新消费者组的offset。此时客户端能够反复生产当条音讯。
  6. 多个客户端能够应用同一个消费者组生产同一个topic;能够应用不同的消费者组生产同一个topic;能够应用不同的消费者组生产不同的topic

客户端工作示意图如下:

我的项目构造

本我的项目共提供四个module:

bitkylin-mq-serverbitkylin-mq-apibitkylin-mq-client-producerbitkylin-mq-client-consumer

各module的介绍如下:

1. bitkylin-mq-server

提供MQ服务端,提供broker以及其关联的ConsumerGroup和Topic等,次要实现如下性能:

  • 展现MQ概览信息,包含topic和ConsumerGroup的详细信息
  • 创立消费者组,创立消费者组后,即可应用该消费者组生产音讯
  • 生产音讯,将音讯发送至指定topic
  • 基于指定消费者组生产音讯,生产音讯但不更新关联消费者组的offset
  • 基于指定消费者组生产音讯,生产音讯且自动更新关联消费者组的offset
  • 手动更新指定消费者组的偏移量

2. bitkylin-mq-api

提供供客户端应用的api,通过feignClient模式提供,客户端可间接应用,执行RPC,以后实现如下性能:

  • 发送音讯至指定topic
  • 订阅指定topic的音讯。主动创立消费者组,应用观察者模式轮询音讯并生产。

3. bitkylin-mq-client-producer

音讯生产客户端,通过feign-api生产音讯,以后实现如下演示性能:
随机向topic名为「topic-1」和「topic-2」的topic中发送音讯,每隔3秒发送一次音讯。

4. bitkylin-mq-client-consumer

音讯生产客户端,通过feign-api生产音讯,以后实现如下演示性能:

  • 创立消费者组「spring-group-1」订阅「topic-1」,并打印订阅的音讯。
  • 创立消费者组「spring-group-2」订阅「topic-2」,并打印订阅的音讯。

代码演示

  1. 运行module「bitkylin-mq-server」,启动MQ的broker,启动音讯服务。
  2. 运行module「bitkylin-mq-client-consumer」和「bitkylin-mq-client-producer」,开启音讯订阅演示工作和音讯发送演示工作。
  3. 此时可通过「bitkylin-mq-client-consumer」的控制台,看到音讯一直被生产。
2021-01-24 01:55:58.008  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:12021-01-24 01:56:00.996  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:22021-01-24 01:56:04.000  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:32021-01-24 01:56:07.004  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:42021-01-24 01:56:10.015  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:52021-01-24 01:56:13.011  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:62021-01-24 01:56:16.011  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:72021-01-24 01:56:19.006  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:82021-01-24 01:56:21.997  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:92021-01-24 01:56:24.994  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:102021-01-24 01:56:28.002  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:112021-01-24 01:56:30.991  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:122021-01-24 01:56:34.014  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:132021-01-24 01:56:37.010  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:142021-01-24 01:56:40.004  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:15
  1. 关上postman,发送如下申请创立专用于postman的消费者组:
POST http://localhost:8080/mq/broker/consumer-group/create{    "groupName": "postman-group-1",    "topicName": "topic-1"}
  1. 发送如下申请即可生产音讯,且主动确认「无需手动更新消费者组的offset」
POST http://localhost:8080/mq/broker/message/simple-pull{    "groupName": "postman-group-1",    "topicName": "topic-1"}

能够发现,postman能够独立生产指定topic的音讯,不受Spring程序生产的影响。当然,postman能够间接应用Spring程序统一的消费者组,以独特生产音讯。

此时查问MQ的概览信息:

GET http://localhost:8080/mq/broker/overview

响应:

{  "groupList": [    {      "groupName": "spring-group-1",      "topic": {        "name": "topic-1",        "capacity": 1000,        "maxIndex": 14      },      "offset": 15    },    {      "groupName": "postman-group-1",      "topic": {        "name": "topic-1",        "capacity": 1000,        "maxIndex": 14      },      "offset": 5    },    {      "groupName": "spring-group-2",      "topic": {        "name": "topic-2",        "capacity": 1000,        "maxIndex": 17      },      "offset": 18    }  ]}

局限性

  1. 每个topic的队列容量是固定的,队列满后回绝生产音讯,暂不反对清理历史音讯。
  2. 音讯生产未加锁,如果一个消费者组的多个消费者高并发生产音讯,可能导致同一条音讯被生产屡次。