共计 7214 个字符,预计需要花费 19 分钟才能阅读完成。
一. RabbitMQ 音讯解决
在上一节咱们讲到了 RabbitMQ 的简略架构,咱们先回顾一下 RabbitMQ 中音讯的流转流程,这个概念会在本节中应用到。
1. 生产者生产音讯后,会绑定一个路由键将音讯投递到交换器中
2. 队列通过路由键绑定到交换器
3. 交换器通过音讯的路由键去查找绑定在它下面的队列,通过不同的规定去匹配队列绑定的路由键,匹配胜利后将音讯路由到队列上,遍历所有队列后都没有匹配的音讯将无奈路由到队列
4. 消费者监听队列,一旦有音讯路由到队列,消费者就会从队列中取出音讯或者是 RabbitMQ 将音讯推送给消费者进行生产
下面的流程是在失常状况下的音讯解决流程,对于投递失败的音讯和生产失败的音讯,咱们会在后续的知识点中讲到
二. 交换器类型
不同类型的交换器有不同的路由键键匹配规定,咱们先来看看 RabbitMQ 中有哪几种交换器
1 Direct
只有当路由键齐全匹配时,才会将音讯投递到相应的队列中
2 Fanout
音讯播送到绑定的队列,即该类型的交换器不会依据路由键去匹配队列,只有有音讯投递到该交换器,它就会将音讯投递到所有绑定在该交换器上的队列
3 Topic
Topic 交换器能够应用通配符 (“*” 和 “#”) 使来自不同源头的音讯达到对立队列。”.”将路由键分为了几个标识符,” * “ 匹配 1 个,”#” 匹配一个或多个
eg:有四个队列,queue1 绑定路由键 ”usa.#”,queue2 绑定路由键 ”#.news”,queue3 绑定路由键 ”*.weather”,queue4 绑定路由键 ”europe.*”
四条音讯:message1 绑定的路由键为 ”usa.news”,message2 绑定的路由键为 ”usa.weather”,message3 绑定的路由键为 ”europe.news”,message4 绑定的路由键为 ”errope.weather”
路由后:
queue1 能收到音讯 message1,message2
queue2 能收到音讯 message1,message3
queue3 能收到音讯 message2,message4
queue4 能收到音讯 message3,message4
须要留神一点是,”*” 是匹配一个,”#” 匹配多个。若是音讯绑定路由键为 usa.weather.cloud,那么 ”usa.*” 无奈匹配该路由键,”usa.#” 能匹配,此时也能够应用 ”usa.*.*” 匹配
4 Headers
因为 headers 交换器根本不必,这里就不做解说
三. 应用 RabbitMQ 原生 API
应用 RabbitMQ 原生 API 来发送和生产音讯时,依照如下流程
生产者:
获取连贯工厂
ConnectionFactory factory = new ConnectionFactory();
新建连贯
public Connection newConnection();
创立信道
Channel createChannel();
申明交换器
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type)
args1 交换器名称,args2 交换器类型
发送音讯
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
args1 交换器名,args2 路由键,args3 音讯无奈路由时的解决,为 true 时生产者能够通过回调函数获取音讯状态,args4 音讯属性,args5 音讯体
消费者:
获取连贯工厂
ConnectionFactory factory = new ConnectionFactory();
新建连贯
public Connection newConnection();
创立信道
Channel createChannel();
申明交换器
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type)
args1 交换器名称,args2 交换器类型
申明队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
args1 队列名,arg2 是否长久化,args3 是否独占队列,args4 主动删除,args5 其余属性
队列绑定
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
args1 队列名,args2 交换器名,args3 路由键,arg4 其余属性
音讯生产
String basicConsume(String queue, boolean autoAck, Consumer callback)
args1 队列名,args2 是否主动签收,args3 消费者
四. 应用不同的交换器实现音讯解决
- Direct Exchange
DirectProducer Demo
public class DirectProducer { | |
public static final String EXCHANGE_NAME = "direct_exchange"; | |
public static final String ROUTING_KEY = "direct_key"; | |
public static void main(String[] args) throws Exception{ | |
// 创立连贯工厂 | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("127.0.0.1"); | |
factory.setVirtualHost("/"); | |
// 新建连贯 | |
Connection connection = factory.newConnection(); | |
// 创立信道 | |
Channel channel = connection.createChannel(); | |
// 申明交换器 | |
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); | |
String msg = "hello rabbitmq"; | |
// 发送音讯 | |
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null, msg.getBytes()); | |
System.out.println("音讯投递实现,音讯:" + msg + ", 路由键:" + ROUTING_KEY); | |
channel.close(); | |
connection.close();} | |
} |
生产者端后果
DirectConsumer Demo
public class DirectConsumer {public static void main(String[] args) throws Exception{ | |
// 创立连贯工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
connectionFactory.setHost("127.0.0.1"); | |
connectionFactory.setVirtualHost("/"); | |
// 创立连贯 | |
Connection connection = connectionFactory.newConnection(); | |
// 创立信道 | |
Channel channel = connection.createChannel(); | |
// 申明交换器 | |
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); | |
String queueName = "direct_queue"; | |
// 申明队列 | |
channel.queueDeclare(queueName, false, false, false, null); | |
// 将队列绑定到交换器 | |
channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, DirectProducer.ROUTING_KEY); | |
// 申明消费者 | |
Consumer consumer = new DefaultConsumer(channel){ | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8"); | |
System.out.println("收到的音讯:" + msg + ", 路由键:" + envelope.getRoutingKey()); | |
} | |
}; | |
// 消费者生产指定队列的音讯 | |
channel.basicConsume(queueName, true, consumer); | |
} | |
} |
生产端后果:
- Fanout Exchange
FanoutProducer Demo
public class FanoutProducer { | |
// 交换器 | |
public static final String EXCHANGE_NAME = "fanout_exchange"; | |
// 路由键 | |
public static final String ROUTING_KEY = "fanout_key"; | |
public static void main(String[] args) throws Exception{ | |
// 获取连贯工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
connectionFactory.setVirtualHost("/"); | |
connectionFactory.setHost("127.0.0.1"); | |
connectionFactory.setPort(5672); | |
// 获取连贯 | |
Connection connection = connectionFactory.newConnection(); | |
// 申明信道 | |
Channel channel = connection.createChannel(); | |
// 申明交换器 | |
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); | |
// 发送音讯 | |
String msg = "hello rabbitmq"; | |
channel.basicPublish(FanoutProducer.EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes()); | |
System.out.println("发送音讯:" + msg + ",路由键:" + ROUTING_KEY); | |
channel.close(); | |
connection.close();} | |
} |
生产端后果:
FanoutConsumer Demo
public class FanoutConsumer {public static void main(String[] args) throws Exception{ | |
// 创立连贯工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory(); | |
connectionFactory.setHost("127.0.0.1"); | |
connectionFactory.setVirtualHost("/"); | |
// 创立连贯 | |
Connection connection = connectionFactory.newConnection(); | |
// 创立信道 | |
Channel channel = connection.createChannel(); | |
// 申明交换器 | |
channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME, BuiltinExchangeType.FANOUT); | |
// 队列 | |
String[] queueNames = {"queue1", "queue2", "queue3"}; | |
// 路由键 | |
String[] routingKey = {"fanout_key", "fanout_key1", "fanout_key2"}; | |
// 别离应用不同的路由键申明不同的队列 | |
for (int i = 0; i < 3; i++){channel.queueDeclare(queueNames[i], false, false, false, null); | |
channel.queueBind(queueNames[i], FanoutProducer.EXCHANGE_NAME, routingKey[i]); | |
// 生产音讯 | |
channel.basicConsume(queueNames[i], true, new DefaultConsumer(channel){ | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8"); | |
System.out.println("音讯:" + msg + ",路由键:" + envelope.getRoutingKey()); | |
} | |
}); | |
} | |
} | |
} |
生产端后果:
- Topic Exchange
TopicProducer Demo
public class TopicProducer { | |
// 交换器 | |
public static final String EXCHANGE_NAME = "topic_exchange"; | |
public static void main(String[] args) throws Exception{ | |
// 创立连贯工厂 | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("127.0.0.1"); | |
factory.setVirtualHost("/"); | |
// 新建连贯 | |
Connection connection = factory.newConnection(); | |
// 创立信道 | |
Channel channel = connection.createChannel(); | |
// 申明交换器 | |
channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); | |
// 发送音讯 | |
String msg = "hello rabbitmq"; | |
String[] routingKey = {"topic.key1", "topic.key2", "topic.key2.test"}; | |
for (int i = 0; i < 3; i++){channel.basicPublish(TopicProducer.EXCHANGE_NAME, routingKey[i], null, msg.getBytes()); | |
System.out.println("发送音讯:" + msg + ", 路由键:" + routingKey[i]); | |
} | |
channel.close(); | |
connection.close();} | |
} |
生产端后果:
TopicConsumer Demo
public class TopicConsumer {public static void main(String[] args) throws Exception{ | |
// 创立连贯工厂 | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("127.0.0.1"); | |
factory.setVirtualHost("/"); | |
// 新建连贯 | |
Connection connection = factory.newConnection(); | |
// 创立信道 | |
Channel channel = connection.createChannel(); | |
// 申明交换器 | |
channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); | |
String queueName = "topic_queue"; | |
// 申明队列 | |
channel.queueDeclare(queueName, false, false, false, null); | |
// 队列绑定 | |
channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "topic.*"); | |
// 音讯生产 | |
channel.basicConsume(queueName, true, new DefaultConsumer(channel){ | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8"); | |
System.out.println("音讯:" + msg + ", 路由键:" + envelope.getRoutingKey()); | |
} | |
}); | |
} | |
} |
生产端后果: