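Previously

[mq] Implementing an MQ from scratch, mq-01: starting the producer and consumer

[mq] Implementing an MQ from scratch, mq-02: how does the producer call the consumer?

[mq] Implementing an MQ from scratch, mq-03: introducing the broker as a middleman

In the previous section we saw how a producer sends messages to a consumer, but only over a direct connection.

So how do we decouple the two?

The answer is to introduce a broker, a middleman for messages.

MqBroker implementation

Core startup class

The startup code is similar to the consumer startup we implemented earlier: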

package com.github.houbb.mq.broker.core;

/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqBroker extends Thread implements IMqBroker {

    // omitted

    private ChannelHandler initChannelHandler() {
        MqBrokerHandler handler = new MqBrokerHandler();
        handler.setInvokeService(invokeService);
        handler.setRegisterConsumerService(registerConsumerService);
        handler.setRegisterProducerService(registerProducerService);
        handler.setMqBrokerPersist(mqBrokerPersist);
        handler.setBrokerPushService(brokerPushService);
        handler.setRespTimeoutMills(respTimeoutMills);
        return handler;
    }

    @Override
    public void run() {
        // start the server
        log.info("MQ broker starting server, port: {}", port);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // group(parentGroup, childGroup): the boss group accepts connections,
            // the worker group handles I/O on the accepted channels
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
                                    .addLast(initChannelHandler());
                        }
                    })
                    // affects connections that accept() has not yet taken off the queue
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // if the client stays silent for a while, the server sends a probe
                    // packet to check whether the client is still alive
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // bind the port and start accepting incoming connections
            ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
            log.info("MQ broker started, listening on port [" + port + "]");

            channelFuture.channel().closeFuture().syncUninterruptibly();
            log.info("MQ broker shut down");
        } catch (Exception e) {
            log.error("MQ broker failed to start", e);
            throw new MqException(BrokerRespCode.RPC_INIT_FAILED);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

initChannelHandler introduces quite a few new faces; we cover them in detail below.
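Before the handler sees any bytes, the DelimiterBasedFrameDecoder in the pipeline cuts the TCP stream into frames at a delimiter and, by default, strips the delimiter. The following is a minimal sketch of that idea in plain Java, without Netty. The class name DelimiterFramer and the delimiter "~!@#" are assumptions for illustration only; the actual value of DelimiterUtil.DELIMITER is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a text stream into frames on a delimiter. */
class DelimiterFramer {
    private final String delimiter;
    // Holds bytes (here: characters) that do not yet form a complete frame.
    private final StringBuilder buffer = new StringBuilder();

    DelimiterFramer(String delimiter) {
        this.delimiter = delimiter;
    }

    /** Feeds one chunk as read from the socket and returns every frame it completes. */
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int idx;
        while ((idx = buffer.indexOf(delimiter)) >= 0) {
            frames.add(buffer.substring(0, idx));        // frame without the delimiter
            buffer.delete(0, idx + delimiter.length());  // drop frame and delimiter
        }
        return frames;
    }
}
```

A chunk may end in the middle of a JSON message; the remainder stays buffered until the delimiter arrives in a later chunk, which is why the handler can safely parse each frame as one complete RpcMessageDto.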

MqBrokerHandler processing logic

package com.github.houbb.mq.broker.handler;

import java.util.List;

/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqBrokerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(MqBrokerHandler.class);

    /**
     * Invocation management
     * @since 1.0.0
     */
    private IInvokeService invokeService;

    /**
     * Consumer management
     * @since 0.0.3
     */
    private IBrokerConsumerService registerConsumerService;

    /**
     * Producer management
     * @since 0.0.3
     */
    private IBrokerProducerService registerProducerService;

    /**
     * Persistence
     * @since 0.0.3
     */
    private IMqBrokerPersist mqBrokerPersist;

    /**
     * Push service
     * @since 0.0.3
     */
    private IBrokerPushService brokerPushService;

    /**
     * Response timeout in milliseconds
     * @since 0.0.3
     */
    private long respTimeoutMills;

    // setters

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        RpcMessageDto rpcMessageDto = null;
        try {
            rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
        } catch (Exception exception) {
            log.error("Failed to parse RpcMessageDto json {}", new String(bytes));
            return;
        }

        if (rpcMessageDto.isRequest()) {
            MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx);
            if(commonResp == null) {
                log.debug("Current response is null, skipping.");
                return;
            }

            // write the response back, same as before
            writeResponse(rpcMessageDto, commonResp, ctx);
        } else {
            final String traceId = rpcMessageDto.getTraceId();

            // discard messages with a blank traceId
            if(StringUtil.isBlank(traceId)) {
                log.debug("[Server Response] response traceId is blank, discarding: {}", JSON.toJSON(rpcMessageDto));
                return;
            }

            // record the response so the waiting caller can pick it up
            invokeService.addResponse(traceId, rpcMessageDto);
        }
    }

    /**
     * Handles the message asynchronously
     * @param mqMessage message
     * @since 0.0.3
     */
    private void asyncHandleMessage(MqMessage mqMessage) {
        List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage);
        if(CollectionUtil.isEmpty(channelList)) {
            log.info("Subscriber list is empty, skipping");
            return;
        }

        BrokerPushContext brokerPushContext = new BrokerPushContext();
        brokerPushContext.setChannelList(channelList);
        brokerPushContext.setMqMessage(mqMessage);
        brokerPushContext.setMqBrokerPersist(mqBrokerPersist);
        brokerPushContext.setInvokeService(invokeService);
        brokerPushContext.setRespTimeoutMills(respTimeoutMills);

        brokerPushService.asyncPush(brokerPushContext);
    }

}
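The else branch above hands every response to invokeService.addResponse(traceId, ...), so that whoever sent the matching request can stop waiting. A minimal sketch of how such request/response correlation might work is shown below. PendingResponses, addRequest, and getResponse are hypothetical names, responses are modeled as plain JSON strings, and the real IInvokeService may be implemented quite differently.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for IInvokeService: matches responses to requests by traceId. */
class PendingResponses {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called by the sender before the request is written to the channel. */
    void addRequest(String traceId) {
        pending.put(traceId, new CompletableFuture<>());
    }

    /** Called from the inbound handler when a response arrives; unknown ids are ignored. */
    void addResponse(String traceId, String responseJson) {
        CompletableFuture<String> future = pending.get(traceId);
        if (future != null) {
            future.complete(responseJson);
        }
    }

    /** Blocks until the response arrives or the timeout elapses, then forgets the traceId. */
    String getResponse(String traceId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(traceId);
        if (future == null) {
            throw new IllegalStateException("no pending request for traceId " + traceId);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("timed out waiting for traceId " + traceId, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(traceId);
        }
    }
}
```

Under this design the I/O thread never blocks: it only completes a future, while the thread that called the remote side waits with a timeout, which is the role respTimeoutMills appears to play in the handler.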

Message dispatch

After the broker receives a message, dispatch is implemented as follows:

/**
 * Dispatches a message
 *
 * @param rpcMessageDto request
 * @param ctx context
 * @return result
 */
private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {
    try {
        final String methodType = rpcMessageDto.getMethodType();
        final String json = rpcMessageDto.getJson();

        String channelId = ChannelUtil.getChannelId(ctx);
        final Channel channel = ctx.channel();
        log.debug("channelId: {} received method: {} content: {}", channelId,
                methodType, json);

        // producer register
        if(MethodType.P_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerProducerService.register(registerReq.getServiceEntry(), channel);
        }
        // producer unregister
        if(MethodType.P_UN_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerProducerService.unRegister(registerReq.getServiceEntry(), channel);
        }
        // producer sends a message
        if(MethodType.P_SEND_MSG.equals(methodType)) {
            MqMessage mqMessage = JSON.parseObject(json, MqMessage.class);
            MqMessagePersistPut persistPut = new MqMessagePersistPut();
            persistPut.setMqMessage(mqMessage);
            persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
            MqCommonResp commonResp = mqBrokerPersist.put(persistPut);
            this.asyncHandleMessage(mqMessage);
            return commonResp;
        }
        // producer sends a message, ONE WAY (no response is written back)
        if(MethodType.P_SEND_MSG_ONE_WAY.equals(methodType)) {
            MqMessage mqMessage = JSON.parseObject(json, MqMessage.class);
            MqMessagePersistPut persistPut = new MqMessagePersistPut();
            persistPut.setMqMessage(mqMessage);
            persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
            mqBrokerPersist.put(persistPut);
            this.asyncHandleMessage(mqMessage);
            return null;
        }

        // consumer register
        if(MethodType.C_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerConsumerService.register(registerReq.getServiceEntry(), channel);
        }
        // consumer unregister
        if(MethodType.C_UN_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerConsumerService.unRegister(registerReq.getServiceEntry(), channel);
        }
        // consumer subscribe
        if(MethodType.C_SUBSCRIBE.equals(methodType)) {
            ConsumerSubscribeReq req = JSON.parseObject(json, ConsumerSubscribeReq.class);
            return registerConsumerService.subscribe(req, channel);
        }
        // consumer unsubscribe
        if(MethodType.C_UN_SUBSCRIBE.equals(methodType)) {
            ConsumerUnSubscribeReq req = JSON.parseObject(json, ConsumerUnSubscribeReq.class);
            return registerConsumerService.unSubscribe(req, channel);
        }
        // consumer actively pulls
        if(MethodType.C_MESSAGE_PULL.equals(methodType)) {
            MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);
            return mqBrokerPersist.pull(req, channel);
        }

        throw new UnsupportedOperationException("Unsupported method type");
    } catch (Exception exception) {
        log.error("Execution failed", exception);
        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.FAIL.getCode());
        resp.setRespMessage(MqCommonRespCode.FAIL.getMsg());
        return resp;
    }
}
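As the number of method types grows, the if chain above could be replaced by a lookup table. The sketch below is only one possible alternative, not what the project does: MethodDispatcher is a hypothetical class, requests and responses are reduced to strings, and "FAIL" stands in for the MqCommonRespCode.FAIL response.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical dispatcher: one handler per method type instead of an if chain. */
class MethodDispatcher {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    /** Registers the handler that turns a request json into a response. */
    void register(String methodType, Function<String, String> handler) {
        handlers.put(methodType, handler);
    }

    /**
     * Returns the handler's response (null means "write nothing back", as for one-way sends),
     * or "FAIL" when the method type is unknown or the handler throws.
     */
    String dispatch(String methodType, String json) {
        Function<String, String> handler = handlers.get(methodType);
        if (handler == null) {
            return "FAIL";
        }
        try {
            return handler.apply(json);
        } catch (RuntimeException e) {
            return "FAIL";
        }
    }
}
```

The trade-off is that the table hides the flow a little, but adding a new method type becomes a single register call instead of another branch.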

Message push

this.asyncHandleMessage(mqMessage); is the logic the broker runs after it receives a message.

/**
 * Handles the message asynchronously
 * @param mqMessage message
 * @since 0.0.3
 */
private void asyncHandleMessage(MqMessage mqMessage) {
    List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage);
    if(CollectionUtil.isEmpty(channelList)) {
        log.info("Subscriber list is empty, skipping");
        return;
    }

    BrokerPushContext brokerPushContext = new BrokerPushContext();
    brokerPushContext.setChannelList(channelList);
    brokerPushContext.setMqMessage(mqMessage);
    brokerPushContext.setMqBrokerPersist(mqBrokerPersist);
    brokerPushContext.setInvokeService(invokeService);
    brokerPushContext.setRespTimeoutMills(respTimeoutMills);

    brokerPushService.asyncPush(brokerPushContext);
}

The core push implementation is as follows:

package com.github.houbb.mq.broker.support.push;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public class BrokerPushService implements IBrokerPushService {

    private static final Log log = LogFactory.getLog(BrokerPushService.class);

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

    @Override
    public void asyncPush(final BrokerPushContext context) {
        EXECUTOR_SERVICE.submit(new Runnable() {
            @Override
            public void run() {
                log.info("Async push started {}", JSON.toJSON(context));
                final List<Channel> channelList = context.getChannelList();
                final IMqBrokerPersist mqBrokerPersist = context.getMqBrokerPersist();
                final MqMessage mqMessage = context.getMqMessage();
                final String messageId = mqMessage.getTraceId();
                final IInvokeService invokeService = context.getInvokeService();
                final long responseTime = context.getRespTimeoutMills();

                for(Channel channel : channelList) {
                    try {
                        String channelId = ChannelUtil.getChannelId(channel);
                        log.info("Start handling channelId: {}", channelId);

                        //1. call the consumer
                        mqMessage.setMethodType(MethodType.B_MESSAGE_PUSH);
                        MqConsumerResultResp resultResp = callServer(channel, mqMessage,
                                MqConsumerResultResp.class, invokeService, responseTime);

                        //2. update the status
                        mqBrokerPersist.updateStatus(messageId, resultResp.getConsumerStatus());

                        //3. a retry strategy will be added later
                        log.info("Finished handling channelId: {}", channelId);
                    } catch (Exception exception) {
                        // pass the exception along so the cause is not lost
                        log.error("Push failed", exception);
                        mqBrokerPersist.updateStatus(messageId, ConsumerStatus.FAILED.getCode());
                    }
                }

                log.info("Async push finished");
            }
        });
    }

}

After a message has been pushed, its ACK status needs to be updated.
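The push-then-update flow can be reduced to a few lines. This is a minimal sketch under stated assumptions: PushStatusSketch is a hypothetical class, each consumer is modeled as a function that returns a status or throws (standing in for callServer), and the status strings "WAIT_CONSUMER", "SUCCESS", and "FAILED" are placeholders, since the real codes of MessageStatusConst and ConsumerStatus are not shown here.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of the push-then-update-status flow. */
class PushStatusSketch {
    /** messageId -> latest status; stands in for IMqBrokerPersist. */
    final Map<String, String> statusById = new ConcurrentHashMap<>();

    /**
     * Pushes one message to every consumer and records the outcome.
     * @param consumers each function models a remote call: it returns a status or throws
     */
    void push(String messageId, List<Function<String, String>> consumers) {
        statusById.put(messageId, "WAIT_CONSUMER");
        for (Function<String, String> consumer : consumers) {
            try {
                statusById.put(messageId, consumer.apply(messageId));
            } catch (RuntimeException e) {
                // a timeout or a broken channel ends up here
                statusById.put(messageId, "FAILED");
            }
        }
    }
}
```

Note that the status is keyed by message id alone, so when a message goes to several consumer groups the last result overwrites the earlier ones. The same appears to hold for updateStatus(messageId, status) in the article's code; tracking status per group would need a composite key.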

Producer handling class

The IBrokerProducerService interface is defined as follows:

package com.github.houbb.mq.broker.api;

/**
 * <p> Producer registration service </p>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public interface IBrokerProducerService {

    /**
     * Registers the current service information
     * (1) Groups the service by {@link ServiceEntry#getGroupName()}
     * all clients that subscribed to this serviceId
     * @param serviceEntry the service information to register
     * @param channel channel
     * @since 0.0.8
     */
    MqCommonResp register(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Unregisters the current service information
     * @param serviceEntry the service information to unregister
     * @param channel channel
     * @since 0.0.8
     */
    MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Gets the service address information
     * @param channel channel
     * @return result
     * @since 0.0.3
     */
    ServiceEntry getServiceEntry(final Channel channel);

}

The implementation is as follows:

Basic information about incoming requests is stored locally in a map.

package com.github.houbb.mq.broker.support.api;

/**
 * <p> Producer registration service </p>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public class LocalBrokerProducerService implements IBrokerProducerService {

    private static final Log log = LogFactory.getLog(LocalBrokerProducerService.class);

    private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();

    @Override
    public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
        registerMap.put(channelId, entryChannel);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        registerMap.remove(channelId);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public ServiceEntry getServiceEntry(Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        return registerMap.get(channelId);
    }

}

Consumer handling class

The interface is defined as follows:

package com.github.houbb.mq.broker.api;

/**
 * <p> Consumer registration service </p>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public interface IBrokerConsumerService {

    /**
     * Registers the current service information
     * (1) Groups the service by {@link ServiceEntry#getGroupName()}
     * all clients that subscribed to this serviceId
     * @param serviceEntry the service information to register
     * @param channel channel
     * @since 0.0.3
     */
    MqCommonResp register(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Unregisters the current service information
     * @param serviceEntry the service information to unregister
     * @param channel channel
     * @since 0.0.3
     */
    MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Subscribes to service information
     * (1) After subscribing, any change in the related machine information is pushed.
     * (2) For built-in information, the ip must be sent to the registry.
     *
     * @param serviceEntry client details
     * @param clientChannel client channel
     * @since 0.0.3
     */
    MqCommonResp subscribe(final ConsumerSubscribeReq serviceEntry,
                   final Channel clientChannel);

    /**
     * Cancels a subscription to service information
     * (1) After subscribing, any change in the related machine information is pushed.
     * (2) For built-in information, the ip must be sent to the registry.
     *
     * @param serviceEntry client details
     * @param clientChannel client channel
     * @since 0.0.3
     */
    MqCommonResp unSubscribe(final ConsumerUnSubscribeReq serviceEntry,
                   final Channel clientChannel);

    /**
     * Gets all matching consumers
     * 1. Only one is returned per groupName; mind the load balancing
     * 2. Returns the consumer channels that match the current message
     *
     * @param mqMessage message body
     * @return result
     */
    List<Channel> getSubscribeList(MqMessage mqMessage);

}

Default implementation:

package com.github.houbb.mq.broker.support.api;

/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class LocalBrokerConsumerService implements IBrokerConsumerService {

    private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();

    /**
     * Subscriptions
     * key: topicName
     * value: the subscriptions for that topic
     */
    private final Map<String, Set<ConsumerSubscribeBo>> subscribeMap = new ConcurrentHashMap<>();

    @Override
    public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
        registerMap.put(channelId, entryChannel);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        registerMap.remove(channelId);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp subscribe(ConsumerSubscribeReq serviceEntry, Channel clientChannel) {
        final String channelId = ChannelUtil.getChannelId(clientChannel);
        final String topicName = serviceEntry.getTopicName();

        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if(set == null) {
            set = new HashSet<>();
        }
        ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
        subscribeBo.setChannelId(channelId);
        subscribeBo.setGroupName(serviceEntry.getGroupName());
        subscribeBo.setTopicName(topicName);
        subscribeBo.setTagRegex(serviceEntry.getTagRegex());
        set.add(subscribeBo);

        subscribeMap.put(topicName, set);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unSubscribe(ConsumerUnSubscribeReq serviceEntry, Channel clientChannel) {
        final String channelId = ChannelUtil.getChannelId(clientChannel);
        final String topicName = serviceEntry.getTopicName();

        ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
        subscribeBo.setChannelId(channelId);
        subscribeBo.setGroupName(serviceEntry.getGroupName());
        subscribeBo.setTopicName(topicName);
        subscribeBo.setTagRegex(serviceEntry.getTagRegex());

        // remove the matching subscription from the topic's set
        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if(CollectionUtil.isNotEmpty(set)) {
            set.remove(subscribeBo);
        }

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public List<Channel> getSubscribeList(MqMessage mqMessage) {
        //1. look up the subscriptions for the topic
        final String topicName = mqMessage.getTopic();
        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if(CollectionUtil.isEmpty(set)) {
            return Collections.emptyList();
        }

        //2. keep the subscriptions whose tag regex matches
        final List<String> tagNameList = mqMessage.getTags();
        Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
        for(ConsumerSubscribeBo bo : set) {
            String tagRegex = bo.getTagRegex();
            if(hasMatch(tagNameList, tagRegex)) {
                //TODO: unify this get-or-create pattern
                String groupName = bo.getGroupName();
                List<ConsumerSubscribeBo> list = groupMap.get(groupName);
                if(list == null) {
                    list = new ArrayList<>();
                }
                list.add(bo);
                groupMap.put(groupName, list);
            }
        }

        //3. after grouping by groupName, return only one random entry per group.
        // Ideally this should be changed to select by shardingKey.
        final String shardingKey = mqMessage.getShardingKey();
        List<Channel> channelList = new ArrayList<>();
        for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {
            List<ConsumerSubscribeBo> list = entry.getValue();
            ConsumerSubscribeBo bo = RandomUtils.random(list, shardingKey);
            BrokerServiceEntryChannel entryChannel = registerMap.get(bo.getChannelId());
            channelList.add(entryChannel.getChannel());
        }
        return channelList;
    }

    private boolean hasMatch(List<String> tagNameList,
                             String tagRegex) {
        if(CollectionUtil.isEmpty(tagNameList)) {
            return false;
        }

        Pattern pattern = Pattern.compile(tagRegex);
        for(String tagName : tagNameList) {
            if(RegexUtils.match(pattern, tagName)) {
                return true;
            }
        }
        return false;
    }

}

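The logic of getSubscribeList may look slightly complicated, but all it does is find the subscribed consumers that match an incoming message.

Because a message is consumed only once among the consumers that share a groupName, the matches have to be grouped first.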
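The match, group, and pick steps can be sketched in isolation. The following is a simplified illustration, not the project's code: Subscription and SubscriberSelector are hypothetical names, channels are reduced to their ids, tags are matched with a full-string regex match (an assumption, since RegexUtils.match is not shown), and the pick uses the sharding key's hash when a key is present.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;

/** Hypothetical, simplified subscription: channel id, group name, and tag regex. */
class Subscription {
    final String channelId;
    final String groupName;
    final String tagRegex;

    Subscription(String channelId, String groupName, String tagRegex) {
        this.channelId = channelId;
        this.groupName = groupName;
        this.tagRegex = tagRegex;
    }
}

class SubscriberSelector {
    /** Returns one channel id per group among the subscriptions whose regex matches any tag. */
    static List<String> select(List<Subscription> subs, List<String> tags, String shardingKey) {
        // Step 1: keep matching subscriptions, grouped by group name (insertion order kept).
        Map<String, List<Subscription>> byGroup = new LinkedHashMap<>();
        for (Subscription sub : subs) {
            Pattern pattern = Pattern.compile(sub.tagRegex);
            boolean matched = tags.stream().anyMatch(tag -> pattern.matcher(tag).matches());
            if (matched) {
                byGroup.computeIfAbsent(sub.groupName, k -> new ArrayList<>()).add(sub);
            }
        }
        // Step 2: pick exactly one member per group.
        List<String> result = new ArrayList<>();
        for (List<Subscription> group : byGroup.values()) {
            // Same sharding key -> same index while the group membership is unchanged;
            // without a key, fall back to a random member.
            int index = shardingKey == null
                    ? ThreadLocalRandom.current().nextInt(group.size())
                    : Math.floorMod(shardingKey.hashCode(), group.size());
            result.add(group.get(index).channelId);
        }
        return result;
    }
}
```

With two consumers in group g1 and one in g2, a message whose tags match all three is delivered to exactly two channels, one per group.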

Message persistence

The interface is as follows:

package com.github.houbb.mq.broker.support.persist;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public interface IMqBrokerPersist {

    /**
     * Saves a message
     * @param mqMessage message
     * @since 0.0.3
     */
    MqCommonResp put(final MqMessagePersistPut mqMessage);

    /**
     * Updates the status
     * @param messageId unique message id
     * @param status status
     * @return result
     * @since 0.0.3
     */
    MqCommonResp updateStatus(final String messageId,
                              final String status);

    /**
     * Pulls messages
     * @param pull pull request
     * @param channel channel
     * @return result
     */
    MqConsumerPullResp pull(final MqConsumerPullReq pull, final Channel channel);

}

Local default implementation:

package com.github.houbb.mq.broker.support.persist;

/**
 * Local persistence strategy
 * @author binbin.hou
 * @since 1.0.0
 */
public class LocalMqBrokerPersist implements IMqBrokerPersist {

    private static final Log log = LogFactory.getLog(LocalMqBrokerPersist.class);

    /**
     * Queues, keyed by topic
     * ps: this is a simplified implementation; concurrency is not considered for now.
     */
    private final Map<String, List<MqMessagePersistPut>> map = new ConcurrentHashMap<>();

    //1. receive
    //2. persist
    //3. notify consumers
    @Override
    public synchronized MqCommonResp put(MqMessagePersistPut put) {
        log.info("put elem: {}", JSON.toJSON(put));

        MqMessage mqMessage = put.getMqMessage();
        final String topic = mqMessage.getTopic();

        List<MqMessagePersistPut> list = map.get(topic);
        if(list == null) {
            list = new ArrayList<>();
        }
        list.add(put);
        map.put(topic, list);

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }

    @Override
    public MqCommonResp updateStatus(String messageId, String status) {
        // Performance is poor here, so this must not be used in production.
        // It is for test verification only.
        for(List<MqMessagePersistPut> list : map.values()) {
            for(MqMessagePersistPut put : list) {
                MqMessage mqMessage = put.getMqMessage();
                if(mqMessage.getTraceId().equals(messageId)) {
                    put.setMessageStatus(status);
                    break;
                }
            }
        }

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }

    @Override
    public MqConsumerPullResp pull(MqConsumerPullReq pull, Channel channel) {
        //TODO... to be implemented
        return null;
    }

}

ps: A persistence strategy based on springboot + mysql will be implemented later.
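Until then, the slow scan in updateStatus could be avoided even in memory by keeping a second map keyed by message id. The sketch below is one possible approach under stated assumptions, not the project's implementation: StoredMessage and IndexedLocalPersist are hypothetical names, and a message is reduced to its trace id, topic, and status.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical, simplified stored message. */
class StoredMessage {
    final String traceId;
    final String topic;
    volatile String status;

    StoredMessage(String traceId, String topic, String status) {
        this.traceId = traceId;
        this.topic = topic;
        this.status = status;
    }
}

/** Hypothetical in-memory store with an id index, so a status update needs no scan. */
class IndexedLocalPersist {
    private final Map<String, List<StoredMessage>> byTopic = new ConcurrentHashMap<>();
    private final Map<String, StoredMessage> byId = new ConcurrentHashMap<>();

    void put(StoredMessage message) {
        // computeIfAbsent creates the per-topic list atomically on first use.
        byTopic.computeIfAbsent(message.topic, k -> new CopyOnWriteArrayList<>()).add(message);
        byId.put(message.traceId, message);
    }

    /** Returns false when the message id is unknown. */
    boolean updateStatus(String traceId, String status) {
        StoredMessage message = byId.get(traceId);
        if (message == null) {
            return false;
        }
        message.status = status;
        return true;
    }

    List<StoredMessage> messages(String topic) {
        return byTopic.getOrDefault(topic, Collections.<StoredMessage>emptyList());
    }
}
```

Both maps point at the same StoredMessage object, so a status change made through the id index is visible when the topic list is read later.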

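Consumer startup changes

We adjust the startup of both the producer and the consumer so that they connect to the broker.

The two are similar, so the consumer is used as the example here.

Core startup class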

package com.github.houbb.mq.consumer.core;

/**
 * Push consumption strategy
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqConsumerPush extends Thread implements IMqConsumer  {

    // fields & setters

    @Override
    public void run() {
        // start the consumer
        log.info("MQ consumer starting, groupName: {}, brokerAddress: {}",
                groupName, brokerAddress);

        //1. parameter check
        this.paramCheck();

        try {
            // channel handler
            ChannelHandler channelHandler = this.initChannelHandler();

            // channel future
            this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress, channelHandler);

            // register to broker
            this.registerToBroker();

            // mark as available
            enableFlag = true;
            log.info("MQ consumer started");
        } catch (Exception e) {
            log.error("MQ consumer failed to start", e);
            throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
        }
    }

    // subscribe & unsubscribe

    @Override
    public void registerListener(IMqConsumerListener listener) {
        this.mqListenerService.register(listener);
    }

}

Initializing the handler

private ChannelHandler initChannelHandler() {
    final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);

    final MqConsumerHandler mqConsumerHandler = new MqConsumerHandler(invokeService, mqListenerService);

    // The handler is actually used multiple times; unless it is @Sharable,
    // a new instance should be created each time.
    ChannelHandler handler = new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
                    .addLast(mqConsumerHandler);
        }
    };

    return handler;
}

Registering with the server

/**
 * Registers with all servers
 * @since 0.0.3
 */
private void registerToBroker() {
    for(RpcChannelFuture channelFuture : this.channelFutureList) {
        ServiceEntry serviceEntry = new ServiceEntry();
        serviceEntry.setGroupName(groupName);
        serviceEntry.setAddress(channelFuture.getAddress());
        serviceEntry.setPort(channelFuture.getPort());
        serviceEntry.setWeight(channelFuture.getWeight());

        BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
        brokerRegisterReq.setServiceEntry(serviceEntry);
        brokerRegisterReq.setMethodType(MethodType.C_REGISTER);
        brokerRegisterReq.setTraceId(IdHelper.uuid32());

        log.info("[Register] registering with broker: {}", JSON.toJSON(brokerRegisterReq));
        final Channel channel = channelFuture.getChannelFuture().channel();
        MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class);
        log.info("[Register] registered with broker: {}", JSON.toJSON(resp));
    }
}

Subscribe and unsubscribe

Subscribing to the messages a consumer cares about is also fairly simple to implement:

public void subscribe(String topicName, String tagRegex) {
    ConsumerSubscribeReq req = new ConsumerSubscribeReq();

    String messageId = IdHelper.uuid32();
    req.setTraceId(messageId);
    req.setMethodType(MethodType.C_SUBSCRIBE);
    req.setTopicName(topicName);
    req.setTagRegex(tagRegex);
    req.setGroupName(groupName);

    Channel channel = getChannel();
    MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
    if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
        throw new MqException(ConsumerRespCode.SUBSCRIBE_FAILED);
    }
}

Unsubscribe:

public void unSubscribe(String topicName, String tagRegex) {
    ConsumerUnSubscribeReq req = new ConsumerUnSubscribeReq();

    String messageId = IdHelper.uuid32();
    req.setTraceId(messageId);
    req.setMethodType(MethodType.C_UN_SUBSCRIBE);
    req.setTopicName(topicName);
    req.setTagRegex(tagRegex);
    req.setGroupName(groupName);

    Channel channel = getChannel();
    MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
    if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
        throw new MqException(ConsumerRespCode.UN_SUBSCRIBE_FAILED);
    }
}
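On the broker side, unSubscribe rebuilds a subscription object from the request and removes it from a Set. That only works if the object defines value-based equals and hashCode; ConsumerSubscribeBo is not shown in this article, so whether it does is an assumption. A minimal sketch of such a key and a registry built on it follows, with SubscriptionKey and SubscriptionRegistry as hypothetical names.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical value object: two keys with the same four fields are equal. */
final class SubscriptionKey {
    final String channelId;
    final String groupName;
    final String topicName;
    final String tagRegex;

    SubscriptionKey(String channelId, String groupName, String topicName, String tagRegex) {
        this.channelId = channelId;
        this.groupName = groupName;
        this.topicName = topicName;
        this.tagRegex = tagRegex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SubscriptionKey)) return false;
        SubscriptionKey other = (SubscriptionKey) o;
        return Objects.equals(channelId, other.channelId)
                && Objects.equals(groupName, other.groupName)
                && Objects.equals(topicName, other.topicName)
                && Objects.equals(tagRegex, other.tagRegex);
    }

    @Override
    public int hashCode() {
        return Objects.hash(channelId, groupName, topicName, tagRegex);
    }
}

/** Hypothetical registry: topic -> set of subscriptions, safe for concurrent callers. */
class SubscriptionRegistry {
    private final Map<String, Set<SubscriptionKey>> byTopic = new ConcurrentHashMap<>();

    /** Returns false when the same subscription already exists. */
    boolean subscribe(SubscriptionKey key) {
        return byTopic.computeIfAbsent(key.topicName, k -> ConcurrentHashMap.newKeySet()).add(key);
    }

    /** Returns false when there was nothing to remove. */
    boolean unSubscribe(SubscriptionKey key) {
        Set<SubscriptionKey> set = byTopic.get(key.topicName);
        return set != null && set.remove(key);
    }

    int count(String topicName) {
        Set<SubscriptionKey> set = byTopic.get(topicName);
        return set == null ? 0 : set.size();
    }
}
```

Because equality is by value, a freshly built key in unSubscribe removes the entry that subscribe added earlier, and a repeated subscribe with the same arguments does not create a duplicate.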

Test

Starting the broker

MqBroker broker = new MqBroker();
broker.start();

Startup log:

[DEBUG] [2022-04-21 20:36:27.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 20:36:27.186] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ broker starting server, port: 9999
[INFO] [2022-04-21 20:36:29.060] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ broker started, listening on port [9999]

Starting the consumer

final MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();

mqConsumerPush.subscribe("TOPIC", "TAGA");
mqConsumerPush.registerListener(new IMqConsumerListener() {
    @Override
    public ConsumerStatus consumer(MqMessage mqMessage, IMqConsumerListenerContext context) {
        System.out.println("---------- custom " + JSON.toJSONString(mqMessage));
        return ConsumerStatus.SUCCESS;
    }
});

Startup log:

...
[INFO] [2022-04-21 20:37:40.985] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.registerToBroker] - [Register] registered with broker: {"respMessage":"成功","respCode":"0000"}

The consumer registers with the broker on startup.

Starting the producer

MqProducer mqProducer = new MqProducer();
mqProducer.start();

String message = "HELLO MQ!";
MqMessage mqMessage = new MqMessage();
mqMessage.setTopic("TOPIC");
mqMessage.setTags(Arrays.asList("TAGA", "TAGB"));
mqMessage.setPayload(message);

SendResult sendResult = mqProducer.send(mqMessage);
System.out.println(JSON.toJSON(sendResult));

Log:

...
[INFO] [2022-04-21 20:39:17.885] [Thread-0] [c.g.h.m.p.c.MqProducer.registerToBroker] - [Register] registered with broker: {"respMessage":"成功","respCode":"0000"}
...

At this point the consumer has consumed the message we sent.

---------- custom {"methodType":"B_MESSAGE_PUSH","payload":"HELLO MQ!","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"2237bbfe55b842328134e6a100e36364"}

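Summary

At this point we have implemented communication between the producer and the consumer through a broker.

I hope this article helps. If you liked it, feel free to like, bookmark, and share it.

I'm Lao Ma, and I look forward to seeing you next time.

Open source repository

The message queue in java. (A simple MQ implementation in Java) https://github.com/houbb/mq

Further reading

rpc: implementing RPC from scratch https://github.com/houbb/rpc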