关于rabbitmq:系统学习消息队列RabbitMQ的交换机

1.Exchange的概念

2.Exchange的绑定

3.Exchange的Fanout

4.Exchange的direct

5.Exchange的Topics

1.Exchange的概念

在咱们之前练习mq的根本应用的时候,咱们发送一个信息,看起来是间接把音讯发送给了队列,其实,Rabbitmq消息传递模型的核心思想是:生产者生产的音讯不会发送给队列,而是发送给交换机。交换机工作的内容也非常简略,一方面承受来自生产者的音讯,另一方面将他们推入队列。交换机晓得如何精确地解决音讯,到底是发送给一个或者多个队列,还是丢掉,都由交换机决定。

所以看看之前咱们写的代码,咱们是把音讯发送给了一个无名的交换机

 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

第一个参数就是交换机的名称,空字符串示意无名交换机,音讯能路由到哪个队列中是由routingKey(bindingkey)绑定 key 指定的。

2.Exchange的绑定

因为交换机晓得要把音讯发送给哪个队列,然而咱们须要将队列和路由器进行绑定。

绑定的形式有Fanout(播送),Direct exchange(全匹配),Topics(主题匹配)。

3.Exchange的Fanout
Fanout类型的exchange简略粗犷,就是播送,将受到的所有音讯播送到它晓得的所有队列中。

消费者1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //申明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
         * 生成一个长期的队列 队列的名称是随机的
         * 当消费者断开和该队列的连贯时 队列主动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        //交换机和队列绑定,因为是fanout类型,所以无需路由键
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        System.out.println("消费者1期待接管音讯.........");

        //音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消费者1控制台接管并打印消息:"+message);
        };
        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("音讯生产被中断");
        };

        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);

    }
}

消费者2:

public class Consumer2 {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //申明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
         * 生成一个长期的队列 队列的名称是随机的
         * 当消费者断开和该队列的连贯时 队列主动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        //交换机和队列绑定,因为是fanout类型,所以无需路由键
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        System.out.println("消费者2期待接管音讯.........");

        //音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消费者2控制台接管并打印消息:"+message);
        };
        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("音讯生产被中断");
        };

        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            Scanner sc = new Scanner(System.in);
            System.out.println("请输出信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

咱们间断发送十条音讯:

消费者1和消费者2同时受到了音讯:

胜利进行轮询散发。

4.Exchange的direct

咱们在下面演示的是把音讯播送给全副消费者,然而咱们心愿做一些扭转,比方只发给一些特定的队列,有一些队列不发。Fanout这种形式对咱们来说灵活性不是很高。

所以咱们采取direct的形式,绑定key和队列一一对应,这样咱们就能够依据绑定key来抉择要发送的队列。

然而如果k1和k2雷同,就有点像fanout的感觉了,也达到了轮询散发的成果。

消费者1代码:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //申明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);
        /**
         * 生成一个长期的队列 队列的名称是随机的
         * 当消费者断开和该队列的连贯时 队列主动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        //交换机和队列绑定,因为是fanout类型,所以无需路由键
        channel.queueBind(queueName,EXCHANGE_NAME,"k1");

        System.out.println("消费者1期待接管音讯.........");

        //音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消费者1控制台接管并打印消息:"+message);
        };
        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("音讯生产被中断");
        };

        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);

    }
}

消费者2代码:

public class Consumer2 {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //申明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        /**
         * 生成一个长期的队列 队列的名称是随机的
         * 当消费者断开和该队列的连贯时 队列主动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        //交换机和队列绑定,因为是fanout类型,所以无需路由键
        channel.queueBind(queueName,EXCHANGE_NAME,"k2");

        System.out.println("消费者2期待接管音讯.........");

        //音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消费者2控制台接管并打印消息:"+message);
        };
        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("音讯生产被中断");
        };

        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);

    }
}

生产者代码:

public class Producer {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);
            Scanner sc = new Scanner(System.in);
            System.out.println("请输出信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "k1", null, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

生产者发送音讯:

消费者1有音讯,消费者2无音讯:

5.Exchange的Topics

之前咱们从播送散发的fanout交换机,到了可抉择的direct交换机,从而实现有选择性地对音讯进行发送和生产。

然而,新的需要又产生了,假如当初有一个日志零碎,咱们对多台服务器用队列进行发送日志,进行保留。

假如当初有了一个子分类,info.feign , info.service,咱们只想要保留info.service的音讯,不想保留info.feign的音讯,咱们就必须要应用topic模式。

topic交换机的音讯绑定key不能随便编写,它必须是一个单词列表,用.离开。这些单词是任意单词,比方”info.user , info.store”等等。

在这个规定中,还有两条规定:
“*”(星号)能够代替一个单词
“#”(井号)能够代替多个单词

例如:
Q1绑定的是三个单词的字符串 (.orange.

Q2绑定的是最初一个单词是rabbit的字符串 (..rabbit)

第二个绑定关系是lazy后有任意个单词的字符串(lazy.#)

所以理论会呈现上面这种状况

绑定key 会收到的队列
quick.orange.rabbit Q1,Q2
lazy.orange.elephant Q2
quick.orange.fox Q1
lazy.brown.fox Q2
lazy.pink.rabbit Q2
quick.brown.fox Q2
quick.orange.male.rabbit 无队列接管
lazy.orange.male.rabbit Q2

消费者1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //申明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.TOPIC);
        /**
         * 生成一个长期的队列 队列的名称是随机的
         * 当消费者断开和该队列的连贯时 队列主动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        //交换机和队列绑定,因为是fanout类型,所以无需路由键
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");

        System.out.println("消费者1期待接管音讯.........");

        //音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消费者1控制台接管并打印消息:"+message);
        };
        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("音讯生产被中断");
        };

        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);

    }
}

消费者2:

public class Consumer2 {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //申明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        /**
         * 生成一个长期的队列 队列的名称是随机的
         * 当消费者断开和该队列的连贯时 队列主动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        //交换机和队列绑定,因为是fanout类型,所以无需路由键
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");

        System.out.println("消费者2期待接管音讯.........");

        //音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("消费者2控制台接管并打印消息:"+message);
        };
        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("音讯生产被中断");
        };

        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.TOPIC);
            Scanner sc = new Scanner(System.in);
            System.out.println("请输出信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                String routingKey = null;
                //当发送的音讯不同,路右键也不同
                if(message.equals("1")){
                    routingKey = "1.orange.1";
                }else if(message.equals("2")){
                    routingKey = "lazy.1.1";
                }else if(message.equals("3")){
                    routingKey = "lazy.orange.1";
                }else if(message.equals("4")){
                    routingKey = "orange.1";
                }
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

演示成果:

依照咱们的冀望:

音讯1和2都会被各自的队列收到

音讯3会被两个队列收到

音讯4没有队列收到

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理