1、RabbitMQ

  1. 消息队列解决了什么问题

    • 异步处理
    • 应用解耦
    • 流量削峰
    • 日志处理

运行rabbitmq镜像

# docker run --name rabbitmq -tid -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq

修改rabbitmq设置

# docker exec -it rabbitmq /bin/bash
新增用户
# rabbitmqctl add_user [user_name] [pwd]
查看用户
# rabbitmqctl list_users
Setting permissions for user "[user_name]" in vhost "/" ...
# rabbitmqctl set_permissions -p "/" [user_name] ".*" ".*" ".*"
# rabbitmqctl list_permissions -p /
将[user_name]用户设置为administrator角色
# rabbitmqctl set_user_tags [user_name] administrator
删除guest用户
# rabbitmqctl delete_user guest

开启web界面

# rabbitmq-plugins enable rabbitmq_management

web访问

# http://IP:15672
  1. Java操作RabbitMQ

    1. simple 简单队列
    2. work queues 工作队列 公平分发 轮询分发
    3. publish/subscribe 发布订阅
    4. routing 路由选择 通配符模式
    5. Topics 主题
    6. 手动和自动确认消息
    7. 队列的持久化和非持久化
    8. rabbitMQ的延迟队列

    依赖

    <dependencies>
        <!-- RabbitMQ Java client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.2</version>
        </dependency>
        <!-- Logging: SLF4J API, its log4j 1.2 binding, and log4j itself -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- Unit testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
    </dependencies>

2、简单队列

2.2、定义连接MQ的工具

public class connectionUtil {    public static Connection getConnection() throws IOException, TimeoutException {        //定义链接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置服务地址[运行rabbitMQ的地址]        factory.setHost("192.168.168.130");        //AMQ的端口号        factory.setPort(5672);        //vHost        factory.setVirtualHost("lgz");        factory.setUsername("lgz");        factory.setPassword("pwd123456");        return   factory.newConnection();    }}

2.3、生产者发送消息

//测试发送信息public class ProducerSend {    private static final String QUEUE_NAME="test_simple_queue";    public static void main(String[] args) throws IOException, TimeoutException {        System.out.println("cs");        //获取链接        Connection connection = connectionUtil.getConnection();        //从链接中获取通道        Channel channel = connection.createChannel();        //队列申明        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        String msg="hello simple";         channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());        System.out.println("test success");         channel.close();         connection.close();    }}

2.4、消费者获取信息

public class comsumerGain {    private static final String QUE_NAME="test_simple_queue";    public static void main(String[] args) throws IOException, TimeoutException {        //获取链接        Connection connection = connectionUtil.getConnection();        //创立通道        Channel channel = connection.createChannel();        //定义消费者        DefaultConsumer  consumer = new DefaultConsumer(channel) {            //获取达到的音讯            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg = new String(body, "utf-8");                System.out.println(msg);            }        };        //监听队列        channel.basicConsume(QUE_NAME,true,consumer);    }}

2.5、简单队列的缺点

  1. 耦合性高:生产者与消费者一一对应

3、工作队列

3.1、工作队列模型

3.2、生产者演示

public class WorkQueue {    public static  final  String QUEUE_NAME="work_queue";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        //创立连贯        Connection connection = connectionUtil.getConnection();        //获取channel链接        Channel channel = connection.createChannel();        //        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        for (int i = 0; i <50 ; i++) {            String msg="No"+i;            System.out.println(msg);            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());            Thread.sleep(i*20);        }        channel.close();        connection.close();    }}

3.3、消费者1

public class WorkRecv {    private final static String QUEUE_NAME="work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        //获取链接        Connection connection = connectionUtil.getConnection();        //获取channel        Channel channel = connection.createChannel();        //申明队列        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //定义一个消费者        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            //音讯触发            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("receive"+msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("完结");                }            }        };        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);    }

3.4、消费者2

public class WorkRecv {    private final static String QUEUE_NAME="work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        //获取链接        Connection connection = connectionUtil.getConnection();        //获取channel        Channel channel = connection.createChannel();        //申明队列        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //定义一个消费者        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            //音讯触发            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("receive"+msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("完结");                }            }        };        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);    }}

3.5、总结

