前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载平衡 load balance

【mq】从零开始实现 mq-08-配置优化 fluent

fluent

大家好,我是老马。

fluent 的配置形式,是我集体十分喜爱的一种配置形式。

传统的 java 应用 get/set 办法进行属性设置。

相似这种:

MqBroker  mqBroker = new MqBroker();mqBroker.setPort(9999);mqBroker.setAddress("127.0.0.1");

fluent 写法能够让咱们写起来代码更加晦涩:

MqBroker.newInstance().port(9999).address("127.0.0.1")

写起来更加丝滑晦涩。

Broker 配置

属性

/** * 端口号 */private int port = BrokerConst.DEFAULT_PORT;/** * 调用治理类 * * @since 1.0.0 */private final IInvokeService invokeService = new InvokeService();/** * 消费者治理 * * @since 0.0.3 */private IBrokerConsumerService registerConsumerService = new LocalBrokerConsumerService();/** * 生产者治理 * * @since 0.0.3 */private IBrokerProducerService registerProducerService = new LocalBrokerProducerService();/** * 长久化类 * * @since 0.0.3 */private IMqBrokerPersist mqBrokerPersist = new LocalMqBrokerPersist();/** * 推送服务 * * @since 0.0.3 */private IBrokerPushService brokerPushService = new BrokerPushService();/** * 获取响应超时工夫 * @since 0.0.3 */private long respTimeoutMills = 5000;/** * 负载平衡 * @since 0.0.7 */private ILoadBalance<ConsumerSubscribeBo> loadBalance = LoadBalances.weightRoundRobbin();/** * 推送最大尝试次数 * @since 0.0.8 */private int pushMaxAttempt = 3;

flent 配置

public MqBroker port(int port) {    this.port = port;    return this;}public MqBroker registerConsumerService(IBrokerConsumerService registerConsumerService) {    this.registerConsumerService = registerConsumerService;    return this;}public MqBroker registerProducerService(IBrokerProducerService registerProducerService) {    this.registerProducerService = registerProducerService;    return this;}public MqBroker mqBrokerPersist(IMqBrokerPersist mqBrokerPersist) {    this.mqBrokerPersist = mqBrokerPersist;    return this;}public MqBroker brokerPushService(IBrokerPushService brokerPushService) {    this.brokerPushService = brokerPushService;    return this;}public MqBroker respTimeoutMills(long respTimeoutMills) {    this.respTimeoutMills = respTimeoutMills;    return this;}public MqBroker loadBalance(ILoadBalance<ConsumerSubscribeBo> loadBalance) {    this.loadBalance = loadBalance;    return this;}

Producer 配置

属性

/** * 分组名称 */private String groupName = ProducerConst.DEFAULT_GROUP_NAME;/** * 中间人地址 */private String brokerAddress  = "127.0.0.1:9999";/** * 获取响应超时工夫 * @since 0.0.2 */private long respTimeoutMills = 5000;/** * 检测 broker 可用性 * @since 0.0.4 */private volatile boolean check = true;/** * 调用治理服务 * @since 0.0.2 */private final IInvokeService invokeService = new InvokeService();/** * 状态治理类 * @since 0.0.5 */private final IStatusManager statusManager = new StatusManager();/** * 生产者-两头服务端服务类 * @since 0.0.5 */private final IProducerBrokerService producerBrokerService = new ProducerBrokerService();/** * 为残余的申请等待时间 * @since 0.0.5 */private long waitMillsForRemainRequest = 60 * 1000;/** * 负载平衡策略 * @since 0.0.7 */private ILoadBalance<RpcChannelFuture> loadBalance = LoadBalances.weightRoundRobbin();/** * 音讯发送最大尝试次数 * @since 0.0.8 */private int maxAttempt = 3;

fluent 配置

public MqProducer groupName(String groupName) {    this.groupName = groupName;    return this;}public MqProducer brokerAddress(String brokerAddress) {    this.brokerAddress = brokerAddress;    return this;}public MqProducer respTimeoutMills(long respTimeoutMills) {    this.respTimeoutMills = respTimeoutMills;    return this;}public MqProducer check(boolean check) {    this.check = check;    return this;}public MqProducer waitMillsForRemainRequest(long waitMillsForRemainRequest) {    this.waitMillsForRemainRequest = waitMillsForRemainRequest;    return this;}public MqProducer loadBalance(ILoadBalance<RpcChannelFuture> loadBalance) {    this.loadBalance = loadBalance;    return this;}public MqProducer maxAttempt(int maxAttempt) {    this.maxAttempt = maxAttempt;    return this;}

Consuemr 配置

属性

/** * 组名称 */private String groupName = ConsumerConst.DEFAULT_GROUP_NAME;/** * 中间人地址 */private String brokerAddress  = "127.0.0.1:9999";/** * 获取响应超时工夫 * @since 0.0.2 */private long respTimeoutMills = 5000;/** * 检测 broker 可用性 * @since 0.0.4 */private volatile boolean check = true;/** * 为残余的申请等待时间 * @since 0.0.5 */private long waitMillsForRemainRequest = 60 * 1000;/** * 调用治理类 * * @since 1.0.0 */private final IInvokeService invokeService = new InvokeService();/** * 音讯监听服务类 * @since 0.0.5 */private final IMqListenerService mqListenerService = new MqListenerService();/** * 状态治理类 * @since 0.0.5 */private final IStatusManager statusManager = new StatusManager();/** * 生产者-两头服务端服务类 * @since 0.0.5 */private final IConsumerBrokerService consumerBrokerService = new ConsumerBrokerService();/** * 负载平衡策略 * @since 0.0.7 */private ILoadBalance<RpcChannelFuture> loadBalance = LoadBalances.weightRoundRobbin();/** * 订阅最大尝试次数 * @since 0.0.8 */private int subscribeMaxAttempt = 3;/** * 勾销订阅最大尝试次数 * @since 0.0.8 */private int unSubscribeMaxAttempt = 3;

fluent 配置

public MqConsumerPush subscribeMaxAttempt(int subscribeMaxAttempt) {    this.subscribeMaxAttempt = subscribeMaxAttempt;    return this;}public MqConsumerPush unSubscribeMaxAttempt(int unSubscribeMaxAttempt) {    this.unSubscribeMaxAttempt = unSubscribeMaxAttempt;    return this;}public MqConsumerPush groupName(String groupName) {    this.groupName = groupName;    return this;}public MqConsumerPush brokerAddress(String brokerAddress) {    this.brokerAddress = brokerAddress;    return this;}public MqConsumerPush respTimeoutMills(long respTimeoutMills) {    this.respTimeoutMills = respTimeoutMills;    return this;}public MqConsumerPush check(boolean check) {    this.check = check;    return this;}public MqConsumerPush waitMillsForRemainRequest(long waitMillsForRemainRequest) {    this.waitMillsForRemainRequest = waitMillsForRemainRequest;    return this;}public MqConsumerPush loadBalance(ILoadBalance<RpcChannelFuture> loadBalance) {    this.loadBalance = loadBalance;    return this;}

小结

这一节的实现非常简单,能够说是没有啥技术难度。

只是为了让使用者更加不便。

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 繁难版本 mq 实现) https://github.com/houbb/mq

拓展浏览

rpc-从零开始实现 rpc https://github.com/houbb/rpc