1、RabbitMQ
消息队列解决了什么问题
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
运行rabbitmq镜像
# docker run --name rabbitmq -tid -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq
修改rabbitmq设置
# docker exec -it rabbitmq /bin/bash
新增用户
# rabbitmqctl add_user [user_name] [pwd]
查看用户
# rabbitmqctl list_users
设置用户权限(执行后会输出:Setting permissions for user "[user_name]" in vhost "/" ...)
# rabbitmqctl set_permissions -p "/" [user_name] ".*" ".*" ".*"
查看权限
# rabbitmqctl list_permissions -p /
将[user_name]用户设置为administrator角色
# rabbitmqctl set_user_tags [user_name] administrator
删除guest用户
# rabbitmqctl delete_user guest
开启web界面
# rabbitmq-plugins enable rabbitmq_management
web访问
# http://IP:15672
Java操作RabbitMQ
- simple 简单队列
- work queues 工作队列 公平分发 轮询分发
- publish/subscribe 发布订阅
- routing 路由选择 通配符模式
- Topics 主题
- 手动和自动确认消息
- 队列的持久化和非持久化
- rabbitMQ的延迟队列
依赖
<dependencies>
    <!-- RabbitMQ Java client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.10</version>
    </dependency>
    <!-- Binding version kept equal to slf4j-api so the API and its binding do not mismatch -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.10</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