  1. 采用了轮询算法(轮询分发机制)

4、公平分发

4.1、生产者

public class WorkQueue {    public static  final  String QUEUE_NAME="work_queue";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        //创立连贯        Connection connection = connectionUtil.getConnection();        //获取channel链接        Channel channel = connection.createChannel();        //        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        /*        * 每个消费者发送确认音讯之前,音讯队列不发送下一个音讯到消费者。【一次只解决一个音讯】        * */        int prefetchCount=1;        channel.basicQos(prefetchCount);        for (int i = 0; i <50 ; i++) {            String msg="No"+i;            System.out.println(msg);            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());            Thread.sleep(i*20);        }        channel.close();        connection.close();    }}

4.2、消费者1

public class WorkRecv {    private final static String QUEUE_NAME="work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        //获取链接        Connection connection = connectionUtil.getConnection();        //获取channel        final Channel channel = connection.createChannel();        //申明队列        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        channel.basicQos(1); //一次只散发一个        //定义一个消费者        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            //音讯触发            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("receive"+msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("完结");                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);    }}

4.2、消费者2

public class WorkReceive {    private final static String QUEUE_NAME="work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        //获取链接        Connection connection = connectionUtil.getConnection();        //获取channel        final Channel channel = connection.createChannel();        //申明队列        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //保障每次只散发一个        channel.basicQos(1);        //定义一个消费者        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            //音讯触发            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("receive"+msg);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("【start】:");                    //手动回执一个音讯                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);    }}

4.3、疑难问题

  1. basicQos(1);用于限制rabbitMQ一次只分发一个消息
  2. 使用公平分发必须关闭自动应答ack,改成手动的

5、消息应答与消息持久化

5.1、消息应答

  1. boolean autoAck=false (手动确认模式),如果有消费者宕机,则会将消息交付给其他的消费者。【rabbitmq支持消息应答,消费者发送一个消息确认后,rabbitmq才会删除内存中的数据】
  2. boolean autoAck=true (自动确认模式),一旦rabbitmq将消息分发给消费者就会从内存中删除

在这种情况下,如果杀死正在执行的消费者,就会造成正在处理的消息丢失。

  1. 默认情况下autoAck是false
  2. 如果rabbitMQ宕机,则服务器数据依然会丢失

5.2、消息持久化

//申明队列channel.queueDeclare(QUEUE_NAME,[durable]false,false,false,null);//durable:长久化,
  1. 直接将程序中durable的false改成true是不可以的。因为已经以QUEUE_NAME定义的queue是未持久化的,rabbitmq不允许用不同的参数重新定义一个已存在的队列

    • 解决办法:在控制台【访问mq的界面】删除这个队列后再重新声明

6、订阅模式【fanout】

  1. 一个生产者,多个消费者
  2. 每个消费者都有自己的队列
  3. 生产者将消息发送到交换机【转发器】,而不是直接发送到队列中
  4. 每个队列都绑定在交换机
  5. 生产者发送的消息,通过交换机到达队列【实现一个消息被多个消费者消费】
  • 交换机无法存储数据【只有队列具备存储能力】,如果想要进行存储,必须先将队列绑定到交换机

6.1、生产者

public class Send {    public static final String EXCHANGE_NAME="test_exchange_fanout";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        //申明交换机        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//散发        String msg="hello_exchange";        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());        System.out.println("send:"+msg);        channel.close();        connection.close();    }}

6.2、消费者

public class Receive1 {    public static final String QUEUE_NAME="test_queue_exchange";    public static final String EXCHANGE_NAME="test_exchange_fanout";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        final Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //绑定队列到交换机        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");        channel.basicQos(1);//保障每次只能散发一个        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("Receive1"+msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("[ok]");                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        boolean autoAck=false;        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);    }}

6.3、消费者2

public class Receive2{    public static final String QUEUE_NAME="email_queue_exchange";    public static final String EXCHANGE_NAME="test_exchange_fanout";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        final Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //绑定队列到交换机        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");        channel.basicQos(1);//保障每次只能散发一个        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("Receive2"+msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("[ok2]");                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        boolean autoAck=false;        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);    }}

7、Routing【direct】

交换机一方面接收生产者的消息,另一方面向队列推送消息

Fanout(不处理路由键)

Direct(处理路由键)

7.1、路由模式

7.2、生产者

public class Send {    public static final String EXCHANGE_NAME="test_exchange_direct";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        //申明交换机        channel.exchangeDeclare(EXCHANGE_NAME,"direct");        String msg=new String("Hello direct");        String routingKey="warning";        channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());        System.out.println("send"+msg);        channel.close();        connection.close();    }}

7.3、消费者1

public class Receive1 {    public static final String EXCHANGE_NAME="test_exchange_direct";    public static final String QUEUE_NAME="queue_direct_1";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        final Channel channel = connection.createChannel();        //        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        channel.basicQos(1);        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");        //定义一个消费者        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("[done]"+msg);                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        boolean autoAck=false;//自动应答false        channel.basicConsume(QUEUE_NAME,autoAck,consumer);    }}

7.3、消费者2

public class Receive2 {    public static final String EXCHANGE_NAME="test_exchange_direct";    public static final String QUEUE_NAME="queue_direct_1";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        final Channel channel = connection.createChannel();        //        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        channel.basicQos(1);        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");        //定义一个消费者        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("[done]"+msg);                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        boolean autoAck=false;//自动应答false        channel.basicConsume(QUEUE_NAME,autoAck,consumer);    }}

8、Topics Exchange

类似于sql中的模糊查询,

将路由键routing key和某个模式进行匹配

