乐趣区

关于mq:mq从零开始实现-mq09消费者拉取消息-pull-message

前景回顾

【mq】从零开始实现 mq-01- 生产者、消费者启动

【mq】从零开始实现 mq-02- 如何实现生产者调用消费者?

【mq】从零开始实现 mq-03- 引入 broker 中间人

【mq】从零开始实现 mq-04- 启动检测与实现优化

【mq】从零开始实现 mq-05- 实现优雅停机

【mq】从零开始实现 mq-06- 消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07- 负载平衡 load balance

【mq】从零开始实现 mq-08- 配置优化 fluent

【mq】从零开始实现 mq-09- 消费者拉取音讯 pull message

音讯的推与拉

大家好,我是老马。

这一节咱们来一起看一下 MQ 音讯中的推和拉两种模式。

音讯由 broker 间接推送给消费者,实时性比拟好。

毛病是如果消费者解决不过去,就会造成大量问题。

音讯由消费者定时从 broker 拉取,长处是实现简略,能够依据消费者本人的解决能力来生产。

毛病是实时性绝对较差。

理论业务中,须要联合具体的场景,抉择适合的策略。

拉取策略实现

push 策略

咱们首先看一下 push 策略的简化外围实现:

package com.github.houbb.mq.consumer.core;

/**
 * 推送生产策略
 *
 * @author binbin.hou
 * @since 1.0.0
 */
public class MqConsumerPush extends Thread implements IMqConsumer  {

    @Override
    public void run() {
        // 启动服务端
        log.info("MQ 消费者开始启动服务端 groupName: {}, brokerAddress: {}",
                groupName, brokerAddress);

        //1. 参数校验
        this.paramCheck();

        try {
            //0. 配置信息
            //1. 初始化
            //2. 连贯到服务端
            //3. 标识为可用
            //4. 增加钩子函数

            //5. 启动实现当前的事件
            this.afterInit();

            log.info("MQ 消费者启动实现");
        } catch (Exception e) {log.error("MQ 消费者启动异样", e);
            throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
        }
    }

    /**
     * 初始化实现当前
     */
    protected void afterInit() {}

    // 其余办法

    /**
     * 获取生产策略类型
     * @return 类型
     * @since 0.0.9
     */
    protected String getConsumerType() {return ConsumerTypeConst.PUSH;}

}

咱们在 push 中预留了一个 afterInit 办法,便于子类重载。

pull 策略

消费者实现

package com.github.houbb.mq.consumer.core;

/**
 * 拉取生产策略
 *
 * @author binbin.hou
 * @since 0.0.9
 */
public class MqConsumerPull extends MqConsumerPush  {private static final Log log = LogFactory.getLog(MqConsumerPull.class);

    /**
     * 拉取定时工作
     *
     * @since 0.0.9
     */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    /**
     * 单次拉取大小
     * @since 0.0.9
     */
    private int size = 10;

    /**
     * 初始化提早毫秒数
     * @since 0.0.9
     */
    private int pullInitDelaySeconds = 5;

    /**
     * 拉取周期
     * @since 0.0.9
     */
    private int pullPeriodSeconds = 5;

    /**
     * 订阅列表
     * @since 0.0.9
     */
    private final List<MqTopicTagDto> subscribeList = new ArrayList<>();

    // 设置

    

    @Override
    protected String getConsumerType() {return ConsumerTypeConst.PULL;}

    @Override
    public synchronized void subscribe(String topicName, String tagRegex) {MqTopicTagDto tagDto = buildMqTopicTagDto(topicName, tagRegex);

        if(!subscribeList.contains(tagDto)) {subscribeList.add(tagDto);
        }
    }

    @Override
    public void unSubscribe(String topicName, String tagRegex) {MqTopicTagDto tagDto = buildMqTopicTagDto(topicName, tagRegex);

        subscribeList.remove(tagDto);
    }

    private MqTopicTagDto buildMqTopicTagDto(String topicName, String tagRegex) {MqTopicTagDto dto = new MqTopicTagDto();
        dto.setTagRegex(tagRegex);
        dto.setTopicName(topicName);
        return dto;
    }

}

订阅相干