</dependencies>
2、简单队列
2.2、定义连接MQ的工具
public class connectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //定义链接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址[运行rabbitMQ的地址] factory.setHost("192.168.168.130"); //AMQ的端口号 factory.setPort(5672); //vHost factory.setVirtualHost("lgz"); factory.setUsername("lgz"); factory.setPassword("pwd123456"); return factory.newConnection(); }}
2.3、生产者发送消息
//测试发送信息public class ProducerSend { private static final String QUEUE_NAME="test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("cs"); //获取链接 Connection connection = connectionUtil.getConnection(); //从链接中获取通道 Channel channel = connection.createChannel(); //队列申明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); String msg="hello simple"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("test success"); channel.close(); connection.close(); }}
2.4、消费者获取信息
public class comsumerGain { private static final String QUE_NAME="test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取链接 Connection connection = connectionUtil.getConnection(); //创立通道 Channel channel = connection.createChannel(); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //获取达到的音讯 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg = new String(body, "utf-8"); System.out.println(msg); } }; //监听队列 channel.basicConsume(QUE_NAME,true,consumer); }}
2.5、简单队列的缺点
- 耦合性高:生产者与消费者一一对应
3、工作队列
3.1、工作队列模型
3.2、生产者演示
public class WorkQueue { public static final String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创立连贯 Connection connection = connectionUtil.getConnection(); //获取channel链接 Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); for (int i = 0; i <50 ; i++) { String msg="No"+i; System.out.println(msg); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); Thread.sleep(i*20); } channel.close(); connection.close(); }}
3.3、消费者1
public class WorkRecv { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取链接 Connection connection = connectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定义一个消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //音讯触发 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("完结"); } } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); }
3.4、消费者2
public class WorkRecv { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取链接 Connection connection = connectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定义一个消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //音讯触发 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("完结"); } } }; channel.basicConsume(QUEUE_NAME,true,defaultConsumer); }}
3.5、总结
- 采用了轮询(round-robin)机制分发消息
4、公平分发
4.1、生产者
public class WorkQueue { public static final String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //创立连贯 Connection connection = connectionUtil.getConnection(); //获取channel链接 Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 每个消费者发送确认音讯之前,音讯队列不发送下一个音讯到消费者。【一次只解决一个音讯】 * */ int prefetchCount=1; channel.basicQos(prefetchCount); for (int i = 0; i <50 ; i++) { String msg="No"+i; System.out.println(msg); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); Thread.sleep(i*20); } channel.close(); connection.close(); }}
4.2、消费者1
public class WorkRecv { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取链接 Connection connection = connectionUtil.getConnection(); //获取channel final Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); //一次只散发一个 //定义一个消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //音讯触发 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("完结"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE_NAME,false,defaultConsumer); }}
4.2、消费者2
public class WorkReceive { private final static String QUEUE_NAME="work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取链接 Connection connection = connectionUtil.getConnection(); //获取channel final Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //保障每次只散发一个 channel.basicQos(1); //定义一个消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { //音讯触发 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("receive"+msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("【start】:"); //手动回执一个音讯 channel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE_NAME,false,defaultConsumer); }}
4.3、疑难问题
- basicQos(1);用于限制rabbitMQ一次只分发一个消息
- 使用公平分发必须关闭自动应答ack,改成手动的
5、消息应答与消息持久化
5.1、消息应答
- boolean autoAck=false (手动确认模式),如果有消费者宕机,则会将消息交付给其他的消费者。【rabbitmq支持消息应答,消费者发送一个消息确认,则rabbitmq会删除内存中的数据】
- boolean autoAck=true (自动确认模式),一旦rabbitmq将消息分发给消费者就会从内存中删除
如果这种情况下,杀死正在执行的消费者,就会造成正在处理的消息丢失。
- 默认情况下autoAck是false
- 如果rabbitMQ宕机,则服务器数据依然丢失
5.2、消息持久化
//声明队列
channel.queueDeclare(QUEUE_NAME,[durable]false,false,false,null);
//durable:持久化
将程序中的durable的false直接改成true,也是不可以的。因为名为QUEUE_NAME的队列已经被定义为未持久化的,rabbitmq不允许用不同的参数重新定义一个已存在的队列
- 在控制台【访问mq的界面】删除这个队列
6、订阅模式【fanout】
- 一个生产者,多个消费者
- 每个消费者都有自己的队列
- 生产者将消息发送到交换机【转发器】,而不是直接发送到队列中
- 每个队列都绑定在交换机
- 生产者发送的消息,通过交换机到达队列【实现一个消息被多个消费者消费】
- 交换机中无法存储数据【只有队列具备存储能力】,如果想要进行存储,必须先有队列绑定到该交换机
6.1、生产者
public class Send { public static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//散发 String msg="hello_exchange"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); System.out.println("send:"+msg); channel.close(); connection.close(); }}
6.2、消费者
public class Receive1 { public static final String QUEUE_NAME="test_queue_exchange"; public static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1);//保障每次只能散发一个 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive1"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); }}
6.3、消费者2
public class Receive2{ public static final String QUEUE_NAME="email_queue_exchange"; public static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1);//保障每次只能散发一个 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive2"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok2]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); }}
7、Routing【direct】
交换机一方面接收生产者的消息,另一方面向队列推送消息
Fanout(不处理路由键)
Direct(处理路由键)
7.1、路由模式
7.2、生产者
public class Send { public static final String EXCHANGE_NAME="test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); String msg=new String("Hello direct"); String routingKey="warning"; channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes()); System.out.println("send"+msg); channel.close(); connection.close(); }}
7.3、消费者1
public class Receive1 { public static final String EXCHANGE_NAME="test_exchange_direct"; public static final String QUEUE_NAME="queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); //定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[done]"+msg); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false;//自动应答false channel.basicConsume(QUEUE_NAME,autoAck,consumer); }}
7.3、消费者2
public class Receive2 { public static final String EXCHANGE_NAME="test_exchange_direct"; public static final String QUEUE_NAME="queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info"); //定义一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[done]"+msg); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false;//自动应答false channel.basicConsume(QUEUE_NAME,autoAck,consumer); }}
8、Topics Exchange
类似于sql中的模糊查询,
将路由键routing key和某个模式进行匹配
- #匹配零个或者多个单词
- *匹配一个单词
8.1、生产者
/**
 * Topic-exchange producer: publishes one message to {@code test_exchange_topic}
 * with routing key {@code goods.delete}.
 */
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        // The connection helper defined in this document is named connectionUtil.
        Connection connection = connectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String msg = "商品";
            // The payload is non-ASCII and the consumers decode as utf-8, so encode as
            // UTF-8 explicitly instead of relying on the platform default charset.
            channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, msg.getBytes("utf-8"));
            System.out.println("---send" + msg);
            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing fails.
            connection.close();
        }
    }
}
8.2、消费者1
public class Receive1 { public static final String QUEUE_NAME="test_queue_topic_1"; private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add"); channel.basicQos(1);//保障每次只能散发一个 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive1"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); }}
8.2、消费者2
public class Receive2 { public static final String QUEUE_NAME="test_queue_topic_1=2"; private static final String EXCHANGE_NAME="test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#"); channel.basicQos(1);//保障每次只能散发一个 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String msg=new String(body,"utf-8"); System.out.println("Receive1"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[ok]"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; channel.basicConsume(QUEUE_NAME,autoAck,defaultConsumer); }}
9、消息确认机制
在MQ中可以通过持久化数据解决rabbitmq服务器异常的数据丢失问题
生产者将消息发送出去以后,如何知道消息到底有没有到达rabbitmq服务器?
- 方式一:AMQP实现事务机制
- 方式二:Confirm模式
9.1、事务机制
txSelect :用于将当前channel设置成transaction模式
txCommit:用于提交事务
txRollback:回滚事务
生产者
/**
 * Transaction demo producer: publishes one message to {@code tx_queue} inside an AMQP
 * transaction and rolls the transaction back when a failure occurs before the commit.
 */
public class TxSend {
    private static final String QUEUE_NAME = "tx_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // The connection helper defined in this document is named connectionUtil.
        Connection connection = connectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            String msg = "hello autoCommit";
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            try {
                channel.txSelect();
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("utf-8"));
                // Deliberate failure to exercise the rollback path. It sits before txCommit so
                // there is an uncommitted publish to roll back; delete this line to see the
                // commit succeed.
                int i = 1 / 0;
                channel.txCommit();
                System.out.println(msg);
            } catch (IOException | RuntimeException e) {
                // The simulated failure is an ArithmeticException, so catching IOException
                // alone would skip the rollback.
                channel.txRollback();
                System.out.println("error");
                e.printStackTrace();
            }
            channel.close();
        } finally {
            // Always release the connection, whatever happened above.
            connection.close();
        }
    }
}
消费者
/**
 * Consumer for the transaction demo: prints every message delivered from {@code tx_queue},
 * using automatic acknowledgement.
 */
