共计 6458 个字符,预计需要花费 17 分钟才能阅读完成。
MQ 是什么?
MQ(Message Queue)音讯队列,是根底数据结构中“先进先出”的一种数据结构。
指把要传输的数据(音讯)放在队列中,用队列机制来实现消息传递——生产者产生音讯并把音讯放入队列,而后由消费者去解决。
消费者能够到指定队列拉取音讯,或者订阅相应的队列,由 MQ 服务端给其推送音讯。
MQ 的作用?
音讯队列中间件是分布式系统中重要的组件,次要解决利用解耦,异步音讯,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
解耦:一个业务须要多个模块独特实现,或者一条音讯有多个零碎须要对应解决,只须要主业务实现当前,发送一条 MQ,其余模块生产 MQ 音讯,即可实现业务,升高模块之间的耦合。
异步:主业务执行完结后隶属业务通过 MQ,异步执行,减低业务的响应工夫,进步用户体验。
削峰:高并发状况下,业务异步解决,提供高峰期业务解决能力,防止零碎瘫痪。
ps: 以上内容摘选自百科。
实现 mq 的筹备工作
maven 引入
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
模块划分
The message queue in java. 作为 mq 的从零开始的学习我的项目,目前已开源。
我的项目的模块如下:
模块 | 阐明 |
---|---|
mq-common | 公共代码 |
mq-broker | 注册核心 |
mq-producer | 音讯生产者 |
mq-consumer | 音讯消费者 |
音讯消费者
接口定义
package com.github.houbb.mq.consumer.api;
/**
* @author binbin.hou
* @since 1.0.0
*/
public interface IMqConsumer {
/**
* 订阅
* @param topicName topic 名称
* @param tagRegex 标签正则
*/
void subscribe(String topicName, String tagRegex);
/**
* 注册监听器
* @param listener 监听器
*/
void registerListener(final IMqConsumerListener listener);
}
IMqConsumerListener
作为音讯监听类的接口,定义如下:
public interface IMqConsumerListener {
/**
* 生产
* @param mqMessage 音讯体
* @param context 上下文
* @return 后果
*/
ConsumerStatus consumer(final MqMessage mqMessage,
final IMqConsumerListenerContext context);
}
ConsumerStatus 代表音讯生产的几种状态。
音讯体
启动音讯体 MqMessage 定义如下:
package com.github.houbb.mq.common.dto;
import java.util.Arrays;
import java.util.List;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class MqMessage {
/**
* 题目名称
*/
private String topic;
/**
* 标签
*/
private List<String> tags;
/**
* 内容
*/
private byte[] payload;
/**
* 业务标识
*/
private String bizKey;
/**
* 负载分片标识
*/
private String shardingKey;
// getter&setter&toString
}
push 消费者策略实现
消费者启动的实现如下:
/**
* 推送生产策略
*
* @author binbin.hou
* @since 1.0.0
*/
public class MqConsumerPush extends Thread implements IMqConsumer {
// 省略...
@Override
public void run() {
// 启动服务端
log.info("MQ 消费者开始启动服务端 groupName: {}, port: {}, brokerAddress: {}",
groupName, port, brokerAddress);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new MqConsumerHandler());
}
})
// 这个参数影响的是还没有被 accept 取出的连贯
.option(ChannelOption.SO_BACKLOG, 128)
// 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接管进来的链接
ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
log.info("MQ 消费者启动实现,监听【" + port + "】端口");
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("MQ 消费者敞开实现");
} catch (Exception e) {log.error("MQ 消费者启动异样", e);
throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
} finally {workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();}
}
// 省略...
}
ps: 初期咱们把 consumer 作为服务端,后续引入 broker 则只有 broker 是服务端。
MqConsumerHandler 解决类
这个类是一个空的实现。
public class MqConsumerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {//nothing}
}
测试代码
MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();
启动日志:
[DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress:
[INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动实现,监听【9527】端口
音讯生产者
接口定义
最根本的音讯发送接口。
package com.github.houbb.mq.producer.api;
import com.github.houbb.mq.common.dto.MqMessage;
import com.github.houbb.mq.producer.dto.SendResult;
/**
* @author binbin.hou
* @since 1.0.0
*/
public interface IMqProducer {
/**
* 同步发送音讯
* @param mqMessage 音讯类型
* @return 后果
*/
SendResult send(final MqMessage mqMessage);
/**
* 单向发送音讯
* @param mqMessage 音讯类型
* @return 后果
*/
SendResult sendOneWay(final MqMessage mqMessage);
}
生产者实现
MqProducer 启动的实现如下,基于 netty。
package com.github.houbb.mq.producer.core;
/**
* 默认 mq 生产者
* @author binbin.hou
* @since 1.0.0
*/
public class MqProducer extends Thread implements IMqProducer {
// 省略...
@Override
public void run() {
// 启动服务端
log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",
groupName, port, brokerAddress);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new MqProducerHandler());
}
})
.connect("localhost", port)
.syncUninterruptibly();
log.info("MQ 生产者启动客户端实现,监听端口:" + port);
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("MQ 生产者开始客户端已敞开");
} catch (Exception e) {log.error("MQ 生产者启动遇到异样", e);
throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
} finally {workerGroup.shutdownGracefully();
}
}
// 省略...
}
MqProducerHandler 解决类
默认的空实现,什么都不做。
package com.github.houbb.mq.producer.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class MqProducerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {//do nothing now}
}
启动代码
MqProducer mqProducer = new MqProducer();
mqProducer.start();
启动日志:
[DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者开始启动客户端 GROUP: P_DEFAULT_GROUP_NAME, PORT: 9527, brokerAddress:
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x5cb48145] REGISTERED
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE
[INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端实现,监听端口:9527
小结
基于 netty 最根本的服务端启动、客户端启动到这里就完结了。
千里之行,始于足下。
咱们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 繁难版本 mq 实现) : https://github.com/houbb/mq
拓展浏览
rpc- 从零开始实现 rpc: https://github.com/houbb/rpc
【mq】从零开始实现 mq-01- 生产者、消费者启动