关于消息队列:仿照Kafka从零开始自实现-MQ

27次阅读

共计 4847 个字符,预计需要花费 13 分钟才能阅读完成。

仿照 Kafka,从零开始自实现 MQ,实现了 Kafka 中 80% 的根底性能。学习 Kafka 的话如果只是看文章和源码,可能不久就会忘了,还是本人实现一个「精简版」的 Kafka 吧,

实现性能概览

1、基于内存 Queue 实现生产和生产 API

  • [X] 1)创立内存 Queue,作为底层音讯存储
  • [X] 2)定义 Topic,反对多个 Topic
  • [X] 3)定义 Producer,反对 Send 音讯
  • [X] 4)定义 Consumer,反对 Poll 音讯

2、设计自定义 Queue,实现音讯确认和生产 offset

  • [X] 1)自定义内存 Message 数组模仿 Queue。
  • [X] 2)应用指针记录以后音讯写入地位。
  • [X] 3)对于每个命名消费者,用指针记录生产地位

3、拆分 broker 和 client(包含 producer 和 consumer)

  • [X] 1)将 Queue 保留到 web server 端
  • [X] 2)设计音讯读写 API 接口,确认接口,提交 offset 接口
  • [X] 3)producer 和 consumer 通过 httpclient 拜访 Queue
  • [X] 4)实现音讯确认,offset 提交
  • [X] 5)实现 consumer 从 offset 增量拉取

我的项目目录

bitkylin-mq

我的项目设计及我的项目能力

Server

一、Topic

  1. 保护 ArrayList 用于模仿长久化音讯「起因:音讯须要随机拜访」
  2. 设定音讯队列容量,达到容量时无奈再生产音讯
  3. 以后音讯的最大索引

二、ConsumerGroup

  1. 消费者组由消费者组名和 topic 名独特决定,即不同 topic 的消费者组互相独立,不会相互影响
  2. 需依据 topic 创立消费者组,即消费者组必须关联 topic
  3. 消费者组创立后,默认从头残缺生产关联 topic 的所有音讯
  4. 同一个消费者组内,各个消费者总共生产一次「起码生产一次」所关联 topic 的所有音讯

三、broker

  1. 一个 broker 关联一个 ConsumerGroup 列表和一个 Topic 列表
  2. 通过 broker 裸露的接口,能够展现关联 ConsumerGroup 列表和 Topic 列表的概览信息
  3. 通过 broker 裸露的接口,能够向一个 topic 中生产音讯
  4. 通过 broker 裸露的接口,能够依据消费者组名和 topic 名生产音讯

注:本次仅实现单个 broker,broker 后实现了 topic 和 consumerGroup「消费者组」,细节结构图如下:

client

  1. 客户端通过 topic 名生产音讯
  2. 客户端依据消费者组名和 topic 名生产音讯
  3. 客户端生产音讯时,能够同时取得消费者组的 offset「偏移量」
  4. 客户端生产音讯胜利后,需手动更新消费者组的 offset。若不更新,客户端默认无奈生产前面的音讯。
  5. 客户端生产音讯失败时,不应更新消费者组的 offset。此时客户端能够反复生产当条音讯。
  6. 多个客户端能够应用同一个消费者组生产同一个 topic;能够应用不同的消费者组生产同一个 topic;能够应用不同的消费者组生产不同的 topic

客户端工作示意图如下:

我的项目构造

本我的项目共提供四个 module:

bitkylin-mq-server
bitkylin-mq-api
bitkylin-mq-client-producer
bitkylin-mq-client-consumer

各 module 的介绍如下:

1. bitkylin-mq-server

提供 MQ 服务端,提供 broker 以及其关联的 ConsumerGroup 和 Topic 等,次要实现如下性能:

  • 展现 MQ 概览信息,包含 topic 和 ConsumerGroup 的详细信息
  • 创立消费者组,创立消费者组后,即可应用该消费者组生产音讯
  • 生产音讯,将音讯发送至指定 topic
  • 基于指定消费者组生产音讯,生产音讯但不更新关联消费者组的 offset
  • 基于指定消费者组生产音讯,生产音讯且自动更新关联消费者组的 offset
  • 手动更新指定消费者组的偏移量

