一、RabbitMq基础知识
0、概述
音讯队列的作用就是接管音讯生产者的音讯,而后将音讯发送到消费者
1、信道channel
我的了解是生产者/消费者和rabbitmq交互的一个通道,负责交换机、队列治理;音讯公布和生产治理;事务管理等
2、交换机
四种交换机:
direct:能够用一个或者多个key绑定到一个或者多个队列上
topic:反对路由的适配符 # *
Fanout播送:将音讯发送给所有的队列
Header头交换机:自定义通过头音讯属性来定义路由的匹配
3、队列:保留音讯的队列
4、消费者:音讯的接收者
5、生产者:音讯的发送者
二、 应用com.rabbitmq.client.*
操作mq
2.1、基本操作
0、环境和依赖
<!-- 环境 * jdk 1.8 * idea* springboot 2.2.6--><!-- 依赖 这里只导入这个包,其中蕴含了Rabbit client的包--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
1、创立连贯和信道
//获取连贯ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//mq主机地址factory.setPort(5672);//端口,默认时5672factory.setUsername("leyou");factory.setPassword("leyou");factory.setVirtualHost("/leyou");Connection connection = factory.newConnection();//获取信道Channel channel = connection..createChannel();
2、申明交换机 / 队列 / 绑定交换机和队列
//交换机名,交换机类型channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);/*** 第一个参数是queue:要创立的队列名* 第二个参数是durable:是否长久化。如果为true,能够在RabbitMQ解体后复原音讯* 第三个参数是exclusive:true示意一个队列只能被一个消费者占有并生产* 第四个参数是autoDelete:true示意服务器不在应用这个队列是会主动删除它* 第五个参数是arguments:包含死信队列,队列的ttl*/channel.queueDeclare(QUEUE_ONE,true,false,false,null);//绑定交换机和队列 队列名,交换机名,routekeychannel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);
3、公布音讯
//1、交换机名 2、routekey 3、mandatory强制(须要return回调时必须设置为true) 4、公布音讯参数 5、音讯channel.basicPublish(EXCHANGE,GIRL,true,null,"xxx提价了".getBytes());
4、接管音讯
//接管音讯前也须要获取连贯和channel,申明队列//接管音讯Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //拿到音讯 System.out.println(new String(body,"utf-8")); }};/*** 参数阐明* 1:队列名字* 2:是否自动应答 autoACk,为false时须要手动ack* 3:消费者,当接管到消费者时会调用给对象中的 handleDelivery 办法*/channel.basicConsume(QUEUE_ONE,true,consumer);
2.2、根本利用
1、性能:
有两个人小明和小华,小明对美女感兴趣,小华对股票和没事感兴趣,应用音讯队列将他们感兴趣的音讯发送给他们两个
2、实现:
(1)写一个类来提供创立连贯和信道;
(2)生产者(发送音讯方)类发送音讯
(3)消费者(接管音讯)类接管音讯
- 连贯类
public class ConnectionUtil { /** * 应用原始的rabbitmq client api 操作mq */ private static ConnectionFactory factory = new ConnectionFactory(); private static Connection connection; /* 获取连贯 留神导包:须要导client上面的包 */ public static Connection getConnection() throws IOException, TimeoutException {// factory.setHost("localhost");// factory.setPort(5672); factory.setUsername("leyou"); factory.setPassword("leyou"); factory.setVirtualHost("/leyou"); connection = factory.newConnection(); return connection; } public static void close() throws IOException { connection.close(); } /* 创立信道 */ public static Channel getChannel() throws IOException, TimeoutException { return getConnection().createChannel(); }}
- 生产者
//生产者public class provice{ public void producerMsg() throws IOException, TimeoutException, InterruptedException { Channel channel = ConnectionUtil.getChannel(); String EXCHANGE = "direct_exchange"; channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); //定义两个队列名 String QUEUE_ONE = "beauty_queue"; String QUEUE_TWO = "food_queue"; channel.queueDeclare(QUEUE_ONE,true,false,false,null); channel.queueDeclare(QUEUE_TWO,true,false,false,null); //定义三个key String GIRL = "girl"; String SHARE = "share"; String FOOD = "food"; //绑定 channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL); channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE); channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD); //发送音讯 /** * 参数:1交换机,2routekey 3 mandatory:强制;(须要return回调时必须设置为true) * 3参数,4音讯字节数据 */ channel.basicPublish(EXCHANGE,GIRL,true,null,"快看,是她".getBytes()); channel.basicPublish(EXCHANGE,SHARE,true,null,"股票涨了".getBytes()); channel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基提价了".getBytes()); //敞开连贯 channel.close(); ConnectionUtil.close(); }}
- 消费者
public class ConsumerMq { // 生产音讯 /** * 应用原始的rabbitmq client api 操作mq */ String EXCHANGE = "direct_exchange"; String QUEUE_ONE = "beauty_queue"; String QUEUE_TWO = "food_queue"; //key String GIRL = "girl"; String SHARE = "share"; String FOOD = "food"; public void consumer() throws IOException, TimeoutException { Channel channel = ConnectionUtil.getChannel(); /** * 第一个参数是queue:要创立的队列名 * 第二个参数是durable:是否长久化。如果为true,能够在RabbitMQ解体后复原音讯 * 第三个参数是exclusive:true示意一个队列只能被一个消费者占有并生产 * 第四个参数是autoDelete:true示意服务器不在应用这个队列是会主动删除它 * 第五个参数是arguments:包含死信队列,队列的ttl, */ channel.queueDeclare(QUEUE_ONE,true,false,false,null); channel.queueDeclare(QUEUE_TWO,true,false,false,null); //在生产者绑定了交换机和队列,在这里就不须要绑定 //channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL); //channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE); //channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD); //接管音讯 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body,"utf-8")); //手动应答ack能够在该办法中进行;参数:1.音讯tag,2.是否批量ack channel.basicAck(envelope.getDeliveryTag(),false); } }; /** * 参数阐明 * 1:队列名字 * 2:是否自动应答 autoACk 为false时须要手动ack * 3:消费者,当接管到消费者时会调用给对象中的 handleDelivery 办法 */ channel.basicConsume(QUEUE_ONE,false,consumer); channel.basicConsume(QUEUE_TWO,false,consumer); }}
2.3、mq事务,发送方确认,和音讯回调
概述
音讯的发送链路 生产者 -> exchange --> queue --> 消费者;为确保音讯发送到rabbitmq,amqp协定提供了三个机制来保障:事务,发送方确认(ack),音讯回调(returncallback);事务的形式和数据库的事务相似,这里不做具体介绍;发送方确认是当音讯发送到交换机时, broker(实现amqp协定的服务端,这里指rabbitmq)会回调发送者的一个固定办法来确认音讯胜利发送;音讯回调是产生在交换机通过路由key转发到队列的过程中,如果音讯不能通过key找到对应的queue则回调一个固定办法将音讯返回给生产者,确保音讯不失落
1、mq事务
- rabbitMq是反对事务的,然而应用事务的效率很低,在音讯数量很大的状况下影响性能
2、发送方确认
对于固定音讯体大小和线程数,如果音讯长久化,生产者confirm(或者采纳事务机制),消费者ack那么对性能有很大的影响.
音讯长久化的优化没有太好办法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则次要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程形式:
- 一般confirm模式:每发送一条音讯后,调用waitForConfirms()办法,期待服务器端confirm。实际上是一种串行confirm了。
- 批量confirm模式:每发送一批音讯后,调用waitForConfirms()办法,期待服务器端confirm。
- 异步confirm模式:提供一个回调办法,服务端confirm了一条或者多条音讯后Client端会回调这个办法。
- [ ] 一般confirm模式
//要点//第1种//一般confirm模式最简略,publish一条音讯后,期待服务器端confirm,如果服务端返回false或者超时工夫内未返回,客户端进行音讯重传。//1.发消息前channel.confirmSelect();//2.发消息后//判断音讯发送是否胜利if(channel.waitForConfirms()){ System.out.println("音讯发送胜利");}
- [ ] 批量confirm模式
批量confirm模式略微简单一点,客户端程序须要定期(每隔多少秒)或者定量(达到多少条)或者两则联合起来publish音讯,而后期待服务器端confirm, 相比一般confirm模式,批量极大晋升confirm效率,然而问题在于一旦呈现confirm返回false或者超时的状况时,客户端须要将这一批次的音讯全部重发,这会带来显著的反复音讯数量,并且,当音讯常常失落时,批量confirm性能应该是不升反降的。
channel.confirmSelect();for(int i=0;i<batchCount;i++){ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());}if(!channel.waitForConfirms()){ System.out.println("send message failed.");}
异步confirm模式
异步confirm模式的编程实现最简单,Channel对象提供的ConfirmListener()回调办法只蕴含deliveryTag(以后Chanel收回的音讯序号),咱们须要本人为每一个Channel保护一个unconfirm的音讯序号汇合,每publish一条数据,汇合中元素加1,每回调一次handleAck办法,unconfirm汇合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm汇合最好采纳有序汇合SortedSet存储构造。实际上,SDK中的waitForConfirms()办法也是通过SortedSet保护音讯序号的。
要害代码:
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());//别忘这行代码 channel.confirmSelect();//增加监听器channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } }});while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo);}
3、音讯回调
//要点//1.发送音讯是将第三个参数mandatory设置为truechannel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基提价了".getBytes());//2.增加音讯回调监听器channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { System.out.println("音讯不可路由"+new String(bytes,"utf-8")); } });//留神:开启回调不能敞开连贯和信道,
2.4、接管方确认
1、概述
接管方ack分为手动和主动,在接管音讯时设置
//第二个参数就是指定是否手动ack false时为手动channel.basicConsume(QUEUE_ONE,false,consumer);
手动ack有三种
- 单个确认
- 单个回绝
- 批量回绝
2、代码实现
单个确认ack
//接管音讯Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body,"utf-8")); //手动应答ack能够在该办法中进行;参数:1.音讯tag,2.是否批量ack channel.basicAck(envelope.getDeliveryTag(),false); //回绝音讯;参数:1.音讯tag;2.音讯是否从新入队,当只有一个消费者时,会引起反复生产 channel.basicReject(envelope.getDeliveryTag(),false); //批量ack音讯;参数:1.音讯tag;2.是否批量ack音讯,3.是否重回队列 channel.basicNack(envelope.getDeliveryTag(),true,false); }};//这里只须要条应答的语句,我这里常识都列出来channel.basicConsume(QUEUE_ONE,false,consumer);//留神下面第二个参数要为false能力手动ack
2.5、音讯TTL和队列TTL、死信队列、提早队列
这一块临时不应用原始RabbitMq Client API实现,前面再钻研,然而会应用上面的org.springframework.amqp
来实现
三、应用org.springframework.amqp
操作mq
3.1、前言:
Spring
对RabbitMp
进行了形象,将交换机,队列,音讯,绑定,连贯等形象出实体类,不便操作,还提供了RabbitAdmit
和RabbitTemplate
来不便交换机队列的治理以及音讯的发送接管等
3.2、根本实例
0、环境和依赖
<!-- 环境 * jdk 1.8 * idea* springboot 2.2.6 --><!-- 依赖 --><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
1、实例
发送音讯通知消费者超时打折了快来购物
- 配置类
@Configurationpublic class RabbitConfig { private final static Logger log = LoggerFactory.getLogger(RabbitConfig.class); private final static String EXCHANGE_NAME = "verification_code_exchange"; private final static String VERIFICATION_CODE_QUEUE = "verification_code_queue"; private final static String VERIFICATION_CODE_ROUTE_KEY = "verification_code_key"; //死信交换机和队列和key private final static String DLX_EXCHANGE_NAME = "dlx-exchange"; private final static String DLX_KEY = "verification_code_key"; @Bean public CachingConnectionFactory connectionFactory(){ CachingConnectionFactory conn = new CachingConnectionFactory(); conn.setUsername("leyou"); conn.setPassword("leyou"); conn.setVirtualHost("/leyou"); //音讯发送到mq发送确认音讯给生产者 conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); //音讯发送到mq,通过绑定的key找不到queue,则发送音讯给生产者 conn.setPublisherReturns(true); return conn; } @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //设置音讯序列化 rabbitTemplate.setMessageConverter(converter()); //音讯的确认回调// rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {// @Override// public void confirm(CorrelationData correlationData, boolean b, String s) {//// }// }); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { //ack为确认音讯是否胜利发送到mq if(ack){ //胜利发送 log.info("音讯发送胜利"); } }); //改标记位设置位true时,当交换机依据本身类型和routeKey无奈找到对应的队列时, // 则mq会将音讯返还给生产者 //当为false时则mq会将音讯间接删除 rabbitTemplate.setMandatory(true); //音讯,返回码,返回内容,交换机,路由key rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{ //音讯 log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey); }); return rabbitTemplate; } /** * 注入rabbitadmin 用来申明交换机和队列,次要作用是代替原始的应用channl申明的做法,全副交给这个对象来实现 * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmit(CachingConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } /** * 音讯序化对象 * 默认应用的是JDK的序列化,这里配置了后就能够将音讯序列化为json格局 */ @Bean public MessageConverter converter() { return new Jackson2JsonMessageConverter(); } /** * 申明一个交换机 */ @Bean public DirectExchange verificationCodeExchange(RabbitAdmin rabbitAdmin){ DirectExchange exchange = new DirectExchange(EXCHANGE_NAME); rabbitAdmin.declareExchange(exchange); return exchange; } /** * 申明一个队列 * @param rabbitAdmin * @return */ @Bean public Queue getQueue(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,null); rabbitAdmin.declareQueue(queue); return queue; } /** * 申明一个绑定 * @param rabbitAdmin * @param verificationCodeExchange * @return */ @Bean public Binding bindingQueue(RabbitAdmin rabbitAdmin,DirectExchange verificationCodeExchange){ Binding with = BindingBuilder.bind(getQueue(rabbitAdmin)).to(verificationCodeExchange).with(VERIFICATION_CODE_ROUTE_KEY); rabbitAdmin.declareBinding(with); return with; }}
阐明:下面用到了生产者confirm和音讯回调机制
1、生产者confirm要害代码://1、创立连贯时conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);//2、创立rabbitTemplate时rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { //ack为确认音讯是否胜利发送到mq if(ack){ //胜利发送 log.info("音讯发送胜利"); }});
2、音讯回调机制要害代码:
//1、创立连贯时conn.setPublisherReturns(true);//2、创立rabbitTemplate时//改标记位设置位true时,当交换机依据本身类型和routeKey无奈找到对应的队列时,// 则mq会将音讯返还给生产者//当为false时则mq会将音讯间接删除rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{ //音讯log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey);});
生产者:
@Componentpublic class RabbitSender { //注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(){ //构建音讯 Message message = MessageBuilder.withBody( JSONObject.toJSONString(MessageModel.builder().id(msgId).context("超市打折,快来抢购!").build()).getBytes()).build(); //音讯长久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //音讯的媒体类型 message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); //音讯的自定义关联id CorrelationData correlationData = new CorrelationData(String.valueOf(msgId)); rabbitTemplate.convertAndSend(exchange,routingKey,message,new MessagePostProcessor(){ //音讯后置处理器,能够在上面这个办法中对音讯进行相干属性的设置 @Override public Message postProcessMessage(Message message) throws AmqpException { //比方能够设置下面 这些属性等 //message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//音讯长久化问题 //message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);//音讯的媒体类型 return message; } },correlationData); }}
消费者
@Componentpublic class RabbitReceive { @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE_NAME, type = ExchangeTypes.DIRECT), key = VERIFICATION_CODE_ROUTE_KEY, value = @Queue(value = VERIFICATION_CODE_QUEUE, autoDelete = "false"), ignoreDeclarationExceptions = "true"), concurrency = "1", // 指定监听该队列的消费者个数 ackMode = "MANUAL"// 手动ack ) public void receiveCode(Channel channel, Message msg, @Headers Map<String, Object> headers) throws IOException, InterruptedException { String msgId = (String) headers.get("spring_listener_return_correlation"); long tag = msg.getMessageProperties().getDeliveryTag(); channel.basicAck(tag, false); }}
其中:发送方确认(生产者confirm)、音讯回调下面代码都蕴含了;消费者ack则和原始办法是一样的
上面介绍音讯TTL,队列TTL,死信队列,提早队列
- 音讯和队列的TTL
//音讯ttl//在构建音讯时设置音讯的过期工夫Message message = MessageBuilder.withBody( JSONObject.toJSONString(MessageModel.builder().id(msgId).context("超市打折,快来抢购!").build()).getBytes()).build();//音讯的过期工夫message.getMessageProperties().setExpiration("5000");//队列的ttl//在创立队列时通过参数设置Map<String, Object> args = new HashMap<>();//指定死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);//指定死信队列的keyargs.put("x-dead-letter-routing-key", DLX_KEY);//设置队列中音讯的过期工夫 msargs.put("x-message-ttl",10000);//整个队列的过期工夫,过期后整个队列会被删除//args.put("x-expires",10000);Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,args);
下面还包含死信队列的属性设置,和死信队列key,对于死信队列的配置,还须要配置一个死信交换机和一个死信队列;当有音讯或队列的ttl过期,音讯超过队列最大长度,音讯被回绝且设置不从新回队列,则音讯会被转发到死信交换机,再转发到死信队列。
- 对于提早队列的实现办法有两种
- 应用死信队列,用一个设置了ttl的队列来寄存音讯,该队列不须要消费者监听,而后给该队列配置死信交换机和队列,消费者监听死信队列,这样就能达到工夫达到提早收到音讯的目标
- 应用rabbitmq插件的形式实现,这里先不写,放到下一篇笔记中
最初
感激你看到这里,看完有什么的不懂的能够在评论区问我,感觉文章对你有帮忙的话记得给我点个赞,每天都会分享java相干技术文章或行业资讯,欢送大家关注和转发文章!