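Recap
[mq] Implementing an MQ from scratch 01: starting the producer and consumer
[mq] Implementing an MQ from scratch 02: how does the producer call the consumer?
[mq] Implementing an MQ from scratch 03: introducing the broker as a middleman
In the previous part we learned how a producer can send messages to a consumer, but only over a direct connection.
So how do we decouple the two?
The answer is to introduce a broker, a middleman for messages.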
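To make the decoupling idea concrete before looking at the real code, here is a minimal in-memory sketch. It is not the implementation used in this project: the class name InMemoryBrokerSketch and its methods are hypothetical, and there is no networking at all. The only point is that the producer talks to the broker by topic and never holds a reference to any consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a broker; illustration only. */
class InMemoryBrokerSketch {
    // topic -> listeners subscribed to that topic
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    /** A consumer tells the broker which topic it cares about. */
    void subscribe(String topic, Consumer<String> listener) {
        listeners.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** A producer hands the payload to the broker; returns how many listeners received it. */
    int send(String topic, String payload) {
        List<Consumer<String>> list = listeners.getOrDefault(topic, new ArrayList<>());
        for (Consumer<String> listener : list) {
            listener.accept(payload);
        }
        return list.size();
    }
}
```

With this shape, adding or removing a consumer never requires a change on the producer side, which is the property the rest of this part builds on.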
MqBroker implementation
Core startup class
It is similar to the consumer startup we implemented earlier:
package com.github.houbb.mq.broker.core;

/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqBroker extends Thread implements IMqBroker {

    // omitted

    private ChannelHandler initChannelHandler() {
        MqBrokerHandler handler = new MqBrokerHandler();
        handler.setInvokeService(invokeService);
        handler.setRegisterConsumerService(registerConsumerService);
        handler.setRegisterProducerService(registerProducerService);
        handler.setMqBrokerPersist(mqBrokerPersist);
        handler.setBrokerPushService(brokerPushService);
        handler.setRespTimeoutMills(respTimeoutMills);
        return handler;
    }

    @Override
    public void run() {
        // Start the server
        log.info("MQ broker is starting the server, port: {}", port);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Netty expects group(parentGroup, childGroup):
            // the boss group accepts connections, the worker group handles their I/O
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
                                    .addLast(initChannelHandler());
                        }
                    })
                    // Affects the queue of connections that accept has not taken yet
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // When the client stays silent for a while, the server sends a
                    // keep-alive probe to check whether the client is still alive
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind the port and start accepting incoming connections
            ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
            log.info("MQ broker started, listening on port [" + port + "]");

            channelFuture.channel().closeFuture().syncUninterruptibly();
            log.info("MQ broker shut down");
        } catch (Exception e) {
            log.error("MQ broker failed to start", e);
            throw new MqException(BrokerRespCode.RPC_INIT_FAILED);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
initChannelHandler introduces quite a few new faces; we will go through them in detail below.
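One piece of the pipeline worth a closer look is DelimiterBasedFrameDecoder: TCP delivers a byte stream, so the broker needs a way to cut it back into individual JSON messages. The sketch below shows the idea in plain Java without Netty. The class DelimiterFramerSketch is hypothetical, and the delimiter "~!@" used in the usage note is only an assumption, since the actual value of DelimiterUtil.DELIMITER is not shown in this article.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java illustration of delimiter-based framing. */
class DelimiterFramerSketch {
    private final byte[] delimiter;
    // bytes received so far that do not yet form a complete frame
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    DelimiterFramerSketch(String delimiter) {
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
    }

    /** Feeds one chunk as it arrives and returns every frame completed by it. */
    List<String> feed(byte[] chunk) {
        buffer.write(chunk, 0, chunk.length);
        byte[] data = buffer.toByteArray();
        List<String> frames = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = indexOf(data, delimiter, start)) >= 0) {
            frames.add(new String(data, start, idx - start, StandardCharsets.UTF_8));
            start = idx + delimiter.length;
        }
        // keep the unfinished tail for the next chunk
        buffer.reset();
        buffer.write(data, start, data.length - start);
        return frames;
    }

    /** Naive search for pattern inside data, starting at index from. */
    private static int indexOf(byte[] data, byte[] pattern, int from) {
        outer:
        for (int i = from; i <= data.length - pattern.length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (data[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }
}
```

Feeding the two chunks {"a":1}~!@{"b" and :2}~!@ yields one frame per call, even though the second message was split across chunks. The real decoder additionally enforces a maximum frame length, which is what DelimiterUtil.LENGTH is passed for.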
MqBrokerHandler processing logic
package com.github.houbb.mq.broker.handler;

import java.util.List;

/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqBrokerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(MqBrokerHandler.class);

    /**
     * Invocation management
     * @since 1.0.0
     */
    private IInvokeService invokeService;

    /**
     * Consumer management
     * @since 0.0.3
     */
    private IBrokerConsumerService registerConsumerService;

    /**
     * Producer management
     * @since 0.0.3
     */
    private IBrokerProducerService registerProducerService;

    /**
     * Persistence
     * @since 0.0.3
     */
    private IMqBrokerPersist mqBrokerPersist;

    /**
     * Push service
     * @since 0.0.3
     */
    private IBrokerPushService brokerPushService;

    /**
     * Response timeout in milliseconds
     * @since 0.0.3
     */
    private long respTimeoutMills;

    // setters

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        RpcMessageDto rpcMessageDto = null;
        try {
            rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
        } catch (Exception exception) {
            log.error("Failed to parse RpcMessageDto json {}", new String(bytes));
            return;
        }

        if (rpcMessageDto.isRequest()) {
            MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx);
            if (commonResp == null) {
                log.debug("Current response is null, nothing to write back.");
                return;
            }
            // Write the response back, as in the previous parts
            writeResponse(rpcMessageDto, commonResp, ctx);
        } else {
            final String traceId = rpcMessageDto.getTraceId();
            // Drop responses whose traceId is blank
            if (StringUtil.isBlank(traceId)) {
                // the {} placeholder was missing, so the argument was never logged
                log.debug("[Server Response] response traceId is blank, dropping it: {}", JSON.toJSON(rpcMessageDto));
                return;
            }
            // Record the response
            invokeService.addResponse(traceId, rpcMessageDto);
        }
    }

    /**
     * Handle the message asynchronously
     * @param mqMessage message
     * @since 0.0.3
     */
    private void asyncHandleMessage(MqMessage mqMessage) {
        List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage);
        if (CollectionUtil.isEmpty(channelList)) {
            log.info("Subscriber list is empty, nothing to push");
            return;
        }

        BrokerPushContext brokerPushContext = new BrokerPushContext();
        brokerPushContext.setChannelList(channelList);
        brokerPushContext.setMqMessage(mqMessage);
        brokerPushContext.setMqBrokerPersist(mqBrokerPersist);
        brokerPushContext.setInvokeService(invokeService);
        brokerPushContext.setRespTimeoutMills(respTimeoutMills);
        brokerPushService.asyncPush(brokerPushContext);
    }
}
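The else branch of channelRead0 hands every response to invokeService.addResponse(traceId, ...). The usual idea behind such a call is request/response correlation: the caller parks on a future keyed by traceId, and the inbound handler completes it when the matching response arrives. The following is a rough sketch of that pattern only. TraceIdCorrelatorSketch, addRequest and getResponse are hypothetical names, and the real IInvokeService may be built quite differently.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical traceId-based correlation of requests and responses. */
class TraceIdCorrelatorSketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called before the request is written to the channel. */
    void addRequest(String traceId) {
        pending.put(traceId, new CompletableFuture<>());
    }

    /** Called by the inbound handler; responses with an unknown traceId are ignored. */
    boolean addResponse(String traceId, String responseJson) {
        CompletableFuture<String> future = pending.get(traceId);
        return future != null && future.complete(responseJson);
    }

    /** Waits for the response; returns null if it does not arrive in time. */
    String getResponse(String traceId, long timeoutMills) {
        CompletableFuture<String> future = pending.get(traceId);
        if (future == null) {
            return null;
        }
        try {
            return future.get(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            // always clean up so that timed-out requests do not leak
            pending.remove(traceId);
        }
    }
}
```

This is also why a blank traceId is simply dropped in the handler: without it there is nothing to match the response against.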
Message dispatch
After the broker receives a message, dispatch is implemented as follows:
/**
 * Dispatch a message
 *
 * @param rpcMessageDto request
 * @param ctx context
 * @return result
 */
private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {
    try {
        final String methodType = rpcMessageDto.getMethodType();
        final String json = rpcMessageDto.getJson();
        String channelId = ChannelUtil.getChannelId(ctx);
        final Channel channel = ctx.channel();
        log.debug("channelId: {} received method: {} content: {}", channelId, methodType, json);

        // Producer register
        if (MethodType.P_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerProducerService.register(registerReq.getServiceEntry(), channel);
        }
        // Producer unregister
        if (MethodType.P_UN_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerProducerService.unRegister(registerReq.getServiceEntry(), channel);
        }
        // Producer sends a message
        if (MethodType.P_SEND_MSG.equals(methodType)) {
            MqMessage mqMessage = JSON.parseObject(json, MqMessage.class);
            MqMessagePersistPut persistPut = new MqMessagePersistPut();
            persistPut.setMqMessage(mqMessage);
            persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
            MqCommonResp commonResp = mqBrokerPersist.put(persistPut);
            this.asyncHandleMessage(mqMessage);
            return commonResp;
        }
        // Producer sends a message, ONE WAY (no response is written back)
        if (MethodType.P_SEND_MSG_ONE_WAY.equals(methodType)) {
            MqMessage mqMessage = JSON.parseObject(json, MqMessage.class);
            MqMessagePersistPut persistPut = new MqMessagePersistPut();
            persistPut.setMqMessage(mqMessage);
            persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
            mqBrokerPersist.put(persistPut);
            this.asyncHandleMessage(mqMessage);
            return null;
        }

        // Consumer register
        if (MethodType.C_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerConsumerService.register(registerReq.getServiceEntry(), channel);
        }
        // Consumer unregister
        if (MethodType.C_UN_REGISTER.equals(methodType)) {
            BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
            return registerConsumerService.unRegister(registerReq.getServiceEntry(), channel);
        }
        // Consumer subscribe
        if (MethodType.C_SUBSCRIBE.equals(methodType)) {
            ConsumerSubscribeReq req = JSON.parseObject(json, ConsumerSubscribeReq.class);
            return registerConsumerService.subscribe(req, channel);
        }
        // Consumer unsubscribe
        if (MethodType.C_UN_SUBSCRIBE.equals(methodType)) {
            ConsumerUnSubscribeReq req = JSON.parseObject(json, ConsumerUnSubscribeReq.class);
            return registerConsumerService.unSubscribe(req, channel);
        }
        // Consumer actively pulls messages
        if (MethodType.C_MESSAGE_PULL.equals(methodType)) {
            MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);
            return mqBrokerPersist.pull(req, channel);
        }

        throw new UnsupportedOperationException("Unsupported method type");
    } catch (Exception exception) {
        log.error("Dispatch failed", exception);
        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.FAIL.getCode());
        resp.setRespMessage(MqCommonRespCode.FAIL.getMsg());
        return resp;
    }
}
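The if chain in dispatch is essentially a lookup from method type to handler. As the number of method types grows, one alternative is to keep that mapping in a table. The sketch below illustrates the idea with plain strings. DispatchTableSketch and its methods are hypothetical, this is not how the project does it, and the "FAIL: ..." strings merely stand in for a failed MqCommonResp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical table-driven dispatch: methodType -> handler(json). */
class DispatchTableSketch {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void register(String methodType, Function<String, String> handler) {
        handlers.put(methodType, handler);
    }

    /** Looks up the handler; unknown types and handler errors become a failure result. */
    String dispatch(String methodType, String json) {
        Function<String, String> handler = handlers.get(methodType);
        if (handler == null) {
            return "FAIL: unsupported method type " + methodType;
        }
        try {
            return handler.apply(json);
        } catch (RuntimeException e) {
            return "FAIL: " + e.getMessage();
        }
    }
}
```

The trade-off is mostly readability: the if chain keeps everything visible in one method, while the table makes it possible to add a method type without touching dispatch itself.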
Message push
this.asyncHandleMessage(mqMessage); is the logic the broker runs after it has received a message.
/**
 * Handle the message asynchronously
 * @param mqMessage message
 * @since 0.0.3
 */
private void asyncHandleMessage(MqMessage mqMessage) {
    List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage);
    if (CollectionUtil.isEmpty(channelList)) {
        log.info("Subscriber list is empty, nothing to push");
        return;
    }

    BrokerPushContext brokerPushContext = new BrokerPushContext();
    brokerPushContext.setChannelList(channelList);
    brokerPushContext.setMqMessage(mqMessage);
    brokerPushContext.setMqBrokerPersist(mqBrokerPersist);
    brokerPushContext.setInvokeService(invokeService);
    brokerPushContext.setRespTimeoutMills(respTimeoutMills);
    brokerPushService.asyncPush(brokerPushContext);
}
The core of the push implementation is as follows:
package com.github.houbb.mq.broker.support.push;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public class BrokerPushService implements IBrokerPushService {

    private static final Log log = LogFactory.getLog(BrokerPushService.class);

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();

    @Override
    public void asyncPush(final BrokerPushContext context) {
        EXECUTOR_SERVICE.submit(new Runnable() {
            @Override
            public void run() {
                log.info("Start async push {}", JSON.toJSON(context));
                final List<Channel> channelList = context.getChannelList();
                final IMqBrokerPersist mqBrokerPersist = context.getMqBrokerPersist();
                final MqMessage mqMessage = context.getMqMessage();
                final String messageId = mqMessage.getTraceId();
                final IInvokeService invokeService = context.getInvokeService();
                final long responseTime = context.getRespTimeoutMills();

                for (Channel channel : channelList) {
                    try {
                        String channelId = ChannelUtil.getChannelId(channel);
                        log.info("Start pushing to channelId: {}", channelId);

                        //1. Call the consumer
                        mqMessage.setMethodType(MethodType.B_MESSAGE_PUSH);
                        MqConsumerResultResp resultResp = callServer(channel, mqMessage,
                                MqConsumerResultResp.class, invokeService, responseTime);

                        //2. Update the status
                        mqBrokerPersist.updateStatus(messageId, resultResp.getConsumerStatus());

                        //3. A retry strategy will be added later
                        log.info("Finished pushing to channelId: {}", channelId);
                    } catch (Exception exception) {
                        // pass the exception along so that the stack trace is not lost
                        log.error("Push failed", exception);
                        mqBrokerPersist.updateStatus(messageId, ConsumerStatus.FAILED.getCode());
                    }
                }
                log.info("Finished async push");
            }
        });
    }
}
After the message has been pushed, its ACK status needs to be updated.
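The push-then-ACK flow can be tried out without any networking. In the sketch below each consumer is modelled as a plain function that returns a status string, and the broker waits for that status with a timeout before recording it. All names here (PushAckSketch, statusOf, the literal status strings) are hypothetical stand-ins for callServer, updateStatus and the ConsumerStatus codes, whose internals are not shown in this article.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical push + ACK bookkeeping; consumers are plain functions here. */
class PushAckSketch {
    // one thread pushes messages in order, like the single-thread executor above
    private final ExecutorService pushExecutor = Executors.newSingleThreadExecutor();
    // separate pool so that a slow consumer call can be abandoned after a timeout
    private final ExecutorService callExecutor = Executors.newCachedThreadPool();
    private final Map<String, String> statusById = new ConcurrentHashMap<>();

    CompletableFuture<Void> asyncPush(String messageId, String payload,
                                      List<Function<String, String>> consumers,
                                      long timeoutMills) {
        statusById.put(messageId, "WAIT_CONSUMER");
        return CompletableFuture.runAsync(() -> {
            for (Function<String, String> consumer : consumers) {
                Callable<String> call = () -> consumer.apply(payload);
                Future<String> ack = callExecutor.submit(call);
                try {
                    // 1. call the consumer, 2. store the status it acknowledged
                    statusById.put(messageId, ack.get(timeoutMills, TimeUnit.MILLISECONDS));
                } catch (Exception e) {
                    // timeout or consumer error: give up on this call and mark as failed
                    ack.cancel(true);
                    statusById.put(messageId, "FAILED");
                }
            }
        }, pushExecutor);
    }

    String statusOf(String messageId) {
        return statusById.get(messageId);
    }

    void shutdown() {
        pushExecutor.shutdown();
        callExecutor.shutdown();
    }
}
```

Note that, as in the code above, the last consumer to answer determines the stored status when a message goes to several groups. Tracking one status per consumer group would be a natural refinement once retries are added.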
Producer handling class
The IBrokerProducerService interface is defined as follows:
package com.github.houbb.mq.broker.api;

/**
 * <p> Producer registration service </p>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public interface IBrokerProducerService {

    /**
     * Register the current service
     * (1) Groups the service by {@link ServiceEntry#getGroupName()};
     * all clients subscribed to this serviceId
     * @param serviceEntry service information to register
     * @param channel channel
     * @since 0.0.8
     */
    MqCommonResp register(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Unregister the current service
     * @param serviceEntry service information to unregister
     * @param channel channel
     * @since 0.0.8
     */
    MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Get the service address information
     * @param channel channel
     * @return result
     * @since 0.0.3
     */
    ServiceEntry getServiceEntry(final Channel channel);
}
The implementation is as follows:
The local implementation stores the basic information from incoming requests in a map.
package com.github.houbb.mq.broker.support.api;

/**
 * <p> Producer registration service </p>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public class LocalBrokerProducerService implements IBrokerProducerService {

    private static final Log log = LogFactory.getLog(LocalBrokerProducerService.class);

    // key: channelId, value: service entry together with its channel
    private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();

    @Override
    public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
        registerMap.put(channelId, entryChannel);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        registerMap.remove(channelId);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public ServiceEntry getServiceEntry(Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        return registerMap.get(channelId);
    }
}
Consumer handling class
The interface is defined as follows:
package com.github.houbb.mq.broker.api;

/**
 * <p> Consumer registration service </p>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public interface IBrokerConsumerService {

    /**
     * Register the current service
     * (1) Groups the service by {@link ServiceEntry#getGroupName()};
     * all clients subscribed to this serviceId
     * @param serviceEntry service information to register
     * @param channel channel
     * @since 0.0.3
     */
    MqCommonResp register(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Unregister the current service
     * @param serviceEntry service information to unregister
     * @param channel channel
     * @since 0.0.3
     */
    MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel);

    /**
     * Subscribe
     * (1) After subscribing, any change in the related machine information is pushed.
     * (2) Built-in information; the ip information needs to be sent to the registry.
     *
     * @param serviceEntry client details
     * @param clientChannel client channel
     * @since 0.0.3
     */
    MqCommonResp subscribe(final ConsumerSubscribeReq serviceEntry, final Channel clientChannel);

    /**
     * Unsubscribe
     * (1) After subscribing, any change in the related machine information is pushed.
     * (2) Built-in information; the ip information needs to be sent to the registry.
     *
     * @param serviceEntry client details
     * @param clientChannel client channel
     * @since 0.0.3
     */
    MqCommonResp unSubscribe(final ConsumerUnSubscribeReq serviceEntry, final Channel clientChannel);

    /**
     * Get all matching consumers
     * 1. Only one consumer is returned per groupName; mind the load balancing
     * 2. Returns the consumer channels that match the current message
     *
     * @param mqMessage message
     * @return result
     */
    List<Channel> getSubscribeList(MqMessage mqMessage);
}
Default implementation:
package com.github.houbb.mq.broker.support.api;

/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class LocalBrokerConsumerService implements IBrokerConsumerService {

    private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();

    /**
     * Subscriptions
     * key: topicName
     * value: subscriptions for that topic
     */
    private final Map<String, Set<ConsumerSubscribeBo>> subscribeMap = new ConcurrentHashMap<>();

    @Override
    public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
        registerMap.put(channelId, entryChannel);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        registerMap.remove(channelId);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp subscribe(ConsumerSubscribeReq serviceEntry, Channel clientChannel) {
        final String channelId = ChannelUtil.getChannelId(clientChannel);
        final String topicName = serviceEntry.getTopicName();

        // Note: this get-or-create on a plain HashSet is not thread-safe
        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if (set == null) {
            set = new HashSet<>();
        }
        ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
        subscribeBo.setChannelId(channelId);
        subscribeBo.setGroupName(serviceEntry.getGroupName());
        subscribeBo.setTopicName(topicName);
        subscribeBo.setTagRegex(serviceEntry.getTagRegex());
        set.add(subscribeBo);
        subscribeMap.put(topicName, set);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unSubscribe(ConsumerUnSubscribeReq serviceEntry, Channel clientChannel) {
        final String channelId = ChannelUtil.getChannelId(clientChannel);
        final String topicName = serviceEntry.getTopicName();

        ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
        subscribeBo.setChannelId(channelId);
        subscribeBo.setGroupName(serviceEntry.getGroupName());
        subscribeBo.setTopicName(topicName);
        subscribeBo.setTagRegex(serviceEntry.getTagRegex());

        // Remove from the subscription set
        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if (CollectionUtil.isNotEmpty(set)) {
            set.remove(subscribeBo);
        }

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public List<Channel> getSubscribeList(MqMessage mqMessage) {
        final String topicName = mqMessage.getTopic();
        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if (CollectionUtil.isEmpty(set)) {
            return Collections.emptyList();
        }

        //2. Collect the subscriptions whose tag regex matches
        final List<String> tagNameList = mqMessage.getTags();
        Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
        for (ConsumerSubscribeBo bo : set) {
            String tagRegex = bo.getTagRegex();
            if (hasMatch(tagNameList, tagRegex)) {
                //TODO: handle this get-or-create pattern in one place
                String groupName = bo.getGroupName();
                List<ConsumerSubscribeBo> list = groupMap.get(groupName);
                if (list == null) {
                    list = new ArrayList<>();
                }
                list.add(bo);
                groupMap.put(groupName, list);
            }
        }

        //3. After grouping by groupName, return only one random member per group.
        //   Ideally this should be changed to pick by shardingKey.
        final String shardingKey = mqMessage.getShardingKey();
        List<Channel> channelList = new ArrayList<>();
        for (Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {
            List<ConsumerSubscribeBo> list = entry.getValue();
            ConsumerSubscribeBo bo = RandomUtils.random(list, shardingKey);
            BrokerServiceEntryChannel entryChannel = registerMap.get(bo.getChannelId());
            // Guard against a consumer that unregistered without unsubscribing
            if (entryChannel != null) {
                channelList.add(entryChannel.getChannel());
            }
        }
        return channelList;
    }

    private boolean hasMatch(List<String> tagNameList, String tagRegex) {
        if (CollectionUtil.isEmpty(tagNameList)) {
            return false;
        }
        Pattern pattern = Pattern.compile(tagRegex);
        for (String tagName : tagNameList) {
            if (RegexUtils.match(pattern, tagName)) {
                return true;
            }
        }
        return false;
    }
}
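The logic of getSubscribeList may look a little complicated, but all it does is find the subscribed consumers that match an incoming message.
Because a message is consumed only once by the consumers that share a groupName, the matches have to be grouped first.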
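The matching and grouping step can be isolated into a small, testable sketch. It makes two assumptions that the article does not confirm: a tag matches when the whole tag matches the regex, and a non-null sharding key picks a group member by hash (with a random pick otherwise). GroupSelectSketch and Subscription are hypothetical names, not the project's ConsumerSubscribeBo or RandomUtils.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;

/** Hypothetical "one consumer per group" selection. */
class GroupSelectSketch {

    static final class Subscription {
        final String channelId;
        final String groupName;
        final String tagRegex;

        Subscription(String channelId, String groupName, String tagRegex) {
            this.channelId = channelId;
            this.groupName = groupName;
            this.tagRegex = tagRegex;
        }
    }

    /** Returns one channelId per group that has at least one matching subscription. */
    static List<String> select(List<Subscription> subscriptions, List<String> tags, String shardingKey) {
        // 1. keep subscriptions whose regex matches any tag, grouped by groupName
        Map<String, List<Subscription>> byGroup = new LinkedHashMap<>();
        for (Subscription s : subscriptions) {
            Pattern pattern = Pattern.compile(s.tagRegex);
            boolean matched = tags.stream().anyMatch(t -> pattern.matcher(t).matches());
            if (matched) {
                byGroup.computeIfAbsent(s.groupName, k -> new ArrayList<>()).add(s);
            }
        }
        // 2. pick exactly one member per group
        List<String> result = new ArrayList<>();
        for (List<Subscription> members : byGroup.values()) {
            int index = shardingKey == null
                    ? ThreadLocalRandom.current().nextInt(members.size())
                    : Math.floorMod(shardingKey.hashCode(), members.size());
            result.add(members.get(index).channelId);
        }
        return result;
    }
}
```

Picking by sharding key rather than at random means messages with the same key keep landing on the same consumer as long as the group membership is stable, which is the direction the TODO comment in the original code points to.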
Message persistence
The interface is as follows:
package com.github.houbb.mq.broker.support.persist;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public interface IMqBrokerPersist {

    /**
     * Save a message
     * @param mqMessage message
     * @since 0.0.3
     */
    MqCommonResp put(final MqMessagePersistPut mqMessage);

    /**
     * Update the status
     * @param messageId unique message id
     * @param status status
     * @return result
     * @since 0.0.3
     */
    MqCommonResp updateStatus(final String messageId, final String status);

    /**
     * Pull messages
     * @param pull pull request
     * @param channel channel
     * @return result
     */
    MqConsumerPullResp pull(final MqConsumerPullReq pull, final Channel channel);
}
Default local implementation:
package com.github.houbb.mq.broker.support.persist;

/**
 * Local persistence strategy
 * @author binbin.hou
 * @since 1.0.0
 */
public class LocalMqBrokerPersist implements IMqBrokerPersist {

    private static final Log log = LogFactory.getLog(LocalMqBrokerPersist.class);

    /**
     * Queues, keyed by topic
     * ps: this is a simplified implementation; concurrency and similar issues are not considered for now.
     */
    private final Map<String, List<MqMessagePersistPut>> map = new ConcurrentHashMap<>();

    //1. Receive
    //2. Persist
    //3. Notify consumers
    @Override
    public synchronized MqCommonResp put(MqMessagePersistPut put) {
        log.info("put elem: {}", JSON.toJSON(put));

        MqMessage mqMessage = put.getMqMessage();
        final String topic = mqMessage.getTopic();

        List<MqMessagePersistPut> list = map.get(topic);
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(put);
        map.put(topic, list);

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }

    @Override
    public MqCommonResp updateStatus(String messageId, String status) {
        // This scans every message, so performance is poor. Not for production use; only for test verification.
        for (List<MqMessagePersistPut> list : map.values()) {
            for (MqMessagePersistPut put : list) {
                MqMessage mqMessage = put.getMqMessage();
                if (mqMessage.getTraceId().equals(messageId)) {
                    put.setMessageStatus(status);
                    break;
                }
            }
        }

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }

    @Override
    public MqConsumerPullResp pull(MqConsumerPullReq pull, Channel channel) {
        //TODO... to be implemented
        return null;
    }
}
ps: a persistence strategy based on springboot + mysql will be implemented later.
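Until a database-backed version exists, the two weak spots called out in the comments (thread safety and the full scan in updateStatus) can be eased with standard JDK collections. The sketch below is one possible direction, not the project's code: IndexedPersistSketch and Record are hypothetical, and a message is reduced to an id, a topic and a status.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical in-memory store with a secondary index on message id. */
class IndexedPersistSketch {

    static final class Record {
        final String messageId;
        final String topic;
        volatile String status;

        Record(String messageId, String topic, String status) {
            this.messageId = messageId;
            this.topic = topic;
            this.status = status;
        }
    }

    // topic -> messages in arrival order
    private final Map<String, List<Record>> byTopic = new ConcurrentHashMap<>();
    // messageId -> message, so that status updates do not need a scan
    private final Map<String, Record> byId = new ConcurrentHashMap<>();

    void put(String messageId, String topic) {
        Record record = new Record(messageId, topic, "WAIT_CONSUMER");
        // computeIfAbsent creates the list atomically, so no synchronized block is needed
        byTopic.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(record);
        byId.put(messageId, record);
    }

    /** Returns false when the message id is unknown. */
    boolean updateStatus(String messageId, String status) {
        Record record = byId.get(messageId);
        if (record == null) {
            return false;
        }
        record.status = status;
        return true;
    }

    String statusOf(String messageId) {
        Record record = byId.get(messageId);
        return record == null ? null : record.status;
    }

    int countByTopic(String topic) {
        List<Record> list = byTopic.get(topic);
        return list == null ? 0 : list.size();
    }
}
```

CopyOnWriteArrayList is a reasonable fit only while writes are rare compared with reads; for a busy topic a concurrent queue would likely be the better choice.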
Consumer startup changes
We adjust the startup of both the producer and the consumer so that they connect to the broker.
The two are similar, so we take the consumer as the example here.
Core startup class
package com.github.houbb.mq.consumer.core;

/**
 * Push consumption strategy
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqConsumerPush extends Thread implements IMqConsumer {

    // fields & setters

    @Override
    public void run() {
        // Start the consumer
        log.info("MQ consumer is starting, groupName: {}, brokerAddress: {}",
                groupName, brokerAddress);

        //1. Parameter check
        this.paramCheck();

        try {
            // channel handler
            ChannelHandler channelHandler = this.initChannelHandler();

            // channel future
            this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress, channelHandler);

            // register to broker
            this.registerToBroker();

            // mark as available
            enableFlag = true;
            log.info("MQ consumer started");
        } catch (Exception e) {
            log.error("MQ consumer failed to start", e);
            throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
        }
    }

    // subscribe & unsubscribe

    @Override
    public void registerListener(IMqConsumerListener listener) {
        this.mqListenerService.register(listener);
    }
}
Initializing the handler
private ChannelHandler initChannelHandler() {
    final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);

    final MqConsumerHandler mqConsumerHandler = new MqConsumerHandler(invokeService, mqListenerService);

    // The handler is actually used several times (once per channel);
    // unless it is @Sharable, a new instance should be created each time.
    ChannelHandler handler = new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
                    .addLast(mqConsumerHandler);
        }
    };
    return handler;
}
Registering with the broker
/**
 * Register with every broker
 * @since 0.0.3
 */
private void registerToBroker() {
    for (RpcChannelFuture channelFuture : this.channelFutureList) {
        ServiceEntry serviceEntry = new ServiceEntry();
        serviceEntry.setGroupName(groupName);
        serviceEntry.setAddress(channelFuture.getAddress());
        serviceEntry.setPort(channelFuture.getPort());
        serviceEntry.setWeight(channelFuture.getWeight());

        BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
        brokerRegisterReq.setServiceEntry(serviceEntry);
        brokerRegisterReq.setMethodType(MethodType.C_REGISTER);
        brokerRegisterReq.setTraceId(IdHelper.uuid32());

        log.info("[Register] registering to broker: {}", JSON.toJSON(brokerRegisterReq));
        final Channel channel = channelFuture.getChannelFuture().channel();
        MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class);
        log.info("[Register] registered to broker: {}", JSON.toJSON(resp));
    }
}
Subscribe and unsubscribe
Subscribing to the messages a consumer cares about is also fairly simple to implement:
public void subscribe(String topicName, String tagRegex) {
    ConsumerSubscribeReq req = new ConsumerSubscribeReq();
    String messageId = IdHelper.uuid32();
    req.setTraceId(messageId);
    req.setMethodType(MethodType.C_SUBSCRIBE);
    req.setTopicName(topicName);
    req.setTagRegex(tagRegex);
    req.setGroupName(groupName);

    Channel channel = getChannel();
    MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
    if (!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
        throw new MqException(ConsumerRespCode.SUBSCRIBE_FAILED);
    }
}
Unsubscribe:
public void unSubscribe(String topicName, String tagRegex) {
    ConsumerUnSubscribeReq req = new ConsumerUnSubscribeReq();
    String messageId = IdHelper.uuid32();
    req.setTraceId(messageId);
    req.setMethodType(MethodType.C_UN_SUBSCRIBE);
    req.setTopicName(topicName);
    req.setTagRegex(tagRegex);
    req.setGroupName(groupName);

    Channel channel = getChannel();
    MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
    if (!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
        throw new MqException(ConsumerRespCode.UN_SUBSCRIBE_FAILED);
    }
}
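One detail is easy to miss: on the broker side, unSubscribe builds a fresh ConsumerSubscribeBo and calls set.remove on it. That only removes the stored subscription if the class defines value-based equals and hashCode. The article does not show ConsumerSubscribeBo, so the sketch below uses a hypothetical SubscribeKeySketch class with the same four fields to show what is needed.

```java
import java.util.Objects;

/** Hypothetical value object with the four fields used by subscribe and unSubscribe. */
final class SubscribeKeySketch {
    private final String channelId;
    private final String groupName;
    private final String topicName;
    private final String tagRegex;

    SubscribeKeySketch(String channelId, String groupName, String topicName, String tagRegex) {
        this.channelId = channelId;
        this.groupName = groupName;
        this.topicName = topicName;
        this.tagRegex = tagRegex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SubscribeKeySketch)) {
            return false;
        }
        SubscribeKeySketch that = (SubscribeKeySketch) o;
        return Objects.equals(channelId, that.channelId)
                && Objects.equals(groupName, that.groupName)
                && Objects.equals(topicName, that.topicName)
                && Objects.equals(tagRegex, that.tagRegex);
    }

    @Override
    public int hashCode() {
        // must be consistent with equals, otherwise HashSet.remove will miss the entry
        return Objects.hash(channelId, groupName, topicName, tagRegex);
    }
}
```

It also follows that unsubscribing requires exactly the same topicName and tagRegex that were used to subscribe; a different regex produces a different key and the original subscription stays in place.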
Test
Starting the broker
MqBroker broker = new MqBroker();
broker.start();
Startup log:
[DEBUG] [2022-04-21 20:36:27.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 20:36:27.186] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ broker is starting the server, port: 9999
[INFO] [2022-04-21 20:36:29.060] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ broker started, listening on port [9999]
Starting the consumer
final MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();
mqConsumerPush.subscribe("TOPIC", "TAGA");
mqConsumerPush.registerListener(new IMqConsumerListener() {
    @Override
    public ConsumerStatus consumer(MqMessage mqMessage, IMqConsumerListenerContext context) {
        System.out.println("---------- custom " + JSON.toJSONString(mqMessage));
        return ConsumerStatus.SUCCESS;
    }
});
Startup log:
...
[INFO] [2022-04-21 20:37:40.985] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.registerToBroker] - [Register] registered to broker: {"respMessage":"成功","respCode":"0000"}
The consumer registers itself with the broker on startup.
Starting the producer
MqProducer mqProducer = new MqProducer();
mqProducer.start();

String message = "HELLO MQ!";
MqMessage mqMessage = new MqMessage();
mqMessage.setTopic("TOPIC");
mqMessage.setTags(Arrays.asList("TAGA", "TAGB"));
mqMessage.setPayload(message);

SendResult sendResult = mqProducer.send(mqMessage);
System.out.println(JSON.toJSON(sendResult));
Log:
...
[INFO] [2022-04-21 20:39:17.885] [Thread-0] [c.g.h.m.p.c.MqProducer.registerToBroker] - [Register] registered to broker: {"respMessage":"成功","respCode":"0000"}
...
At this point the consumer receives and consumes the message we sent.
---------- custom {"methodType":"B_MESSAGE_PUSH","payload":"HELLO MQ!","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"2237bbfe55b842328134e6a100e36364"}
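Summary
At this point we have implemented communication between producers and consumers through a broker.
I hope this article helps you. If you like it, feel free to like, bookmark and share it.
I am Lao Ma, and I look forward to meeting you again next time.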
Open source repository
The message queue in java. (a simple MQ implementation in Java) https://github.com/houbb/mq
Further reading
rpc: implementing RPC from scratch https://github.com/houbb/rpc