2. bitkylin-mq-api

提供供客户端应用的 api,通过 feignClient 模式提供,客户端可间接应用,执行 RPC,以后实现如下性能:

  • 发送音讯至指定 topic
  • 订阅指定 topic 的音讯。主动创立消费者组,应用观察者模式轮询音讯并生产。

3. bitkylin-mq-client-producer

音讯生产客户端,通过 feign-api 生产音讯,以后实现如下演示性能:
随机向 topic 名为「topic-1」和「topic-2」的 topic 中发送音讯,每隔 3 秒发送一次音讯。

4. bitkylin-mq-client-consumer

音讯生产客户端,通过 feign-api 生产音讯,以后实现如下演示性能:

  • 创立消费者组「spring-group-1」订阅「topic-1」,并打印订阅的音讯。
  • 创立消费者组「spring-group-2」订阅「topic-2」,并打印订阅的音讯。

代码演示

  1. 运行 module「bitkylin-mq-server」,启动 MQ 的 broker,启动音讯服务。
  2. 运行 module「bitkylin-mq-client-consumer」和「bitkylin-mq-client-producer」,开启音讯订阅演示工作和音讯发送演示工作。
  3. 此时可通过「bitkylin-mq-client-consumer」的控制台,看到音讯一直被生产。
2021-01-24 01:55:58.008  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:1
2021-01-24 01:56:00.996  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:2
2021-01-24 01:56:04.000  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:3
2021-01-24 01:56:07.004  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:4
2021-01-24 01:56:10.015  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:5
2021-01-24 01:56:13.011  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:6
2021-01-24 01:56:16.011  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:7
2021-01-24 01:56:19.006  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:8
2021-01-24 01:56:21.997  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:9
2021-01-24 01:56:24.994  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:10
2021-01-24 01:56:28.002  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:11
2021-01-24 01:56:30.991  INFO 2516 --- [pool-1-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-1: topic-1-msg:12
2021-01-24 01:56:34.014  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:13
2021-01-24 01:56:37.010  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:14
2021-01-24 01:56:40.004  INFO 2516 --- [pool-2-thread-1] .c.c.BitkylinMqClientConsumerApplication : 收到音讯:spring-group-2: topic-2-msg:15
  1. 关上 postman,发送如下申请创立专用于 postman 的消费者组:
POST http://localhost:8080/mq/broker/consumer-group/create

{
    "groupName": "postman-group-1",
    "topicName": "topic-1"
}
  1. 发送如下申请即可生产音讯,且主动确认「无需手动更新消费者组的 offset」
POST http://localhost:8080/mq/broker/message/simple-pull

{
    "groupName": "postman-group-1",
    "topicName": "topic-1"
}

能够发现,postman 能够独立生产指定 topic 的音讯,不受 Spring 程序生产的影响。当然,postman 能够间接应用 Spring 程序统一的消费者组,以独特生产音讯。

此时查问 MQ 的概览信息:

GET http://localhost:8080/mq/broker/overview

响应:

{
  "groupList": [
    {
      "groupName": "spring-group-1",
      "topic": {
        "name": "topic-1",
        "capacity": 1000,
        "maxIndex": 14
      },
      "offset": 15
    },
    {
      "groupName": "postman-group-1",
      "topic": {
        "name": "topic-1",
        "capacity": 1000,
        "maxIndex": 14
      },
      "offset": 5
    },
    {
      "groupName": "spring-group-2",
      "topic": {
        "name": "topic-2",
        "capacity": 1000,
        "maxIndex": 17
      },
      "offset": 18
    }
  ]
}

局限性

  1. 每个 topic 的队列容量是固定的,队列满后回绝生产音讯,暂不反对清理历史音讯。
  2. 音讯生产未加锁,如果一个消费者组的多个消费者高并发生产音讯,可能导致同一条音讯被生产屡次。

正文完
 0