关于java:分布式消息中间件1Rabbitmq入门到高可用实战学会了这个还怕被B站面试官看不起

5次阅读

共计 21550 个字符,预计需要花费 54 分钟才能阅读完成。

前言

对于分布式消息中间件,首先要理解两个根底的概念,即什么是分布式系统,什么又是中间件。

分布式系统

“A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messasges.”——《Distributed Systems Concepts and Design》

从下面这个解释能够失去分布式系统的两个特点:组件散布在网络计算机上组件之间通过音讯来协调口头

中间件

Middleware is computer software that provides services to software applications beyond those available from the operating system. It can be described as “software glue”. Middleware makes it easier for software developers to implement communication and input/output, so they can focus on the specific purpose of their application.——维基百科

中间件被形容为为应用程序提供操作系统所提供的服务之外的服务,简化应用程序的通信、输入输出的开发,使他们专一于本人的业务逻辑。从维基百科上对中间件的解释感觉有点绕,其实能够从“空间”的角度去了解中间件,即中间件是处于“中间层”的组件,是下层的应用程序和底层的服务之间的桥梁(比方 DB 中间件的下层是应用程序,底层是 DB 服务),也是利用与利用之间的桥梁(比方分布式服务组件)。

分布式消息中间件

“Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems.”——维基百科

维基百科给出的消息中间件的定义是反对在分布式系统中发送和承受音讯的硬件或软件基础设施(对咱们这里探讨的范畴来说必定就是软件了)。

那么分布式消息中间件其实就是指消息中间件自身也是一个分布式系统。

消息中间件能做什么?

任何中间件必然都是要去解决特定畛域的某个问题,消息中间件解决的就是分布式系统之间消息传递的问题。消息传递是分布式系统必然要面对的一个问题。

简略概括一下消息中间件的利用场景大抵如下:

  • 业务解耦:交易系统不须要晓得短信告诉服务的存在,只须要公布音讯
  • 削峰填谷:比方上游零碎的吞吐能力高于上游零碎,在流量洪峰时可能会冲垮上游零碎,消息中间件能够在峰值时沉积音讯,而在峰值过来后上游零碎缓缓生产音讯解决流量洪峰的问题
  • 事件驱动:零碎与零碎之间能够通过消息传递的模式驱动业务,以流式的模型解决

分布式消息中间件长什么样?

一个形象的对分布式消息中间件的认知大略是这样:

  • 有一个 SDK,提供给业务零碎发送、生产音讯的接口
  • 有一批 Server 节点用于承受和存储音讯,并在适合的时候发送给上游的零碎进行生产

别嫌啰嗦,大抵介绍一下,不便上面的了解,本系列次要讲三个罕用的消息中间件,也就是 RabbitmqRocketMqKafka,当然篇幅所限必定讲不完,只能挑比拟重要的货色写,但也能让不会的同学初步把握怎么去应用。

完整版的消息中间件学习材料和我集体整顿的笔记

能够间接点击蓝字支付

好了,话不多说,发车喽!


RabbitMQ 除了像兔子一样跑的很快以外,还有这些特点:

  • 开源、性能优良,稳定性保障
  • 提供可靠性音讯投递模式、返回模式
  • 与 Spring AMQP 完满整合,API 丰盛
  • 集群模式丰盛,表达式配置,HA 模式,镜像队列模型
  • 保证数据不失落的前提做到高可靠性、可用性

一、Rabbitmq 音讯队列利用

1、RabbitMQ 介绍

RabbitMQ 是一款基于 AMQP(音讯队列协定),由 Erlang 开发的开源音讯队列组件。是一款优良的音讯队列组件,他由两局部组成:服务端和客户端,客户端反对多种语言的驱动,如:.Net、JAVA、Erlang 等。RabbitMQ 与其余音讯队列组件性能比拟,在此不作介绍,网上有大把的材料。


2、RabbitMQ 原理简介

RabbitMQ 中间件分为服务端(RabbitMQ Server)和客户端(RabbitMQ Client),服务端能够了解为是一个音讯的代理消费者,客户端又分为音讯生产者(Producer)和音讯消费者(Consumer)。

2.1 音讯生产者(Producer):次要生产音讯并将音讯基于 TCP 协定,通过建设 Connection 和 Channel,将音讯传输给 RabbitMQ Server,对于 Producer 而言根本就实现了工作。