pull 策略能够把订阅 / 勾销订阅放在本地,防止与服务端的交互。

定时拉取

咱们重载了 push 策略的 afterInit 办法。

/**
 * 初始化拉取音讯
 * @since 0.0.6
 */
@Override
public void afterInit() {
    //5S 发一次心跳
    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {if(CollectionUtil.isEmpty(subscribeList)) {log.warn("订阅列表为空,疏忽解决。");
                return;
            }
            for(MqTopicTagDto tagDto : subscribeList) {final String topicName = tagDto.getTopicName();
                final String tagRegex = tagDto.getTagRegex();
                MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size);
                if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {List<MqMessage> mqMessageList = resp.getList();
                    if(CollectionUtil.isNotEmpty(mqMessageList)) {for(MqMessage mqMessage : mqMessageList) {IMqConsumerListenerContext context = new MqConsumerListenerContext();
                            mqListenerService.consumer(mqMessage, context);
                        }
                    }
                } else {log.error("拉取音讯失败: {}", JSON.toJSON(resp));
                }
            }
        }
    }, pullInitDelaySeconds, pullPeriodSeconds, TimeUnit.SECONDS);
}

利用启动时,指定工夫定时拉取音讯并进行生产解决。

其中 consumerBrokerService.pull(topicName, tagRegex, size); 拉取实现如下:

public MqConsumerPullResp pull(String topicName, String tagRegex, int fetchSize) {MqConsumerPullReq req = new MqConsumerPullReq();
    req.setSize(fetchSize);
    req.setGroupName(groupName);
    req.setTagRegex(tagRegex);
    req.setTopicName(topicName);
    final String traceId = IdHelper.uuid32();
    req.setTraceId(traceId);
    req.setMethodType(MethodType.C_MESSAGE_PULL);

    Channel channel = getChannel(null);
    return this.callServer(channel, req, MqConsumerPullResp.class);
}

Borker 相干

音讯散发

// 消费者被动 pull
if(MethodType.C_MESSAGE_PULL.equals(methodType)) {MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);
    return mqBrokerPersist.pull(req, channel);
}

实现

mqBrokerPersist 是一个接口,此处演示基于本地实现的,后续会实现基于数据库的长久化。

原理是相似的,此处仅作为演示。

@Override
public MqConsumerPullResp pull(MqConsumerPullReq pullReq, Channel channel) {
    //1. 拉取匹配的信息
    //2. 状态更新为代理中
    //3. 如何更新对应的生产状态呢?// 获取状态为 W 的订单
    final int fetchSize = pullReq.getSize();
    final String topic = pullReq.getTopicName();
    final String tagRegex = pullReq.getTagRegex();
    List<MqMessage> resultList = new ArrayList<>(fetchSize);
    List<MqMessagePersistPut> putList = map.get(topic);
    // 性能比拟差
    if(CollectionUtil.isNotEmpty(putList)) {for(MqMessagePersistPut put : putList) {final String status = put.getMessageStatus();
            if(!MessageStatusConst.WAIT_CONSUMER.equals(status)) {continue;}
            final MqMessage mqMessage = put.getMqMessage();
            List<String> tagList = mqMessage.getTags();
            if(InnerRegexUtils.hasMatch(tagList, tagRegex)) {
                // 设置为解决中
                // TODO:音讯的最终状态什么时候更新呢?// 能够给 broker 一个 ACK
                put.setMessageStatus(MessageStatusConst.PROCESS_CONSUMER);
                resultList.add(mqMessage);
            }
            if(resultList.size() >= fetchSize) {break;}
        }
    }
    MqConsumerPullResp resp = new MqConsumerPullResp();
    resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
    resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
    resp.setList(resultList);
    return resp;
}

咱们遍历找到匹配的音讯,将其状态更新为中间状态。

不过这里还是短少了一个要害的步骤,那就是音讯的 ACK。

咱们将在下一大节进行实现。

小结

音讯的推送和拉取各有本人的优缺点,须要咱们联合本人的业务,进行抉择。

一般而言,IM 更加适宜音讯的推送;个别的业务,为了削峰填谷,更加适宜拉取的模式。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc- 从零开始实现 rpc https://github.com/houbb/rpc

退出移动版