前景回顾

上一节咱们学习了如何实现基于 netty 客服端和服务端的启动。

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】java 从零开始实现音讯队列 mq-02-如何实现生产者调用消费者?

那么客户端如何调用服务端呢?

咱们本节就来一起实现一下。

消费者实现

启动类的调整

ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup)        .channel(NioServerSocketChannel.class)        .childHandler(new ChannelInitializer<Channel>() {            @Override            protected void initChannel(Channel ch) throws Exception {                ch.pipeline()                        .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))                        .addLast(new MqConsumerHandler(invokeService));            }        })        // 这个参数影响的是还没有被accept 取出的连贯        .option(ChannelOption.SO_BACKLOG, 128)        // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。        .childOption(ChannelOption.SO_KEEPALIVE, true);

这里咱们通过指定分隔符解决 netty 粘包问题。

解决 netty 粘包问题

MqConsumerHandler 解决类

MqConsumerHandler 的实现如下,增加对应的业务解决逻辑。

package com.github.houbb.mq.consumer.handler;/** * @author binbin.hou * @since 1.0.0 */public class MqConsumerHandler extends SimpleChannelInboundHandler {    private static final Log log = LogFactory.getLog(MqConsumerHandler.class);    /**     * 调用治理类     * @since 1.0.0     */    private final IInvokeService invokeService;    public MqConsumerHandler(IInvokeService invokeService) {        this.invokeService = invokeService;    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf byteBuf = (ByteBuf) msg;        byte[] bytes = new byte[byteBuf.readableBytes()];        byteBuf.readBytes(bytes);        RpcMessageDto rpcMessageDto = null;        try {            rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);        } catch (Exception exception) {            log.error("RpcMessageDto json 格局转换异样 {}", new String(bytes));            return;        }        if (rpcMessageDto.isRequest()) {            MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx);            if(commonResp == null) {                log.debug("以后音讯为 null,疏忽解决。");                return;            }            writeResponse(rpcMessageDto, commonResp, ctx);        } else {            final String traceId = rpcMessageDto.getTraceId();            // 抛弃掉 traceId 为空的信息            if(StringUtil.isBlank(traceId)) {                log.debug("[Server Response] response traceId 为空,间接抛弃", JSON.toJSON(rpcMessageDto));                return;            }            // 增加音讯            invokeService.addResponse(traceId, rpcMessageDto);        }    }}

rpc 音讯体定义

为了统一标准,咱们的 rpc 音讯体 RpcMessageDto 定义如下:

package com.github.houbb.mq.common.rpc;/** * @author binbin.hou * @since 1.0.0 */public class RpcMessageDto implements Serializable {    /**     * 申请工夫     */    private long requestTime;    /**     * 申请标识     */    private String traceId;    /**     * 办法类型     */    private String methodType;    /**     * 是否为申请音讯     */    private boolean isRequest;    private String respCode;    private String respMsg;    private String json;    //getter&setter}

音讯散发

对于接管到的音讯体 RpcMessageDto,散发逻辑如下:

/** * 音讯的散发 * * @param rpcMessageDto 入参 * @param ctx 上下文 * @return 后果 */private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {    final String methodType = rpcMessageDto.getMethodType();    final String json = rpcMessageDto.getJson();    String channelId = ChannelUtil.getChannelId(ctx);    log.debug("channelId: {} 接管到 method: {} 内容:{}", channelId,            methodType, json);    // 音讯发送    if(MethodType.P_SEND_MESSAGE.equals(methodType)) {        // 日志输入        log.info("收到服务端音讯: {}", json);        // 如果是 broker,应该进行解决化等操作。        MqCommonResp resp = new MqCommonResp();        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());        return resp;    }    throw new UnsupportedOperationException("暂不反对的办法类型");}

这里对于接管到的音讯,只做一个简略的日志输入,后续将增加对应的业务逻辑解决。

后果回写

收到申请当前,咱们须要返回对应的响应。

基于 channel 的回写实现如下:

/** * 后果写回 * * @param req  申请 * @param resp 响应 * @param ctx  上下文 */private void writeResponse(RpcMessageDto req,                           Object resp,                           ChannelHandlerContext ctx) {    final String id = ctx.channel().id().asLongText();    RpcMessageDto rpcMessageDto = new RpcMessageDto();    // 响应类音讯    rpcMessageDto.setRequest(false);    rpcMessageDto.setTraceId(req.getTraceId());    rpcMessageDto.setMethodType(req.getMethodType());    rpcMessageDto.setRequestTime(System.currentTimeMillis());    String json = JSON.toJSONString(resp);    rpcMessageDto.setJson(json);    // 回写到 client 端    ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);    ctx.writeAndFlush(byteBuf);    log.debug("[Server] channel {} response {}", id, JSON.toJSON(rpcMessageDto));}

调用治理类

为了方便管理异步返回的申请后果,咱们对立定义了 IInvokeService 类,用于治理申请与响应。

接口

package com.github.houbb.mq.common.support.invoke;import com.github.houbb.mq.common.rpc.RpcMessageDto;/** * 调用服务接口 * @author binbin.hou * @since 1.0.0 */public interface IInvokeService {    /**     * 增加申请信息     * @param seqId 序列号     * @param timeoutMills 超时工夫     * @return this     * @since 1.0.0     */    IInvokeService addRequest(final String seqId,                              final long timeoutMills);    /**     * 放入后果     * @param seqId 惟一标识     * @param rpcResponse 响应后果     * @return this     * @since 1.0.0     */    IInvokeService addResponse(final String seqId, final RpcMessageDto rpcResponse);    /**     * 获取标记信息对应的后果     * @param seqId 序列号     * @return 后果     * @since 1.0.0     */    RpcMessageDto getResponse(final String seqId);}

实现

实现自身也不难。

package com.github.houbb.mq.common.support.invoke.impl;/** * 调用服务接口 * @author binbin.hou * @since 1.0.0 */public class InvokeService implements IInvokeService {    private static final Log logger = LogFactory.getLog(InvokeService.class);    /**     * 申请序列号 map     * (1)这里前期如果要增加超时检测,能够增加对应的超时工夫。     * 能够把这里调整为 map     *     * key: seqId 惟一标识一个申请     * value: 存入该申请最长的无效工夫。用于定时删除和超时判断。     * @since 0.0.2     */    private final ConcurrentHashMap<String, Long> requestMap;    /**     * 响应后果     * @since 1.0.0     */    private final ConcurrentHashMap<String, RpcMessageDto> responseMap;    public InvokeService() {        requestMap = new ConcurrentHashMap<>();        responseMap = new ConcurrentHashMap<>();        final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);        Executors.newScheduledThreadPool(1)                .scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS);    }    @Override    public IInvokeService addRequest(String seqId, long timeoutMills) {        logger.debug("[Invoke] start add request for seqId: {}, timeoutMills: {}", seqId,                timeoutMills);        final long expireTime = System.currentTimeMillis()+timeoutMills;        requestMap.putIfAbsent(seqId, expireTime);        return this;    }    @Override    public IInvokeService addResponse(String seqId, RpcMessageDto rpcResponse) {        // 1. 判断是否无效        Long expireTime = this.requestMap.get(seqId);        // 如果为空,可能是这个后果曾经超时了,被定时 job 移除之后,响应后果才过去。间接疏忽        if(ObjectUtil.isNull(expireTime)) {            return this;        }        //2. 判断是否超时        if(System.currentTimeMillis() > expireTime) {            logger.debug("[Invoke] seqId:{} 信息已超时,间接返回超时后果。", seqId);            rpcResponse = RpcMessageDto.timeout();        }        // 这里放入之前,能够增加判断。        // 如果 seqId 必须解决申请汇合中,才容许放入。或者间接疏忽抛弃。        // 告诉所有期待方        responseMap.putIfAbsent(seqId, rpcResponse);        logger.debug("[Invoke] 获取后果信息,seqId: {}, rpcResponse: {}", seqId, JSON.toJSON(rpcResponse));        logger.debug("[Invoke] seqId:{} 信息曾经放入,告诉所有期待方", seqId);        // 移除对应的 requestMap        requestMap.remove(seqId);        logger.debug("[Invoke] seqId:{} remove from request map", seqId);        // 同步锁        synchronized (this) {            this.notifyAll();            logger.debug("[Invoke] {} notifyAll()", seqId);        }        return this;    }    @Override    public RpcMessageDto getResponse(String seqId) {        try {            RpcMessageDto rpcResponse = this.responseMap.get(seqId);            if(ObjectUtil.isNotNull(rpcResponse)) {                logger.debug("[Invoke] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);                return rpcResponse;            }            // 进入期待            while (rpcResponse == null) {                logger.debug("[Invoke] seq {} 对应后果为空,进入期待", seqId);                // 同步期待锁                synchronized (this) {                    this.wait();                }                logger.debug("[Invoke] {} wait has notified!", seqId);                rpcResponse = this.responseMap.get(seqId);                logger.debug("[Invoke] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);            }            return rpcResponse;        } catch (InterruptedException e) {            logger.error("获取响应异样", e);            throw new MqException(MqCommonRespCode.RPC_GET_RESP_FAILED);        }    }}

这里 getResponse 获取不到会进入期待,直到 addResponse 唤醒。

然而这也有一个问题,如果一个申请的响应失落了怎么办?

总不能始终期待吧。

TimeoutCheckThread 超时检测线程

超时检测线程就能够帮咱们解决一些超时未返回的后果。

package com.github.houbb.mq.common.support.invoke.impl;import com.github.houbb.heaven.util.common.ArgUtil;import com.github.houbb.mq.common.rpc.RpcMessageDto;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;/** * 超时检测线程 * @author binbin.hou * @since 0.0.2 */public class TimeoutCheckThread implements Runnable {    /**     * 申请信息     * @since 0.0.2     */    private final ConcurrentHashMap<String, Long> requestMap;    /**     * 申请信息     * @since 0.0.2     */    private final ConcurrentHashMap<String, RpcMessageDto> responseMap;    /**     * 新建     * @param requestMap  申请 Map     * @param responseMap 后果 map     * @since 0.0.2     */    public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,                              ConcurrentHashMap<String, RpcMessageDto> responseMap) {        ArgUtil.notNull(requestMap, "requestMap");        this.requestMap = requestMap;        this.responseMap = responseMap;    }    @Override    public void run() {        for(Map.Entry<String, Long> entry : requestMap.entrySet()) {            long expireTime = entry.getValue();            long currentTime = System.currentTimeMillis();            if(currentTime > expireTime) {                final String key = entry.getKey();                // 后果设置为超时,从申请 map 中移除                responseMap.putIfAbsent(key, RpcMessageDto.timeout());                requestMap.remove(key);            }        }    }}

解决逻辑就是定时检测,如果超时了,就默认设置后果为超时,并且从申请汇合中移除。

音讯生产者实现

启动外围类

public class MqProducer extends Thread implements IMqProducer {    private static final Log log = LogFactory.getLog(MqProducer.class);    /**     * 分组名称     */    private final String groupName;    /**     * 端口号     */    private final int port;    /**     * 中间人地址     */    private String brokerAddress  = "";    /**     * channel 信息     * @since 0.0.2     */    private ChannelFuture channelFuture;    /**     * 客户端解决 handler     * @since 0.0.2     */    private ChannelHandler channelHandler;    /**     * 调用治理服务     * @since 0.0.2     */    private final IInvokeService invokeService = new InvokeService();    /**     * 获取响应超时工夫     * @since 0.0.2     */    private long respTimeoutMills = 5000;    /**     * 可用标识     * @since 0.0.2     */    private volatile boolean enableFlag = false;    /**     * 粘包解决分隔符     * @since 1.0.0     */    private String delimiter = DelimiterUtil.DELIMITER;    //set 办法        @Override    public synchronized void run() {        // 启动服务端        log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",                groupName, port, brokerAddress);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            // channel handler            this.initChannelHandler();            // 省略,同以前            // 标识为可用            enableFlag = true;        } catch (Exception e) {            log.error("MQ 生产者启动遇到异样", e);            throw new MqException(ProducerRespCode.RPC_INIT_FAILED);        }    }}

其中初始化 handler 的实现如下:

private void initChannelHandler() {    final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(delimiter);    final MqProducerHandler mqProducerHandler = new MqProducerHandler();    mqProducerHandler.setInvokeService(invokeService);    // handler 实际上会被屡次调用,如果不是 @Shareable,应该每次都从新创立。    ChannelHandler handler = new ChannelInitializer<Channel>() {        @Override        protected void initChannel(Channel ch) throws Exception {            ch.pipeline()                    .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))                    .addLast(mqProducerHandler);        }    };    this.channelHandler = handler;}

MqProducerHandler 生产者解决逻辑

和消费者解决逻辑相似。

这里最外围的就是增加响应后果:invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);

package com.github.houbb.mq.producer.handler;/** * @author binbin.hou * @since 1.0.0 */public class MqProducerHandler extends SimpleChannelInboundHandler {    private static final Log log = LogFactory.getLog(MqProducerHandler.class);    /**     * 调用治理类     */    private IInvokeService invokeService;    public void setInvokeService(IInvokeService invokeService) {        this.invokeService = invokeService;    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf byteBuf = (ByteBuf)msg;        byte[] bytes = new byte[byteBuf.readableBytes()];        byteBuf.readBytes(bytes);        String text = new String(bytes);        log.debug("[Client] channelId {} 接管到音讯 {}", ChannelUtil.getChannelId(ctx), text);        RpcMessageDto rpcMessageDto = null;        try {            rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);        } catch (Exception exception) {            log.error("RpcMessageDto json 格局转换异样 {}", JSON.parse(bytes));            return;        }        if(rpcMessageDto.isRequest()) {            // 申请类            final String methodType = rpcMessageDto.getMethodType();            final String json = rpcMessageDto.getJson();        } else {            // 抛弃掉 traceId 为空的信息            if(StringUtil.isBlank(rpcMessageDto.getTraceId())) {                log.debug("[Client] response traceId 为空,间接抛弃", JSON.toJSON(rpcMessageDto));                return;            }            invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);            log.debug("[Client] response is :{}", JSON.toJSON(rpcMessageDto));        }    }}

音讯的发送

关怀申请后果的:

public SendResult send(MqMessage mqMessage) {    String messageId = IdHelper.uuid32();    mqMessage.setTraceId(messageId);    mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);    MqCommonResp resp = callServer(mqMessage, MqCommonResp.class);    if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {        return SendResult.of(messageId, SendStatus.SUCCESS);    }    return SendResult.of(messageId, SendStatus.FAILED);}

不关怀申请后果的发送:

public SendResult sendOneWay(MqMessage mqMessage) {    String messageId = IdHelper.uuid32();    mqMessage.setTraceId(messageId);    mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);    this.callServer(mqMessage, null);    return SendResult.of(messageId, SendStatus.SUCCESS);}

其中 callServer 实现如下:

/** * 调用服务端 * @param commonReq 通用申请 * @param respClass 类 * @param <T> 泛型 * @param <R> 后果 * @return 后果 * @since 1.0.0 */public <T extends MqCommonReq, R extends MqCommonResp> R callServer(T commonReq, Class<R> respClass) {    final String traceId = commonReq.getTraceId();    final long requestTime = System.currentTimeMillis();    RpcMessageDto rpcMessageDto = new RpcMessageDto();    rpcMessageDto.setTraceId(traceId);    rpcMessageDto.setRequestTime(requestTime);    rpcMessageDto.setJson(JSON.toJSONString(commonReq));    rpcMessageDto.setMethodType(commonReq.getMethodType());    rpcMessageDto.setRequest(true);    // 增加调用服务    invokeService.addRequest(traceId, respTimeoutMills);    // 遍历 channel    // 敞开以后线程,以获取对应的信息    // 应用序列化的形式    ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);    //负载平衡获取 channel    Channel channel = channelFuture.channel();    channel.writeAndFlush(byteBuf);    String channelId = ChannelUtil.getChannelId(channel);    log.debug("[Client] channelId {} 发送音讯 {}", channelId, JSON.toJSON(rpcMessageDto));    if (respClass == null) {        log.debug("[Client] 以后音讯为 one-way 音讯,疏忽响应");        return null;    } else {        //channelHandler 中获取对应的响应        RpcMessageDto messageDto = invokeService.getResponse(traceId);        if (MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())) {            throw new MqException(MqCommonRespCode.TIMEOUT);        }        String respJson = messageDto.getJson();        return JSON.parseObject(respJson, respClass);    }}

测试代码

启动消费者

MqConsumerPush mqConsumerPush = new MqConsumerPush();mqConsumerPush.start();

启动日志如下:

[DEBUG] [2022-04-21 19:55:26.346] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.[INFO] [2022-04-21 19:55:26.369] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress: [INFO] [2022-04-21 19:55:27.845] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动实现,监听【9527】端口

启动生产者

MqProducer mqProducer = new MqProducer();mqProducer.start();//期待启动实现while (!mqProducer.isEnableFlag()) {    System.out.println("期待初始化实现...");    DateUtil.sleep(100);}String message = "HELLO MQ!";MqMessage mqMessage = new MqMessage();mqMessage.setTopic("TOPIC");mqMessage.setTags(Arrays.asList("TAGA", "TAGB"));mqMessage.setPayload(message.getBytes(StandardCharsets.UTF_8));SendResult sendResult = mqProducer.send(mqMessage);System.out.println(JSON.toJSON(sendResult));

生产者日志:

[INFO] [2022-04-21 19:56:39.609] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端实现,监听端口:9527...[DEBUG] [2022-04-21 19:56:39.895] [main] [c.g.h.m.c.s.i.i.InvokeService.addRequest] - [Invoke] start add request for seqId: a70ea2c4325641d6a5b198323228dc24, timeoutMills: 5000...[DEBUG] [2022-04-21 19:56:40.282] [main] [c.g.h.m.c.s.i.i.InvokeService.getResponse] - [Invoke] seq a70ea2c4325641d6a5b198323228dc24 对应后果曾经获取: com.github.houbb.mq.common.rpc.RpcMessageDto@a8f0b4...{"messageId":"a70ea2c4325641d6a5b198323228dc24","status":"SUCCESS"}

消费者日志:

[DEBUG] [2022-04-21 19:56:40.179] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - channelId: 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 接管到 method: P_SEND_MESSAGE 内容:{"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}[INFO] [2022-04-21 19:56:40.180] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - 收到服务端音讯: {"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}[DEBUG] [2022-04-21 19:56:40.234] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.writeResponse] - [Server] channel 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 response {"requestTime":1650542200182,"traceId":"a70ea2c4325641d6a5b198323228dc24","request":false,"methodType":"P_SEND_MESSAGE","json":"{\"respCode\":\"0000\",\"respMessage\":\"胜利\"}"}

能够看到消费者胜利的获取到了生产者的音讯。

小结

到这里,咱们就实现了一个音讯生产者调用消费者的实现。

然而你可能会问,这不就是 rpc 吗?

没有解耦。

是的,为了解决耦合问题,咱们将在下一节引入 broker 音讯的中间人。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc-从零开始实现 rpc https://github.com/houbb/rpc