public class TxReceive1 {
    private static final String QUEUE_NAME = "tx_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // The connection helper defined in this document is named connectionUtil.
        Connection connection = connectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Second argument (autoAck) is true.
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println("receive" + new String(body, "utf-8"));
            }
        });
    }
}
9.2、生产者Confirm模式
9.2.1、发送单条
public class Send1 { private static final String QUEUE_NAME="tx_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //设置为confirm模式 channel.confirmSelect(); String msg="confirm_text"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); if (!channel.waitForConfirms()){ System.out.println("send message failed"); }else { System.out.println("success"); } channel.close(); connection.close(); }}
9.2.2、批量发送
public class SendMore { private static final String QUEUE_NAME="confirm_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //设置为confirm模式 channel.confirmSelect(); String msg="confirm_text"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); //次要进行遍历发送,串行的【发送完之后在进行确认】 for (int i = 0; i <10 ; i++) { channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } if (!channel.waitForConfirms()){ System.out.println("send message failed"); }else { System.out.println("success"); } channel.close(); connection.close(); }}
9.3、异步模式
10、参数详解
10.1、queueDeclare
//绑定channel与音讯队列//参数一:队列名称【如果不存在该队列,则主动创立】//参数二:durable【是否要进行长久化】//参数三:exclusive【是否独占队列】//参数四:是否在生产实现之后是否要立刻删除队列【true:主动删除】【false:不主动删除】//参数五:额定参数channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args);channel.queueDeclare("helloWorld",false,false,false,null);
10.2、basicPublish
//公布音讯/** 参数1:【exchange】交换机名称* 参数2:【routingKey】队列名称* 参数3:【props】传递音讯的额定设置* 参数4:传递音讯的具体内容【byte类型】** */channel.basicPublish("","helloWorld",null,"hello rabbitMQ".getBytes());
10.3、basicConsume
//生产信息/* * 参数一:队列名称 * 参数二:开启音讯的主动确认机制 * 参数三:生产时的回调接口 * */channel.basicConsume("helloWorld",true,consumer);
11、整合Boot
11.1、依赖导入
<!-- Spring Boot starter for AMQP (RabbitMQ) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
11.2、yml
# Spring Boot settings for the RabbitMQ examples (one key per line; nesting is significant in YAML)
spring:
  application:
    name: rabbit-springboot
  rabbitmq:
    host: 192.168.168.130
    port: 5672
    virtual-host: /
    username: lgz
    password: pwd123456
11.3boottest
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() { rabbitTemplate.convertAndSend("hello","helloWorld"); System.out.println(1);}
11.4、
@SpringBootTestclass BootRabbitmqApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("hello","helloWorld"); System.out.println(1); } @Test public void testWork(){ for (int i = 0; i <5 ; i++) { rabbitTemplate.convertAndSend("work","work模型"); } } @Test public void testFanout(){ rabbitTemplate.convertAndSend("logs","","Fanout model"); } @Test public void testRouting(){ rabbitTemplate.convertAndSend("directs","info","info_key_routing_information"); } @Test public void testTopic(){ rabbitTemplate.convertAndSend("topics","user.save","user.save exchange"); }}
11.5、测试topic[消费者]
// Topic listener 1: queue with no explicit name, bound to exchange "topics"
// with keys "product.save" and "product.*".
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(name = "topics", type = "topic"),
                key = {"product.save", "product.*"}
        )
})
public void receive(String msg) {
    System.out.println("consumer1" + msg);
}

// Topic listener 2: queue with no explicit name, bound to exchange "topics"
// with keys "user.save" and "user.*".
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(name = "topics", type = "topic"),
                key = {"user.save", "user.*"}
        )
})
public void receive2(String msg) {
    System.out.println("consumer2" + msg);
}
11.5、测试routing
// Routing listener: queue with no explicit name, bound to the direct exchange "directs"
// with keys "info" and "error"; prints each message.
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue,
                exchange = @Exchange(type = "direct", value = "directs"),
                key = {"info", "error"}
        )
})
public void receive1(String msg) {
    System.out.println(msg);
}
11.6、普通的work
// Work listener: declares the queue "work" and prints each message in brackets.
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive(String message) {
    String line = "message:[" + message + "]";
    System.out.println(line);
}
11.7、Fanout模式
@RabbitListener(bindings = { @QueueBinding( value = @Queue, //创立长期队列 exchange =@Exchange(value = "logs", type = "fanout") //绑定的交换机 )})public void receive1(String msg){ System.out.println("["+msg+"]");}
11.8、Simple
/**
 * Simple-model listener: declares the queue "hello" and prints each message received from it.
 */
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Hello {

    // Handler for String payloads.
    @RabbitHandler
    public void receive1(String message) {
        String line = "message" + message;
        System.out.println(line);
    }
}