乐趣区

RabbitMQ-交换器

在 Rabbitmq 中,消息发送给交换器,交换器根据一定的规则把消息发给队列,broker 再把消息发送给消费者,或者发送至主动从队列拉去消息。前面几张讲了队列的相关东西,这篇看看交换器是如何把消息发送给队列的。

交换器

交换器接收消息并将其路由到零个或多个队列,它支持四种交换类型:DirectFanoutTopicHeaders。还还声明了一些属性,其中最重要的是: 交换器的名称、交换器类型、是否持久化、是否自动删除、参数。
是否持久化,决定了 rabbitmq 重启后,交换器是否存在。是否自动删除,决定了当最后一个队列被解除绑定时,交换器是否被删除。

Exchange.DeclareOk exchangeDeclare(String exchange,
        BuiltinExchangeType type,
        boolean durable,
        boolean autoDelete,
        boolean internal,
        Map<String, Object> arguments) throws IOException;

默认的交换器

默认的交换器名称为"",即空字符串。当我们没有定义的时候,看起来就像消息直接发送到队列一样。

Direct

根据消息路由键将消息传递到队列。主要的步骤如下:

  1. 通过路由键 K 把队列绑定到交换器
  2. 当带有路由键 R 的新消息到达交换器时,如果 K = R,交换器将其路由到队列


生产者代码,通过 channel.basicPublish 把带有路由键 (“images.archive”, “images.crop”, “images.resizer”) 的消息发送给交换器 images。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    // 创建一个 Channel
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 定义交换器
        channel.exchangeDeclare(Constant.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String[] routingKeys = {"images.archive", "images.crop", "images.resizer"};
        for (int i = 0; i < routingKeys.length; i++) {
            // 把消息发送到队列中
            channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes());
            channel.basicPublish(Constant.EXCHANGE_NAME, routingKeys[i], null, routingKeys[i].getBytes());
        }
        System.out.println("Sent complete");
    }
}

ArchiveRec1 消费者,通过 channel.queueBind 把交换器 images、路由键 images.archive、队列 archive1 绑定一起。

// 定义队列的名称
public final static String QUEUE_NAME = "archive1";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_NAME, "images.archive");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ArchiveRec1 Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

ArchiveRec1 消费者,通过 channel.queueBind 把交换器 images、路由键 images.archive、队列 archive2 绑定一起。除了队列名称,其他与上面代码雷同,就不贴了。
CropRec 消费者,通过 channel.queueBind 把交换器 images、路由键 images.crop、队列 cropper 绑定一起。

// 定义队列的名称
public final static String QUEUE_NAME = "cropper";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_NAME, "images.crop");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("cropper Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

运行 ArchiveRec1、ArchiveRec2 各一次,运行 CropRec 两次。启动消费者后,再运行生产者 DirectExchange。
运行结果如下,ArchiveRec1 和 ArchiveRec2 各消费 2 条数据,两个 CropRec 一共消费 2 条数据,说明同一个队列两个消费者,他们是轮询消费的。
ArchiveRec1

ArchiveRec2

CropRec

fanout

把消息发送给交换器所有的队列上,忽略的路由键的影响。也就是说,当多个队列跟这个交换器绑定的时候,交换器每收到一条消息,就会群发给这些队列。虽然 Direct 上面的例子中,也可以通过多个队列和路由键交换器绑定,达到部分群发的功能,但是 fanout 对于群发的功能还是更方便些。

照着 direct 的例子改一下,把交换器、队列名称改一下,再把交换类型改为 fanout。
ArchiveRec1 代码如下,ArchiveRec2 雷同。

// 定义队列的名称
public final static String QUEUE_NAME = "fanout_archive1";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_FANOUT_NAME, "images.archive");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ArchiveRec1 Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

CropRec 代码如下:

// 定义队列的名称
public final static String QUEUE_NAME = "fanout_cropper";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_FANOUT_NAME, "images.crop");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("cropper Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

运行 ArchiveRec1、ArchiveRec2 各一次,运行 CropRec 两次。启动消费者后,再运行生产者 FanoutExchange。
运行结果如下,ArchiveRec1 和 ArchiveRec2 各消费 6 条数据,其他路由键的信息都收到了。两个 CropRec 一共消费 6 条数据,fanout_cropper 队列也收到了 6 条信息,被两个消费者分别消费 3 条。
ArchiveRec1

ArchiveRec2

CropRec

Topic

通过通配符,来消费所要的消息。名称和 activemq 的发布订阅一样,但是特性和 activemq 的通配符差不多。
通配符有 #*#是匹配一个或多个,*是匹配一个。

AllRec 用来接收 images 开头所有的消息,ArchiveRec 用来接收 images.archive 开头所有的消息,ARec 用来接收 images 开头 a 结尾的消息,CropRec 用来接收 images.cropRec 开头所有的消息。
生产者分布向这三个路由键 images.archive.a,images.archive.b,images.crop.a 发送消息

生产者:

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    // 创建一个 Channel
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 定义交换器
        channel.exchangeDeclare(Constant.EXCHANGE_TOPIC_NAME, BuiltinExchangeType.TOPIC);
        String[] routingKeys = {"images.archive.a", "images.archive.b", "images.crop.a"};
        for (int i = 0; i < routingKeys.length; i++) {
            // 把消息发送到队列中
            channel.basicPublish(Constant.EXCHANGE_TOPIC_NAME, routingKeys[i], null, routingKeys[i].getBytes());
        }
        System.out.println("Sent complete");
    }
}

AllRec

// 定义队列的名称
public final static String QUEUE_NAME = "all";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.#");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("AllRec Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

ArchiveRec

// 定义队列的名称
public final static String QUEUE_NAME = "archive";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.archive.*");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ArchiveRec Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

ARec

// 定义队列的名称
public final static String QUEUE_NAME = "a";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.*.a");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ARec Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

CropRec

// 定义队列的名称
public final static String QUEUE_NAME = "crop";

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与 rabbitmq 服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个 Channel
    Channel channel = connection.createChannel();
    // 通过 Channel 定义队列
    // 这边声明队列,是因为可能在发布服务器之前启动消费者,所以在尝试使用队列中的消息之前确保队列存在
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定交换器、路由键、队列
    channel.queueBind(QUEUE_NAME, Constant.EXCHANGE_TOPIC_NAME, "images.crop.*");
    System.out.println("Waiting for messages.");
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("CropRec Received'" + message + "'");
    };
    // 接收消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}

运行结果如下:
AllRec 收到了全部消息

ArchiveRec 收到了 2 条 images.archive 开头的消息

ARec 收到了 2 条 images 开头 a 结尾的消息

CropRec 收到了 1 条 images.crop 开头的消息

退出移动版