  1. #匹配零个或者多个单词

  2. *匹配一个单词

8.1、生产者

/**
 * Topic-exchange producer: publishes one message to {@code test_exchange_topic}
 * with the routing key {@code goods.delete}.
 */
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String msg = "商品";
            // The payload is non-ASCII: encode explicitly as UTF-8 so it does not depend on
            // the platform default charset and matches what the consumers decode
            channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, msg.getBytes("UTF-8"));
            System.out.println("---send" + msg);
            channel.close();
        } finally {
            // Always release the connection, even when publishing fails
            connection.close();
        }
    }
}

8.2、消费者1

public class Receive1 {    public static final String QUEUE_NAME="test_queue_topic_1";    private static final String EXCHANGE_NAME="test_exchange_topic";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        final Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //绑定队列到交换机        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");        channel.basicQos(1);//保障每次只能散发一个        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("Receive1"+msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("[ok]");                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        boolean autoAck=false;        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);    }}

8.2、消费者2

public class Receive2 {    public static final String QUEUE_NAME="test_queue_topic_1=2";    private static final String EXCHANGE_NAME="test_exchange_topic";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        final Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //绑定队列到交换机        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");        channel.basicQos(1);//保障每次只能散发一个        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                String msg=new String(body,"utf-8");                System.out.println("Receive1"+msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    System.out.println("[ok]");                    channel.basicAck(envelope.getDeliveryTag(),false);                }            }        };        boolean autoAck=false;        channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer);    }}

9、消息确认机制

在MQ中可以通过持久化数据解决rabbitmq服务器异常导致的数据丢失问题

  1. 生产者将消息发送出去以后,如何知道消息到底有没有到达rabbitmq服务器?

    • 方式一:AMQP实现事务机制
    • 方式二:Confirm模式

9.1、事务机制

txSelect:用于将当前channel设置成transaction模式

txCommit:用于提交事务

txRollback:回滚事务

生产者

/**
 * Transaction-mode producer: publishes one message to {@code tx_queue} inside an AMQP
 * transaction and rolls the transaction back when a failure occurs before the commit.
 */
public class TxSend {
    private static final String QUEUE_NAME = "tx_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            String msg = "hello autoCommit";
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            try {
                // Put the channel into transaction mode
                channel.txSelect();
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
                // Simulated failure. It must happen BEFORE the commit: once txCommit has
                // run, the message is already published and a rollback cannot undo it.
                int i = 1 / 0;
                channel.txCommit();
                System.out.println(msg);
            } catch (IOException | RuntimeException e) {
                // The simulated failure is an ArithmeticException, which the original
                // catch (IOException) did not cover, so the rollback never ran
                channel.txRollback();
                System.out.println("error");
                e.printStackTrace();
            }
            channel.close();
        } finally {
            // Always release the connection
            connection.close();
        }
    }
}

消费者

public class TxReceive1 {    private static final String QUEUE_NAME="tx_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                super.handleDelivery(consumerTag, envelope, properties, body);                System.out.println("receive"+new String(body,"utf-8"));            }        });    }}

9.2、生产者Confirm模式

9.2.1、发送单条

public class Send1 {    private static final String QUEUE_NAME="tx_queue";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //设置为confirm模式        channel.confirmSelect();        String msg="confirm_text";        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());        if (!channel.waitForConfirms()){            System.out.println("send message failed");        }else {            System.out.println("success");        }        channel.close();        connection.close();    }}

9.2.2、批量发送

public class SendMore {    private static final String QUEUE_NAME="confirm_queue";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        //设置为confirm模式        channel.confirmSelect();        String msg="confirm_text";        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());        //次要进行遍历发送,串行的【发送完之后在进行确认】        for (int i = 0; i <10 ; i++) {            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());        }        if (!channel.waitForConfirms()){            System.out.println("send message failed");        }else {            System.out.println("success");        }        channel.close();        connection.close();    }}

9.3、异步模式

10、参数详解

10.1、queueDeclare

//绑定channel与音讯队列//参数一:队列名称【如果不存在该队列,则主动创立】//参数二:durable【是否要进行长久化】//参数三:exclusive【是否独占队列】//参数四:是否在生产实现之后是否要立刻删除队列【true:主动删除】【false:不主动删除】//参数五:额定参数channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args);channel.queueDeclare("helloWorld",false,false,false,null);

10.2、basicPublish

//公布音讯/** 参数1:【exchange】交换机名称* 参数2:【routingKey】队列名称* 参数3:【props】传递音讯的额定设置* 参数4:传递音讯的具体内容【byte类型】** */channel.basicPublish("","helloWorld",null,"hello rabbitMQ".getBytes());

10.3、basicConsume

//生产信息/* * 参数一:队列名称 * 参数二:开启音讯的主动确认机制 * 参数三:生产时的回调接口 * */channel.basicConsume("helloWorld",true,consumer);

11、整合Boot

11.1、依赖导入

 <!-- Spring Boot starter for AMQP (RabbitMQ) -->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
     <version>2.2.6.RELEASE</version>
 </dependency>

11.2、yml

# Spring Boot application settings: application name and RabbitMQ connection
spring:
  application:
    name: rabbit-springboot
  rabbitmq:
    host: 192.168.168.130
    port: 5672
    virtual-host: /
    username: lgz
    password: pwd123456

11.3boottest

@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {    rabbitTemplate.convertAndSend("hello","helloWorld");    System.out.println(1);}

11.4、

@SpringBootTestclass BootRabbitmqApplicationTests {    @Autowired    private RabbitTemplate rabbitTemplate;    @Test    void contextLoads() {        rabbitTemplate.convertAndSend("hello","helloWorld");        System.out.println(1);    }    @Test    public void testWork(){        for (int i = 0; i <5 ; i++) {            rabbitTemplate.convertAndSend("work","work模型");        }    }    @Test    public void testFanout(){        rabbitTemplate.convertAndSend("logs","","Fanout model");    }    @Test    public void testRouting(){        rabbitTemplate.convertAndSend("directs","info","info_key_routing_information");    }    @Test    public  void  testTopic(){        rabbitTemplate.convertAndSend("topics","user.save","user.save exchange");    }}

11.5、测试topic[消费者]

/** Topic consumer 1: an anonymous queue bound to "topics" with the product.* keys. */
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(type = "topic", name = "topics"),
                key = {"product.save", "product.*"}
        )
})
public void receive(String msg) {
    String line = "consumer1" + msg;
    System.out.println(line);
}

/** Topic consumer 2: an anonymous queue bound to "topics" with the user.* keys. */
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(type = "topic", name = "topics"),
                key = {"user.save", "user.*"}
        )
})
public void receive2(String msg) {
    String line = "consumer2" + msg;
    System.out.println(line);
}

11.5、测试routing

/** Routing consumer: an anonymous queue bound to "directs" with the keys "info" and "error". */
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value = "directs", type = "direct"),
                key = {"info", "error"}
        )
})
public void receive1(String msg) {
    final String received = msg;
    System.out.println(received);
}

11.6、普通的work

/** Work-model consumer: listens on the "work" queue, declaring it if necessary. */
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive(String message) {
    String line = "message:[" + message + "]";
    System.out.println(line);
}

11.7、Fanout模式

@RabbitListener(bindings = {        @QueueBinding(                value = @Queue, //创立长期队列                exchange =@Exchange(value = "logs", type = "fanout")    //绑定的交换机        )})public void receive1(String msg){    System.out.println("["+msg+"]");}

11.8、Simple

/** Simple-model consumer: listens on the "hello" queue, declaring it if necessary. */
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Hello {

    /** Prints each received message prefixed with "message". */
    @RabbitHandler
    public void receive1(String message) {
        String line = "message" + message;
        System.out.println(line);
    }
}