共计 9280 个字符,预计需要花费 24 分钟才能阅读完成。
人生终将是场单人旅途,孤单之前是迷茫,孤单过后是成长。
楔子
这篇是音讯队列 RabbitMQ 的第二弹。
上一篇的结尾我也预报了本篇的内容:利用 RabbitTemplate 和注解进行收发音讯,还有一个我长期加上的内容:音讯的序列化转换。
本篇会和 SpringBoot 做整合,采纳主动配置的形式进行开发,咱们只须要申明 RabbitMQ 地址就能够了,对于各种创立连贯敞开连贯的事都由 Spring 帮咱们了~
交给 Spring 帮咱们治理连贯能够让咱们专一于业务逻辑,就像申明式事务一样易用,不便又高效。
祝有好播种,先赞后看,高兴有限。
本文代码: 码云地址 GitHub 地址
Tip:上一篇的代码都放在 prototype
包下,本篇的代码都放在 auto
包上面。
1. ???? 环境配置
第一节咱们先来搞一下环境的配置,上一篇中咱们曾经引入了主动配置的包,咱们既然应用了主动配置的形式,那 RabbitMQ
的连贯信息咱们间接放在配置文件中就行了,就像咱们须要用到 JDBC 连贯的时候去配置一下 DataSource
一样。
如图所示,咱们只须要指明一下连贯的 IP+ 端口号和用户名明码就行了,这里我用的是默认的用户名与明码,不写的话默认也都是 guest,端口号也是默认 5672。
次要咱们须要看一下手动确认音讯的配置,须要配置成 manual
才是手动确认,日后还会有其余的配置项,眼下咱们配置这一个就能够了。
接下来咱们要配置一个 Queue
,上一篇中咱们往一个名叫erduo
的队列中发送音讯,过后是咱们手动定义的此队列,这里咱们也须要手动配置,申明一个 Bean
就能够了。
@Configuration
public class RabbitmqConfig {
@Bean
public Queue erduo() {
// 其三个参数:durable exclusive autoDelete
// 个别只设置一下长久化即可
return new Queue("erduo",true);
}
}
就这么简略申明一下就能够了,当然了 RabbitMQ
毕竟是一个独立的组件,如果你在 RabbitMQ
中通过其余形式曾经创立过一个名叫 erduo
的队列了,你这里也能够不申明,这里起到的一个成果就是如果你没有这个队列,会依照你申明的形式帮你创立这个队列。
配置完环境之后,咱们就能够以 SpringBoot 的形式来编写生产者和消费者了。
2. ???? 生产者与 RabbitTemplate
和上一篇的节奏一样,咱们先来编写生产者,不过这次我要引入一个新的工具:RabbitTemplate
。
听它的这个名字就晓得,又是一个拿来即用的工具类,Spring 家族这点就很难受,什么货色都给你封装一遍,让你用起来更不便更棘手。
RabbitTemplate
实现了规范 AmqpTemplate 接口,性能大抵能够分为发送音讯和承受音讯。
咱们这里是在生产者中来用,次要就是应用它的发送音讯性能:send
和 convertAndSend
办法。
// 发送音讯到默认的 Exchange,应用默认的 routing key
void send(Message message) throws AmqpException;
// 应用指定的 routing key 发送音讯到默认的 exchange
void send(String routingKey, Message message) throws AmqpException;
// 应用指定的 routing key 发送音讯到指定的 exchange
void send(String exchange, String routingKey, Message message) throws AmqpException;
send
办法是发送 byte 数组的数据的模式,这里代表音讯内容的对象是 Message
对象,它的构造方法就是传入 byte 数组数据,所以咱们须要把咱们的数据转成 byte 数组而后结构成一个 Message
对象再进行发送。
// Object 类型,能够传入 POJO
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
convertAndSend
办法是能够传入 POJO 对象作为参数,底层是有一个 MessageConverter
帮咱们主动将数据转换成 byte 类型或 String 或序列化类型。
所以这里反对的传入对象也只有三种:byte 类型,String 类型和实现了 Serializable
接口的 POJO。
介绍完了,咱们能够看一下代码:
@Slf4j
@Component("rabbitProduce")
public class RabbitProduce {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {String message = "Hello 我是作者和耳朵,欢送关注我。" + LocalDateTime.now().toString();
System.out.println("Message content :" + message);
// 指定音讯类型
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();
rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
System.out.println("音讯发送结束。");
}
public void convertAndSend() {User user = new User();
System.out.println("Message content :" + user);
rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user);
System.out.println("音讯发送结束。");
}
}
这里我特意写明了两个例子,一个用来测试 send,另一个用来测试 convertAndSend。
send
办法里咱们看下来和之前的代码是简直一样的,定义一个音讯,而后间接 send,然而这个结构音讯的构造方法可能比咱们想的要多一个参数,咱们原来说的只有把数据转成二进制数组放进去即可,当初看来还要多放一个参数了。
MessageProperties
,是的咱们须要多放一个 MessageProperties
对象,从他的名字咱们也能够看出它的性能就是附带一些参数,然而某些参数是少不了的,不带不行。
比方我的代码这里就是设置了一下音讯的类型,音讯的类型有很多种能够是二进制类型,文本类型,或者序列化类型,JSON 类型,我这里设置的就是文本类型,指定类型是必须的,也能够为咱们拿到音讯之后要将音讯转换成什么样的对象提供一个参考。
convertAndSend
办法就要简略太多,这里我放了一个 User 对象拿来测试用,间接指定队列而后放入这个对象即可。
Tips:User 必须实现 Serializable
接口,不然的话调用此办法的时候会抛出 IllegalArgumentException
异样。
代码实现之后咱们就能够调用了,这里我写一个测试类进行调用:
@SpringBootTest
public class RabbitProduceTest {
@Autowired
private RabbitProduce rabbitProduce;
@Test
public void sendSimpleMessage() {rabbitProduce.send();
rabbitProduce.convertAndSend();}
}
成果如下图~
同时在控制台应用命令 rabbitmqctl.bat list_queues
查看队列 -erduo
当初的状况:
如此一来,咱们的生产者测试就算实现了,当初音讯队列里两条音讯了,而且音讯类型必定不一样,一个是咱们设置的文本类型,一个是主动设置的序列化类型。
3. ???? 消费者与 RabbitListener
既然队列外面曾经有音讯了,接下来咱们就要看咱们该如何通过新的形式拿到音讯并生产与确认了。
消费者这里咱们要用到 @RabbitListener
来帮咱们拿到指定队列音讯,它的用法很简略也很简单,咱们能够先来说简略的形式,间接放到办法上,指定监听的队列就行了。
@Slf4j
@Component("rabbitConsumer")
public class RabbitConsumer {@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(Message message, Channel channel) throws Exception {System.out.println("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("音讯已确认");
}
}
这段代码就代表 onMessage
办法会解决 erduo
(Producer.QUEUE_NAME 是常量字符串 ”erduo”) 队列中的音讯。
咱们能够看到这个办法外面有两个参数,Message
和 Channel
,如果用不到Channel
能够不写此参数,然而 Message
音讯肯定是要的,它代表了音讯自身。
咱们能够想想,咱们的程序从 RabbitMQ
之中拉回一条条音讯之后,要以怎么样的形式展现给咱们呢?
没错,就是封装为一个个 Message
对象,这外面放入了一条音讯的所有信息,数据结构是什么样一会我一 run 你就能看到了。
同时这里咱们应用 Channel
做一个音讯确认的操作,这里的 DeliveryTag 代表的是这个音讯在队列中的序号,这个信息寄存在 MessageProperties
中。
4. ????SpringBoot 启动!
编写完生产者和消费者,同时曾经运行过生产者往音讯队列外面放了两条信息,接下来咱们能够间接启动音讯,查看生产状况:
在我红色框线标记的中央能够看到,因为咱们有了消费者所以我的项目启动后先和 RabbitMQ 建设了一个连贯进行监听队列。
随后就开始生产咱们队列中的两条音讯:
第一条信息是 contentType=text/plain
类型,所以间接就在管制台上打印出了具体内容。
第二条信息是contentType=application/x-java-serialized-object
,在打印的时候只打印了一个内存地址 + 字节大小。
不管怎么说,数据咱们是拿到了,也就是代表咱们的生产是没有问题的,同时也都进行了音讯确认操作,从数据上看,整个音讯能够分为两局部:body
和MessageProperties
。
咱们能够独自应用一个注解拿到这个 body 的内容 – @Payload
@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, Channel channel) throws Exception {System.out.println("Message content :" + body);
}
也能够独自应用一个注解拿到 MessageProperties
的 headers 属性,headers 属性在截图里也能够看到,只不过是个空的 – @Headers。
@RabbitListener(queues = Producer.QUEUE_NAME)
public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception {System.out.println("Message content :" + body);
System.out.println("Message headers :" + headers);
}
这两个注解都算是扩大常识,我还是更喜爱间接拿到全副,全都要!!!
下面咱们曾经实现了音讯的发送与生产,整个过程咱们能够再次回忆一下,所有都和我画的这张图上一样的轨迹:
只不过咱们始终没有指定 Exchage
始终应用的默认路由,心愿大家好好记住这张图。
5. ????@RabbitListener 与 @RabbitHandler
上面再来补一些知识点,无关 @RabbitListener
与@RabbitHandler
。
@RabbitListener
下面咱们曾经简略的进行了应用,略微扩大一下它其实是能够监听多个队列的,就像这样:
@RabbitListener(queues = { "queue1", "queue2"})
public void onMessage(Message message, Channel channel) throws Exception {System.out.println("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("音讯已确认");
}
还有一些其余的个性如绑定之类的,这里不再赘述因为太硬编码了个别用不上。
上面来说说这节要次要讲的一个个性:@RabbitListener 和 @RabbitHandler 的搭配应用。
后面咱们没有提到,@RabbitListener
注解其实是能够注解在类上的,这个注解在类上标记着这个类监听某个队列或某些队列。
这两个注解的搭配应用就要让 @RabbitListener
注解在类上,而后用 @RabbitHandler
注解在办法上,依据办法参数的不同自动识别并去生产,写个例子给大家看一看更直观一些。
@Slf4j
@Component("rabbitConsumer")
@RabbitListener(queues = Producer.QUEUE_NAME)
public class RabbitConsumer {
@RabbitHandler
public void onMessage(@Payload String message){System.out.println("Message content :" + message);
}
@RabbitHandler
public void onMessage(@Payload User user) {System.out.println("Message content :" + user);
}
}
大家能够看看这个例子,咱们先用 @RabbitListener
监听 erduo
队列中的音讯,而后应用 @RabbitHandler
注解了两个办法。
- 第一个 办法的 body 类型是 String 类型,这就代表着这个办法只能解决文本类型的音讯。
- 第二个办法 的 body 类型是 User 类型,这就代表着这个办法只能解决序列化类型且为 User 类型的音讯。
这两个办法正好对应着咱们第二节中测试类会发送的两种音讯,所以咱们往 RabbitMQ 中发送两条测试音讯,用来测试这段代码,看看成果:
都在管制台上如常打印了,如果 @RabbitHandler
注解的办法中没有一个的类型能够和你音讯的类型对的上,比方音讯都是 byte 数组类型,这里没有对应的办法去接管,零碎就会在控制台一直的报错,如果你呈现这个状况就证实你类型写的不正确。
假如你的 erduo
队列中会呈现三种类型的音讯:byte,文本和序列化,那你就必须要有对应的解决这三种音讯的办法,不然音讯发过来的时候就会因为无奈正确转换而报错。
而且应用了 @RabbitHandler
注解之后就不能再和之前一样应用 Message
做接管类型。
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {System.out.println("Message content :" + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("音讯已确认");
}
这样写的话会报类型转换异样的,所以二者选其一。
同时上文我的 @RabbitHandler
没有进行音讯确认,大家能够本人试一下进行音讯确认。
6. ???? 音讯的序列化转换
通过上文咱们曾经晓得,能被主动转换的对象只有byte[]
、String
、java 序列化对象
(实现了 Serializable 接口的对象),然而并不是所有的 Java 对象都会去实现 Serializable 接口,而且序列化的过程中应用的是 JDK 自带的序列化办法,效率低下。
所以咱们更广泛的做法是:应用 Jackson 先将数据转换成 JSON 格局发送给RabbitMQ
,再接管音讯的时候再用 Jackson 将数据反序列化进去。
这样做能够完满解决下面的痛点:音讯对象既不用再去实现 Serializable 接口,也有比拟高的效率(Jackson 序列化效率业界应该是最好的了)。
默认的音讯转换计划是音讯转换顶层接口 -MessageConverter
的一个子类:SimpleMessageConverter
,咱们如果要换到另一个音讯转换器只须要替换掉这个转换器就行了。
上图是 MessageConverter
构造树的构造树,能够看到除了 SimpleMessageConverter
之外还有一个Jackson2JsonMessageConverter
,咱们只须要将它定义为 Bean,就能够间接应用这个转换器了。
@Bean
public MessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter(jacksonObjectMapper);
}
这样就能够了,这里的 jacksonObjectMapper
能够不传入,然而默认的 ObjectMapper
计划对 JDK8 的工夫日期序列化会不太敌对,具体能够参考我的上一篇文章:从 LocalDateTime 序列化探讨全局一致性序列化,总的来说就是定义了本人的ObjectMapper
。
同时为了接下来测试不便,我又定义了一个专门测试 JSON 序列化的队列:
@Bean
public Queue erduoJson() {
// 其三个参数:durable exclusive autoDelete
// 个别只设置一下长久化即可
return new Queue("erduo_json",true);
}
如此之后就能够进行测试了,先是 生产者代码:
public void sendObject() {Client client = new Client();
System.out.println("Message content :" + client);
rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client);
System.out.println("音讯发送结束。");
}
我又从新定义了一个 Client
对象,它和之前测试应用的 User 对象成员变量都是一样的,不一样的是它没有实现 Serializable 接口。
同时为了保留之前的测试代码,我又新建了一个RabbitJsonConsumer
,用于测试 JSON 序列化的相干生产代码,外面定义了一个动态变量:JSON_QUEUE = "erduo_json"
;
所以这段代码是将 Client
对象作为音讯发送到 "erduo_json"
队列中去,随后咱们在测试类中 run 一下进行一次发送。
紧着是 消费者代码:
@Slf4j
@Component("rabbitJsonConsumer")
@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)
public class RabbitJsonConsumer {
public static final String JSON_QUEUE = "erduo_json";
@RabbitHandler
public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception {System.out.println("Message content :" + client);
System.out.println("Message headers :" + headers);
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
System.out.println("音讯已确认");
}
}
有了上文的教训之后,这段代码了解起来也是很简略了吧,同时给出了上一节没写的如何在 @RabbitHandler
模式下进行音讯签收。
咱们间接来看看成果:
在打印的 Headers 外面,往后翻能够看到 contentType=application/json
,这个contentType
是表明了音讯的类型,这里正是阐明咱们新的音讯转换器失效了,将所有音讯都转换成了 JSON 类型。
后记
这两篇讲完了 RabbitMQ
的根本收发音讯,包含手动配置和主动配置的两种形式,这些大家认真研读之后应该会对 RabbitMQ
收发音讯没什么疑难了~
不过咱们始终以来发消息时都是应用默认的交换机,下篇将会讲述一下 RabbitMQ
的几种交换机类型,以及其应用形式。
讲完了交换机之后,这些 RabbitMQ
的罕用概念根本就欠缺了。
最近这段时间压力挺大,优狐令我八月底之前降级到三级,所以各位读者的赞对我很重要,心愿大家可能高抬贵手,帮我一哈~
好了,以上就是本期的全部内容,感激你能看到这里,欢送对本文点赞珍藏与评论,???? 你们的每个点赞都是我创作的最大能源。
我是耳朵,一个始终想做常识输入的伪文艺程序员,咱们下期见。
本文代码:码云地址 GitHub 地址