2.2 服务端(RabbitMQ Server):次要负责解决音讯路由、散发、入队列、缓存和入列。次要由三局部组成:Exchange、RoutingKey、Queue。

(1)Exchange:用于接管音讯生产者发送的音讯,有三种类型的 exchange:direct, fanout,topic,不同类型实现了不同的路由算法;

A. direct exchange:将与 routing key 比配的音讯,间接推入绝对应的队列,创立队列时,默认就创立同名的 routing key。

B. fanout exchange:是一种播送模式,疏忽 routingkey 的规定。

C. topic exchange:利用主题,依据 key 进行模式匹配路由,例如:若为 abc则推入到所有 abc绝对应的 queue;若为 abc.# 则推入到 abc.xx.one ,abc.yy.two 对应的 queue。

(2)RoutingKey:是 RabbitMQ 实现路由散发到各个队列的规定,并联合 Binging 提供于 Exchange 应用将音讯推送入队列;

(3)Queue:是音讯队列,能够依据须要定义多个队列,设置队列的属性,比方:音讯移除、音讯缓存、回调机制等设置,实现与 Consumer 通信;

2.3 音讯消费者(Consumer):次要负责生产 Queue 的音讯,同样基于 TCP 协定,通过建设 Connection 和 Channel 与 Queue 传输音讯,一个音讯能够给多个 Consumer 生产;

2.4 要害名词阐明:Connection、Channel、Binging 等;

(1)Connection:是建设客户端与服务端的连贯。

(2)Channel:是基于 Connection 之上建设通信通道,因为每次 Connection 建设 TCP 协定通信开销及性能耗费较大,所以一次建设 Connection 后,应用多个 Channel 通道通信缩小开销和进步性能。

(3)Binging:是一个捆绑定义,将 exchange 和 queue 捆绑,定义 routingkey 相干策略。


3、RabbitMQ 装置部署

  以上对 RabbitMQ 简介,接下来咱们通过理论搭建音讯队列服务实际。RabbitMQ 服务端能运行于 Window、Linux 和 Mac 平台,客户端也反对多种技术的实现。本次咱们将在 Linux 之 CentOS7 平台搭建。

3.1 装置 Erlang 运行环境

因为 RabbitMQ 应用 Erlang 技术开发,所以须要先装置 Erlang 运行环境后,能力装置音讯队列服务。

(1)配置零碎能失常拜访公网,设置默认网关

`route add ``default` `gw 192.168.1.1`

(2)装置 erlang

`su -c ``'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm'`

`sudo yum install erlang`

(3)查看 erlang 是否装置胜利

`erl`

(4)装置胜利

3.2 装置 RabbitMQ 服务端

(1)下载安装包

`wget http:``//www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm`

(2)装置和配置 RabbitMQ 服务端,3.6.0 版本:

`rpm --import https:``//www.rabbitmq.com/rabbitmq-signing-key-public.asc`
`yum install rabbitmq-server-3.6.0-1.noarch.rpm`

(3)启用 web 治理插件

`rabbitmq-plugins enable rabbitmq_management`

(4)启动 RabbitMQ

`chkconfig rabbitmq-server ``on`
`/sbin/service rabbitmq-server start`

(5)防火墙开明端口

`# firewall-cmd --permanent --zone=public --add-port=5672/tcp`
`# firewall-cmd --permanent --zone=public --add-port=15672/tcp`
`# firewall-cmd --reload`

(6)rabbitmq 默认会创立 guest 账号,只能用于 localhost 登录页面管理员,本机拜访地址:

http://localhost:15672/

`rabbitmqctl add_user test test`
`rabbitmqctl set_user_tags test administrator<br>rabbitmqctl set_permissions -p / test ``".*"` `".*"` `".*"`

RabbitMQ 管理员页面。


4、RabbitMQ 利用

  本章节形容,web 利用生产的日志,通过 rabbitmq 传输,而后日志服务接管音讯队列的音讯。

本零碎采纳官网的 Client,通过 nuget 援用。

  4.1 Web 利用生产业务日志

