MQ 是什么?

MQ(Message Queue)音讯队列,是根底数据结构中“先进先出”的一种数据结构。

指把要传输的数据(音讯)放在队列中,用队列机制来实现消息传递——生产者产生音讯并把音讯放入队列,而后由消费者去解决。

消费者能够到指定队列拉取音讯,或者订阅相应的队列,由MQ服务端给其推送音讯。

MQ 的作用?

音讯队列中间件是分布式系统中重要的组件,次要解决利用解耦,异步音讯,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

解耦:一个业务须要多个模块独特实现,或者一条音讯有多个零碎须要对应解决,只须要主业务实现当前,发送一条MQ,其余模块生产MQ音讯,即可实现业务,升高模块之间的耦合。

异步:主业务执行完结后隶属业务通过MQ,异步执行,减低业务的响应工夫,进步用户体验。

削峰:高并发状况下,业务异步解决,提供高峰期业务解决能力,防止零碎瘫痪。

ps: 以上内容摘选自百科。

实现 mq 的筹备工作

maven 引入

<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.42.Final</version></dependency><dependency>    <groupId>com.alibaba</groupId>    <artifactId>fastjson</artifactId>    <version>1.2.76</version></dependency>

模块划分

The message queue in java. 作为 mq 的从零开始的学习我的项目,目前已开源。

我的项目的模块如下:

模块阐明
mq-common公共代码
mq-broker注册核心
mq-producer音讯生产者
mq-consumer音讯消费者

音讯消费者

接口定义

package com.github.houbb.mq.consumer.api;/** * @author binbin.hou * @since 1.0.0 */public interface IMqConsumer {    /**     * 订阅     * @param topicName topic 名称     * @param tagRegex 标签正则     */    void subscribe(String topicName, String tagRegex);    /**     * 注册监听器     * @param listener 监听器     */    void registerListener(final IMqConsumerListener listener);}

IMqConsumerListener 作为音讯监听类的接口,定义如下:

public interface IMqConsumerListener {    /**     * 生产     * @param mqMessage 音讯体     * @param context 上下文     * @return 后果     */    ConsumerStatus consumer(final MqMessage mqMessage,                            final IMqConsumerListenerContext context);}

ConsumerStatus 代表音讯生产的几种状态。

音讯体

启动音讯体 MqMessage 定义如下:

package com.github.houbb.mq.common.dto;import java.util.Arrays;import java.util.List;/** * @author binbin.hou * @since 1.0.0 */public class MqMessage {    /**     * 题目名称     */    private String topic;    /**     * 标签     */    private List<String> tags;    /**     * 内容     */    private byte[] payload;    /**     * 业务标识     */    private String bizKey;    /**     * 负载分片标识     */    private String shardingKey;    // getter&setter&toString}

push 消费者策略实现

消费者启动的实现如下:

/** * 推送生产策略 * * @author binbin.hou * @since 1.0.0 */public class MqConsumerPush extends Thread implements IMqConsumer  {    // 省略...    @Override    public void run() {        // 启动服务端        log.info("MQ 消费者开始启动服务端 groupName: {}, port: {}, brokerAddress: {}",                groupName, port, brokerAddress);        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(workerGroup, bossGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<Channel>() {                        @Override                        protected void initChannel(Channel ch) throws Exception {                            ch.pipeline().addLast(new MqConsumerHandler());                        }                    })                    // 这个参数影响的是还没有被accept 取出的连贯                    .option(ChannelOption.SO_BACKLOG, 128)                    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。                    .childOption(ChannelOption.SO_KEEPALIVE, true);            // 绑定端口,开始接管进来的链接            ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();            log.info("MQ 消费者启动实现,监听【" + port + "】端口");            channelFuture.channel().closeFuture().syncUninterruptibly();            log.info("MQ 消费者敞开实现");        } catch (Exception e) {            log.error("MQ 消费者启动异样", e);            throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }    // 省略...}

ps: 初期咱们把 consumer 作为服务端,后续引入 broker 则只有 broker 是服务端。

MqConsumerHandler 解决类

这个类是一个空的实现。

public class MqConsumerHandler extends SimpleChannelInboundHandler {    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {        //nothing    }}

测试代码

MqConsumerPush mqConsumerPush = new MqConsumerPush();mqConsumerPush.start();

启动日志:

[DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.[INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress: [INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动实现,监听【9527】端口

音讯生产者

接口定义

最根本的音讯发送接口。

package com.github.houbb.mq.producer.api;import com.github.houbb.mq.common.dto.MqMessage;import com.github.houbb.mq.producer.dto.SendResult;/** * @author binbin.hou * @since 1.0.0 */public interface IMqProducer {    /**     * 同步发送音讯     * @param mqMessage 音讯类型     * @return 后果     */    SendResult send(final MqMessage mqMessage);    /**     * 单向发送音讯     * @param mqMessage 音讯类型     * @return 后果     */    SendResult sendOneWay(final MqMessage mqMessage);}

生产者实现

MqProducer 启动的实现如下,基于 netty。

package com.github.houbb.mq.producer.core;/** * 默认 mq 生产者 * @author binbin.hou * @since 1.0.0 */public class MqProducer extends Thread implements IMqProducer {    //省略...    @Override    public void run() {        // 启动服务端        log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",                groupName, port, brokerAddress);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap();            ChannelFuture channelFuture = bootstrap.group(workerGroup)                    .channel(NioSocketChannel.class)                    .option(ChannelOption.SO_KEEPALIVE, true)                    .handler(new ChannelInitializer<Channel>(){                        @Override                        protected void initChannel(Channel ch) throws Exception {                            ch.pipeline()                                    .addLast(new LoggingHandler(LogLevel.INFO))                                    .addLast(new MqProducerHandler());                        }                    })                    .connect("localhost", port)                    .syncUninterruptibly();            log.info("MQ 生产者启动客户端实现,监听端口:" + port);            channelFuture.channel().closeFuture().syncUninterruptibly();            log.info("MQ 生产者开始客户端已敞开");        } catch (Exception e) {            log.error("MQ 生产者启动遇到异样", e);            throw new MqException(ProducerRespCode.RPC_INIT_FAILED);        } finally {            workerGroup.shutdownGracefully();        }    }    //省略...}

MqProducerHandler 解决类

默认的空实现,什么都不做。

package com.github.houbb.mq.producer.handler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * @author binbin.hou * @since 1.0.0 */public class MqProducerHandler extends SimpleChannelInboundHandler {    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {        //do nothing now    }}

启动代码

MqProducer mqProducer = new MqProducer();mqProducer.start();

启动日志:

[DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.[INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者开始启动客户端 GROUP: P_DEFAULT_GROUP_NAME, PORT: 9527, brokerAddress: 四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered信息: [id: 0x5cb48145] REGISTERED四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE[INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端实现,监听端口:9527

小结

基于 netty 最根本的服务端启动、客户端启动到这里就完结了。

千里之行,始于足下。

咱们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) : https://github.com/houbb/mq

拓展浏览

rpc-从零开始实现 rpc: https://github.com/houbb/rpc

【mq】从零开始实现 mq-01-生产者、消费者启动