关于rabbitmq:RabbitMQ-03

60次阅读

共计 2905 个字符,预计需要花费 8 分钟才能阅读完成。

rabbitmq 六种工作模式

1. 简略模式

RabbitMQ 是一个消息中间件,你能够设想它是一个邮局。当你把函件放到邮箱里时,可能确信邮递员会正确地递送你的函件。RabbitMq 就是一个邮箱、一个邮局和一个邮递员。

  • 发送音讯的程序是生产者
  • 队列就代表一个邮箱。尽管音讯会流经 RbbitMQ 和你的应用程序,但音讯只能被存储在队列里。队列存储空间只受服务器内存和磁盘限度,它实质上是一个大的音讯缓冲区。多个生产者能够向同一个队列发送音讯,多个消费者也能够从同一个队列接管音讯.
  • 消费者期待从队列接管音讯

1.pom.xml
增加 slf4j 依赖, 和 rabbitmq 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tedu</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. 生产者发送音讯

public class Test1 {public static void main(String[] args) throws Exception {
        // 创立连贯工厂, 并设置连贯信息
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);// 可选,5672 是默认端口
        f.setUsername("admin");
        f.setPassword("admin");

        /*
         * 与 rabbitmq 服务器建设连贯,
         * rabbitmq 服务器端应用的是 nio, 会复用 tcp 连贯,
         * 并开拓多个信道与客户端通信
         * 以加重服务器端建设连贯的开销
         */
        Connection c = f.newConnection();
        // 建设信道
        Channel ch = c.createChannel();

        /*
         * 申明队列, 会在 rabbitmq 中创立一个队列
         * 如果曾经创立过该队列,就不能再应用其余参数来创立
         * 
         * 参数含意:
         *   -queue: 队列名称
         *   -durable: 队列长久化,true 示意 RabbitMQ 重启后队列仍存在
         *   -exclusive: 排他,true 示意限度仅以后连贯可用
         *   -autoDelete: 当最初一个消费者断开后, 是否删除队列
         *   -arguments: 其余参数
         */
        ch.queueDeclare("helloworld", false,false,false,null);

        /*
         * 公布音讯
         * 这里把音讯向默认交换机发送.
         * 默认交换机隐含与所有队列绑定,routing key 即为队列名称
         * 
         * 参数含意:
         *     -exchange: 交换机名称, 空串示意默认交换机 "(AMQP default)", 不能用 null 
         *     -routingKey: 对于默认交换机, 路由键就是指标队列名称
         *     -props: 其余参数, 例如头信息
         *     -body: 音讯内容 byte[] 数组
         */
        ch.basicPublish("","helloworld", null,"Hello world!".getBytes());

        System.out.println("音讯已发送");
        c.close();}
}

3. 消费者接管队列

public class Test2 {public static void main(String[] args) throws Exception {
        // 连贯工厂
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setUsername("admin");
        f.setPassword("admin");
        // 建设连贯
        Connection c = f.newConnection();
        // 建设信道
        Channel ch = c.createChannel();
        // 申明队列, 如果该队列曾经创立过, 则不会反复创立
        ch.queueDeclare("helloworld",false,false,false,null);
        System.out.println("期待接收数据");
        
        // 收到音讯后用来解决音讯的回调对象
        DeliverCallback callback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {String msg = new String(message.getBody(), "UTF-8");
                System.out.println("收到:"+msg);
            }
        };
        
        // 消费者勾销时的回调对象
        CancelCallback cancel = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {}};
        
        ch.basicConsume("helloworld", true, callback, cancel);
    }
}

正文完
 0