[HttpPost]
        public ActionResult Create()
        {this.HttpContext.Session["mysession"] = DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss");
            var txt = Request.Form["txtSite"].ToString();
            RabbitMQHelper helper = new RabbitMQHelper();
            helper.SendMsg(txt + ", 操作日志,工夫:" + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
 
            return RedirectToAction("Index");
        }

`}`

页面效果图。

4.2 日志服务接管日志音讯

  基于 window form 开发一个日志解决服务,并将接管的音讯打印进去。

private void btnReceive_Click(object sender, EventArgs e)
        {
            isConnected = true;
            using (var channel = connection.CreateModel())
            {channel.QueueDeclare("MyLog", false, false, false, null);
 
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("MyLog", true, consumer);
 
                while (isConnected)
                {var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
 
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    txtMsg.Text += message + "\r\n";
 
                }
            }
        }

  4.3 RabbitMQ 页面监控状况

  RabbitMQ 自带页面监控工具,通过此工具能够监控 MQ 的状况:

完整版的消息中间件学习材料和我集体整顿的笔记 能够间接点击蓝字支付

二、Rabbitmq 音讯确认机制

1、生产端 Confirm 音讯确认机制

音讯的确认,是指生产者投递音讯后,如果 Broker 收到音讯,则会给咱们生产者一个应答。生产者进行接管应答,用来确定这条音讯是否失常的发送到 Broker,这种形式也是音讯的可靠性投递的外围保障!

Confirm 确认机制流程图

2、如何实现 Confirm 确认音讯?

  • 第一步: 在 channel 上开启确认模式: channel.confirmSelect()
  • 第二步: 在 channel 上增加监听: channel.addConfirmListener(ConfirmListener listener);, 监听胜利和失败的返回后果,依据具体的后果对音讯进行从新发送、或记录日志等后续解决!
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class ConfirmProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "item.update";

        // 指定音讯的投递模式:confirm 确认模式
        channel.confirmSelect();

        // 发送
        final long start = System.currentTimeMillis();
        for (int i = 0; i < 5 ; i++) {
            String msg = "this is confirm msg";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            System.out.println("Send message :" + msg);
        }

        // 增加一个确认监听,这里就不敞开连贯了,为了能保障能收到监听音讯
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回胜利的回调函数
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("succuss ack");
                System.out.println(multiple);
                System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
            }
            /**
             * 返回失败的回调函数
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.printf("defeat ack");
                System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
            }
        });
    }
}
`import com.rabbitmq.client.*;
import java.io.IOException;

public class ConfirmConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String queueName = "test_confirm_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        // 个别不必代码绑定,在治理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        // 创立消费者并接管音讯
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {String message = new String(body, "UTF-8");
                System.out.println("[x] Received'" + message + "'");
            }
        };

        // 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, true, consumer);

    }
}

咱们此处只关注生产端输入音讯

Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
succuss ack
true
耗时:3ms
succuss ack
true
耗时:4ms

注意事项

  • 咱们采纳的是异步 confirm 模式:提供一个回调办法,服务端 confirm 了一条或者多条音讯后 Client 端会回调这个办法。除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,因为事实场景中很少应用咱们在此不做介绍,如有趣味间接参考官网文档。
  • 咱们运行生产端会发现每次运行后果都不一样, 会有多种状况呈现,因为 Broker 会进行优化,有时会批量一次性 confirm,有时会离开几条 confirm。

    
     `succuss ack
      true
      耗时:3ms
      succuss ack
      false
      耗时:4ms
    
      或者
      succuss ack
      true
      耗时:3ms` 

3、Return 音讯机制

  • Return Listener 用于解决一 - 些不可路 由的音讯!
  • 音讯生产者,通过指定一个 Exchange 和 Routingkey,把音讯送达到某一个队列中去,而后咱们的消费者监听队列,进行生产解决操作!
  • 然而在某些状况下,如果咱们在发送音讯的时候,以后的 exchange 不存在或者指定的路由 key 路由不到,这个时候如果咱们须要监听这种不可达的音讯,就要应用 Return Listener !
  • 在根底 API 中有一个要害的配置项:Mandatory:如果为 true,则监听器会接管到路由不可达的音讯,而后进行后续解决,如果为 false,那么 broker 端主动删除该音讯!

Return 音讯机制流程图

Return 音讯示例

  • 首先咱们须要发送三条音讯,并且成心将第 0 条音讯的 routing Key设置为谬误的,让他无奈失常路由到生产端。
  • mandatory 设置为 true 路由不可达的音讯会被监听到,不会被主动删除. 即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
  • 最初增加监听即可监听到不可路由到生产端的音讯channel.addReturnListener(ReturnListener r))

    `import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class ReturnListeningProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        String exchangeName = "test_return_exchange";
        String routingKey = "item.update";
        String errRoutingKey = "error.update";
    
        // 指定音讯的投递模式:confirm 确认模式
        channel.confirmSelect();
    
        // 发送
        for (int i = 0; i < 3 ; i++) {
            String msg = "this is return——listening msg";
            //@param mandatory 设置为 true 路由不可达的音讯会被监听到,不会被主动删除
            if (i == 0) {channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
            } else {channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
            System.out.println("Send message :" + msg);
        }
    
        // 增加一个确认监听,这里就不敞开连贯了,为了能保障能收到监听音讯
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回胜利的回调函数
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("succuss ack");
            }
            /**
             * 返回失败的回调函数
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.printf("defeat ack");
            }
        });
    
        // 增加一个 return 监听
        channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("return relyCode:" + replyCode);
                System.out.println("return replyText:" + replyText);
                System.out.println("return exchange:" + exchange);
                System.out.println("return routingKey:" + routingKey);
                System.out.println("return properties:" + properties);
                System.out.println("return body:" + new String(body));
            }
        });
    
    }
    }
`import com.rabbitmq.client.*;
import java.io.IOException;

public class ReturnListeningConsumer {public static void main(String[] args) throws Exception {
        //1\. 创立一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2\. 通过连贯工厂来创立连贯
        Connection connection = factory.newConnection();

        //3\. 通过 Connection 来创立 Channel
        Channel channel = connection.createChannel();

        //4\. 申明
        String exchangeName = "test_return_exchange";
        String queueName = "test_return_queue";
        String routingKey = "item.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        // 个别不必代码绑定,在治理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        //5\. 创立消费者并接管音讯
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {String message = new String(body, "UTF-8");
                System.out.println("[x] Received'" + message + "'");
            }
        };

        //6\. 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, true, consumer);
    }
}

咱们只关注生产端后果,生产端只收到两条音讯。

`Send message : this is return——listening msg 
Send message : this is return——listening msg 
Send message : this is return——listening msg 
return relyCode: 312
return replyText: NO_ROUTE
return exchange: test_return_exchange
return routingKey: error.update
return properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
return body: this is return——listening msg 
succuss ack
succuss ack
succuss ack

4、生产端 Ack 和 Nack 机制

生产端进行生产的时候,如果因为业务异样咱们能够进行日志的记录,而后进行弥补! 如果因为服务器宕机等重大问题,那咱们就须要手工进行 ACK 保障生产端生产胜利! 生产端重回队列是为了对没有解决胜利的音讯,把音讯从新会递给 Broker! 个别咱们在理论利用中,都会敞开重回队列,也就是设置为 False。

参考 api

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;

如何设置手动 Ack、Nack 以及重回队列

  • 首先咱们发送五条音讯,将每条音讯对应的循环下标 i 放入音讯的 properties 中作为标记,以便于咱们在前面的回调办法中辨认。
  • 其次,咱们将生产端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck属性设置为 false,如果设置为 true 的话 将会失常输入五条音讯。
  • 咱们通过 Thread.sleep(2000)来延时一秒,用以看清后果。咱们获取到 properties 中的 num 之后,通过 channel.basicNack(envelope.getDeliveryTag(), false, true); 将 num为 0 的音讯设置为 nack,即生产失败,并且将 requeue属性设置为true,即生产失败的音讯重回队列末端。

`import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;

public class AckAndNackProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String routingKey = "item.update";

        String msg = "this is ack msg";
        for (int i = 0; i < 5; i++) {Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num" ,i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .headers(headers)
                    .build();

            String tem = msg + ":" + i;

            channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());
            System.out.println("Send message :" + msg);
        }

        channel.close();
        connection.close();}
}
import com.rabbitmq.client.*;
import java.io.IOException;

public class AckAndNackConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        // 个别不必代码绑定,在治理界面手动绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {String message = new String(body, "UTF-8");
                System.out.println("[x] Received'" + message + "'");

                try {Thread.sleep(2000);
                } catch (InterruptedException e) {e.printStackTrace();
                }

                if ((Integer) properties.getHeaders().get("num") == 0) {channel.basicNack(envelope.getDeliveryTag(), false, true);
                } else {channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        //6\. 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, false, consumer);

    }
}

咱们此处只关怀生产端输入,能够看到第 0 条生产失败从新回到队列尾部生产。

 [x] Received 'this is ack msg:1'
 [x] Received 'this is ack msg:2'
 [x] Received 'this is ack msg:3'
 [x] Received 'this is ack msg:4'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'

三、Rabbitmq 镜像队列

1、镜像队列的设置

镜像队列的配置通过增加 policy 实现,policy 增加的命令为:

rabbitmqctl  set_policy  [-p Vhost]  Name  Pattern  Definition  [Priority]

-p Vhost:  可选参数,针对指定 vhost 下的 queue 进行设置

Name:  policy 的名称

Pattern:  queue 的匹配模式(正则表达式)

Definition:  镜像定义,包含三个局部 ha-mode,ha-params,ha-sync-mode

 ha-mode:  指明镜像队列的模式,有效值为 all/exactly/nodes

  • all 示意在集群所有的节点上进行镜像

    - exactly 示意在指定个数的节点上进行镜像,节点的个数由 ha-params 指定
    
  • nodes 示意在指定的节点上进行镜像,节点名称通过 ha-params 指定

ha-params: ha-mode 模式须要用到的参数

 ha-sync-mode:  镜像队列中音讯的同步形式,有效值为 automatic,manually

Priority:  可选参数,policy 的优先级

例如,对队列名称以 hello 结尾的所有队列进行镜像,并在集群的两个节点上实现镜像,policy 的设置命令为:

rabbitmqctl  set_policy  hello-ha  "^hello"  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

2、镜像队列的大略实现

2.1 整体介绍

通常队列由两局部组成:一部分是 amqqueue_process,负责协定相干的音讯解决,即接管生产者公布的音讯、向消费者投递音讯、解决音讯 confirm、acknowledge 等等;另一部分是 backing_queue,它提供了相干的接口供 amqqueue_process 调用,实现音讯的存储以及可能的长久化工作等。

镜像队列同样由这两局部组成,

  • amqqueue_process 仍旧进行协定相干的音讯解决
  • backing_queue 则是由 master 节点和 slave 节点组成的一个非凡的 backing_queue
  • master 节点和 slave 节点都由一组过程组成,一个负责音讯播送的 gm,一个负责对 gm 收到的播送音讯进行回调解决。
  • master 节点上回调解决是 coordinator
  • slave 节点上则是 mirror_queue_slave。mirror_queue_slave 中蕴含了一般的 backing_queue 进行音讯的存储
  • master 节点中 backing_queue 蕴含在 mirror_queue_master 中由 amqqueue_process 进行调用。

留神:音讯的公布与生产都是通过 master 节点实现。master 节点对音讯进行解决的同时将音讯的解决动作通过 gm 播送给所有的 slave 节点,slave 节点的 gm 收到音讯后,通过回调交由 mirror_queue_slave 进行理论的解决。

2.2 gm(Guaranteed Multicast)

传统的主从复制形式:由 master 节点负责向所有 slave 节点发送须要复制的音讯,在复制过程中,如果有 slave 节点出现异常,master 节点须要作出相应的解决;如果是 master 节点自身呈现问题,那么 slave 节点间可能会进行通信决定本次复制是否持续。当然为了解决各种异常情况,整个过程中的日志记录是免不了的。

然而 rabbitmq 中并没有采纳这种形式,而是将所有的节点造成一个循环链表,每个节点都会监控位于本人左右两边的节点,当有节点新增时,相邻的节点保障以后播送的音讯会复制到新的节点上;当有节点生效时,相邻的节点会接管保障本次播送的音讯会复制到所有节点。

在 master 节点和 slave 节点上的这些 gm 造成一个 group,group 的信息会记录在 mnesia 中。不同的镜像队列造成不同的 group。

音讯从 master 节点对应的 gm 收回后,顺着链表顺次传送到所有节点,因为所有节点组成一个循环链表,master 节点对应的 gm 最终会收到本人发送的音讯,这个时候 master 节点就晓得音讯曾经复制到所有 slave 节点了。

2.3 重要的表构造

rabbit_queue 表记录队列的相干信息:

-record(amqqueue,
{
name,             %% 队列的名称
durable,          %% 标识队列是否长久化
auto_delete,      %% 标识队列是否主动删除
exclusive_owner,  %% 标识是否独占模式
arguments,        %% 队列创立时的参数
pid,              %%amqqueue_process 过程 PID
slave_pids,       %%mirror_queue_slave 过程 PID 汇合
sync_slave_pids,  %% 已同步的 slave 过程 PID 汇合
policy,           %% 与队列无关的 policy
                  %% 通过 set_policy 设置, 没有则为 undefined
gm_pids,          %%{gm,mirror_queue_coordinator},{gm,mirror_queue_slave}过程 PID 汇合
decorator         %%
}).

留神:slave_pids 的存储是依照 slave 退出的工夫来排序的,以便 master 节点生效时,晋升 ” 资格最老 ” 的 slave 节点为新的 master。

gm_group 表记录 gm 造成的 group 的相干信息:

-record(gm_group,
{
name,     %%group 的名称, 与 queue 的名称统一
version,  %%group 的版本号, 新增节点 / 节点生效时会递增
members,  %%group 的成员列表, 依照节点组成的链表程序进行排序
}).

3、镜像队列的一些细节

3.1 新增节点

slave 节点先从 gm_group 中获取对应 group 的所有成员信息,而后随机抉择一个节点并向这个节点发送申请,这个节点收到申请后,更新 gm_group 对应的信息,同时告诉左右节点更新街坊信息(调整对左右节点的监控)及以后正在播送的音讯,而后回复告诉申请节点胜利退出 group。申请退出 group 的节点收到回复后再更新 rabbit_queue 中的相干信息,并依据须要进行音讯的同步。


3.2 音讯的播送

音讯从 master 节点收回,顺着节点链表发送。在这期间,所有的 slave 节点都会对音讯进行缓存,当 master 节点收到本人发送的音讯后,会再次播送 ack 音讯,同样 ack 音讯会顺着节点链表通过所有的 slave 节点,其作用是告诉 slave 节点能够革除缓存的音讯,当 ack 音讯回到 master 节点时对应播送音讯的生命周期完结。

下图为一个简略的示意图,A 节点为 master 节点,播送一条内容为 ”test” 的音讯。”1″ 示意音讯为播送的第一条音讯;”id=A” 示意音讯的发送者为节点 A。左边是 slave 节点记录的状态信息。

为什么所有的节点都须要缓存一份公布的音讯呢?

master 公布的音讯是顺次通过所有 slave 节点,在这期间的任何时刻,有可能有节点生效,那么相邻的节点可能须要从新发送给新的节点。例如,A->B->C->D->A 造成的循环链表,A 为 master 节点,播送音讯发送给节点 B,B 再发送给 C,如果节点 C 收到 B 发送的音讯还未发送给 D 时异样完结了,那么节点 B 感知后节点 C 生效后须要从新将音讯发送给 D。同样,如果 B 节点将音讯发送给 C 后,B,C 节点中新增了 E 节点,那么 B 节点须要再将音讯发送给新增的 E 节点。

gm 的状态记录:

-record(state,
{
self,              %%gm 自身的 ID
left,              %% 该节点右边的节点
right,             %% 该节点左边的节点
group_name,        %%group 名称 与队列名统一
module,            %% 回调模块 rabbit_mirror_queue_slave 或者
                   %%rabbit_mirror_queue_coordinator
view,              %%group 成员列表视图信息
                   %% 记录了成员的 ID 及每个成员的左右街坊节点
pub_count,         %% 以后已公布的音讯计数
members_state,     %%group 成员状态列表 记录了播送状态:[#member{}]
callback_args,     %% 回调函数的参数信息
                   %%rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator 过程 PID
confirms,          %%confirm 列表
broadcast_buffer,  %% 缓存待播送的音讯
broadcast_timer,   %% 播送音讯定时器
txn_executor       
}).
 
-record(member,
{
pending_ack,  %% 待确认的音讯, 也就是已公布的音讯缓存的中央
last_pub,     %% 最初一次公布的音讯计数
last_ack      %% 最初一次确认的音讯计数
}).

3.3 节点的生效

当 slave 节点生效时,仅仅是相邻节点感知,而后从新调整街坊节点信息、更新 rabbit_queue、gm_group 的记录等。如果是 master 节点生效,” 资格最老 ” 的 slave 节点被晋升为 master 节点,slave 节点会创立出新的 coordinator,并告知 gm 批改回调解决为 coordinator,原来的 mirror_queue_slave 充当 amqqueue_process 解决生产者公布的音讯,向消费者投递音讯等。

下面提到如果是 slave 节点生效,只有相邻的节点能感知到,那么 master 节点生效是不是也是只有相邻的节点能感知到?如果是这样的话,如果相邻的节点不是 ” 资格最老 ” 的节点,怎么告诉 ” 资格最老 ” 的节点晋升为新的 master 节点呢?

实际上,所有的 slave 节点在退出 group 时,mirror_queue_slave 过程会对 master 节点的 amqqueue_process 过程(也可能是 mirror_queue_slave 过程)进行监控,如果 master 节点生效的话,mirror_queue_slave 会感知,而后再通过 gm 进行播送,这样所有的节点最终都会晓得 master 节点生效。当然,只有 ” 资格最老 ” 的节点会晋升本人为新的 master。

另外,在 slave 晋升为 master 时,mirror_queue_slave外部来了一次 ” 移花接木 ”,即本来须要回调 mirror_queue_slavehandle_call/handle_info/handle_cast等接口进行解决的音讯,全副改为调用 amqqueue_processhandle_call/handle_info/handle_cast等接口,从而能够解释下面说的,mirror_queue_slave过程充当了 amqqueue_process 实现协定相干的音讯的解决。

rabbit_mirror_queue_slave.erl
 
handle_call({gm_deaths,LiveGMPids},From,
            State = #state{q = Q = #amqqueue{name=QName,pid=MPid}})->
    Self = self(),
    case rabbit_mirror_queue_misc:remove_from_queue(QName,
                                                    Self,
                                                    LiveGMPids) of
        {ok,Pid,DeadPids} ->
            case Pid of
                MPid ->
                    %% master hasn't changed
                    gen_server2:reply(From, ok),
                    noreply(State);
                Self ->
                    %% we've become master
                    QueueState = promote_me(From,State),
                    {become,
                     %% 改由 rabbit_amqqueue_process 模块解决音讯
                     rabbit_amqqueue_process,
                     QueueState, hibernate};
                ...
 
gen_server2.erl
 
handle_common_reply(Reply,Msg,GS2State = #gs2_state{name=Name,
                                                    debug=Debug})->
    case Reply of
        ...
        {become, Mod, NState, Time1} ->
            Debug1=common_become(Name,Mod,NState,Debug),
            loop(find_prioritisers(
                GS2State#gs2_state{mod=Mod,
                                   state=NState,
                                   time=Time1,
                                   debug=Debug1}));
        ...
 
handle_msg({'gen_call',From,Msg},
           GS2State=#gs2_state{mod=Mod,
                               state=State,
                               name=Name,
                               debug=Debug}) ->
    case catch Mod:handle_call(Msg, From, State) of
        ...
 
handle_msg(Msg,GS2State=#gs2_state{mod=Mod,state=State})->
    Reply = (catch dispatch(Msg,Mod,State)),
    handle_common_reply(Reply, Msg, GS2State).
 
dispatch({'$gen_cast',Msg},Mod,State)->
    Mod:handle_cast(Msg, State);
dispatch(Info, Mod, State)->
    Mod:handle_info(Info,State).

4、音讯的同步

配置镜像队列的时候有个 ha-sync-mode 属性,这个有什么用呢?

新节点退出到 group 后,最多能从右边节点获取到以后正在播送的音讯内容,退出 group 之前曾经播送的音讯则无奈获取到。如果此时 master 节点可怜生效,而新节点有恰好成为了新的 master,那么退出 group 之前曾经播送的音讯则会全副失落。

留神:这里的音讯具体是指新节点退出前曾经公布并复制到所有 slave 节点的音讯,并且这些音讯还未被消费者生产或者未被消费者确认。如果新节点退出前,所有播送的音讯被消费者生产并确认了,master 节点删除音讯的同时会告诉 slave 节点实现相应动作。这种状况等同于新节点退出前没有公布任何音讯。

防止这种问题的解决办法就是对新的 slave 节点进行音讯同步。当 ha-sync-mode 配置为主动同步(automatic)时,新节点退出 group 时会主动进行音讯的同步;如果配置为 manually 则须要手动操作实现同步


就先写到这把,原本是想一篇文把三个中间件都写了的,没想到人不知; 鬼不觉写了这么多我都感觉 Rabbitmq 还有很多货色还没写到,前面会再写两篇专门讲一下 RocketMq 和 kafka,感兴趣的敌人能够给我点个关注。

完整版的消息中间件学习材料和我集体整顿的笔记 间接点击蓝字支付

如果能够点个赞就更好了,你说呢


end

正文完
 0