关于activemq:activemq入门安装

1、资源筹备1.1 环境Centos7、jdk8、activemq5.15.81.2 activemq5.15.8 安装包官网下载地址:http://activemq.apache.org/ac... 国外下载比较慢,能够通过这个网址下载:https://download.csdn.net/dow... 2、装置步骤2.1 上传下载好的activemq安装包到/var目录并解压tar -zxvf apache-activemq-5.15.8-bin.tar.gz -C /var 2.2 批改目录名称mv /var/apache-activemq-5.15.8/ /var/activemq/2.3 做成零碎服务2.3.1 创立一个systemd服务文件vi /usr/lib/systemd/system/activemq.service2.3.2 在该文件放入以下内容[Unit]Description=ActiveMQ serviceAfter=network.target[Service]Type=forkingExecStart=/var/activemq/bin/activemq startExecStop=/var/activemq/bin/activemq stopUser=rootGroup=rootRestart=alwaysRestartSec=9StandardOutput=syslogStandardError=syslogSyslogIdentifier=activemq[Install]WantedBy=multi-user.target2.3.3 找到java命令所在的目录whereis java2.3.4 设置activemq配置文件/var/activemq/bin/env中的JAVA_HOME# Location of the java installation# Specify the location of your java installation using JAVA_HOME, or specify the# path to the "java" binary using JAVACMD# (set JAVACMD to "auto" for automatic detection)JAVA_HOME="/usr/local/java/jdk1.8.0_181"JAVACMD="auto"2.3.5 通过systemctl治理activemq启停#启动activemq服务: systemctl start activemq#查看服务状态: systemctl status activemq#创立软件链接:ln -s /usr/lib/systemd/system/activemq.service /etc/systemd/system/multi-user.target.wants/activemq.service#开机自启: systemctl enable activemq#检测是否开启胜利(enable): systemctl list-unit-files |grep activemq2.3.6 防火墙配置,Web治理端口默认为8161,通信端口默认为61616增加并重启防火墙 ...

February 23, 2022 · 1 min · jiezi

多维度对比5款主流分布式MQ消息队列妈妈再也不担心我的技术选型了

1、引言对于即时通讯系统(包括IM、消息推送系统等)来说,MQ消息中件间是非常常见的基础软件,但市面上种类众多、各有所长的MQ消息中件间产品,该怎么去选择?这是个问题! 对于很多经验不足的开发者来说,一个公司内部用的IM聊天系统,总用户量也不过百十来人,动辄就是Kafka、MongoDB,美其名曰为了高性能和可扩展性,真是大炮打蚊子。而对于中大型的即时通讯场景来说,有的开发者确为了贪图使用简单、资料全面,反而使用臃肿不堪的ActiveMQ,这就有点失去章法了。 唧唧歪歪这么多,那什么样的场景到底该用哪种MQ消息中件间产品合适?读完本文您或许就有了答案。 本文将从17个维度综合对比Kafka、RabbitMQ、ZeroMQ、RocketMQ、ActiveMQ这5款当前最主流的MQ消息中间件产品,希望能为您的下一次产品的架构设计和MQ消息中间件选型提供参考依据。 学习交流: 即时通讯/推送技术开发交流4群:101279154[推荐]移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM》(本文同步发布于:http://www.52im.net/thread-26...) 2、相关资料官网地址: Kafka:http://kafka.apache.org/ RabbitMQ:https://www.rabbitmq.com/ ZeroMQ:http://zeromq.org/ RocketMQ:http://rocketmq.apache.org/ ActiveMQ:http://activemq.apache.org/ 相关文章: 《IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列》 《IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?》 3、维度1:资料文档1)Kafka:资料数量中等。有Kafka作者自己写的书,网上资料也有一些。 2)RabbitMQ:资料数量多。有一些不错的书,网上资料多。 3)ZeroMQ:资料数量少。专门写ZeroMQ的书较少,网上的资料多是一些代码的实现和简单介绍。 4)RocketMQ:资料数量少。专门写RocketMQ的书目前有了两本;网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。 5)ActiveMQ:资料数量多。没有专门写ActiveMQ的书,网上资料多。 4、维度2:开发语言1)Kafka:Scala 2)RabbitMQ:Erlang 3)ZeroMQ:C语言 4)RocketMQ:Java 5)ActiveMQ:Java 5、维度3:支持的协议1)Kafka:自己定义的一套…(基于TCP) 2)RabbitMQ:AMQP 3)ZeroMQ:TCP、UDP 4)RocketMQ:自己定义的一套… 5)ActiveMQ:OpenWire、STOMP、REST、XMPP、AMQP 6、维度4:消息存储1)Kafka: 内存、磁盘、数据库。支持大量堆积。 Kafka的最小存储单元是分区,一个topic包含多个分区,Kafka创建主题时,这些分区会被分配在多个服务器上,通常一个broker一台服务器。 分区首领会均匀地分布在不同的服务器上,分区副本也会均匀的分布在不同的服务器上,确保负载均衡和高可用性,当新的broker加入集群的时候,部分副本会被移动到新的broker上。 根据配置文件中的目录清单,Kafka会把新的分区分配给目录清单里分区数最少的目录。 默认情况下,分区器使用轮询算法把消息均衡地分布在同一个主题的不同分区中,对于发送时指定了key的情况,会根据key的hashcode取模后的值存到对应的分区中。 2)RabbitMQ: 内存、磁盘。支持少量堆积。 RabbitMQ的消息分为持久化的消息和非持久化消息,不管是持久化的消息还是非持久化的消息都可以写入到磁盘。 持久化的消息在到达队列时就写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。 非持久化的消息一般只存在于内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存。 引入镜像队列机制,可将重要队列“复制”到集群中的其他broker上,保证这些队列的消息不会丢失。 配置镜像的队列,都包含一个主节点master和多个从节点slave,如果master失效,加入时间最长的slave会被提升为新的master,除发送消息外的所有动作都向master发送,然后由master将命令执行结果广播给各个slave,RabbitMQ会让master均匀地分布在不同的服务器上,而同一个队列的slave也会均匀地分布在不同的服务器上,保证负载均衡和高可用性。 3)ZeroMQ: 消息发送端的内存或者磁盘中。不支持持久化。 4)RocketMQ: 磁盘。支持大量堆积。 commitLog文件存放实际的消息数据,每个commitLog上限是1G,满了之后会自动新建一个commitLog文件保存数据。 ConsumeQueue队列只存放offset、size、tagcode,非常小,分布在多个broker上。 ConsumeQueue相当于CommitLog的索引文件,消费者消费时会从consumeQueue中查找消息在commitLog中的offset,再去commitLog中查找元数据。 ConsumeQueue存储格式的特性,保证了写过程的顺序写盘(写CommitLog文件),大量数据IO都在顺序写同一个commitLog,满1G了再写新的。 加上RocketMQ是累计4K才强制从PageCache中刷到磁盘(缓存),所以高并发写性能突出。 5)ActiveMQ: 内存、磁盘、数据库。支持少量堆积。 7、维度5:消息事务1)Kafka:支持 2)RabbitMQ:支持。客户端将信道设置为事务模式,只有当消息被RabbitMQ接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降 3)ZeroMQ:不支持 4)RocketMQ:支持 5)ActiveMQ:支持 8、维度6:负载均衡8.1 Kafka支持负载均衡。 1)一个broker通常就是一台服务器节点。 对于同一个Topic的不同分区,Kafka会尽力将这些分区分布到不同的Broker服务器上,zookeeper保存了broker、主题和分区的元数据信息。 分区首领会处理来自客户端的生产请求,Kafka分区首领会被分配到不同的broker服务器上,让不同的broker服务器共同分担任务。 每一个broker都缓存了元数据信息,客户端可以从任意一个broker获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。 2)Kafka的消费者组订阅同一个topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。 ...

June 21, 2019 · 3 min · jiezi

Spring-boot-集成-ActiveMQ

安装ActiveMQ到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/do...。 进入bin 目录,如果我们是32位的机器,就双击 win32 目录下的 activemq.bat,如果是64位机器,则双击 win64 目录下的 activemq.bat ,运行结果如下:成功之后在浏览器输入 http://127.0.0.1:8161/ 地址,可以看到 ActiveMQ 的管理页面,用户名和密码默认都是 admin Spring Boot 整合 ActiveMQ工程结构 添加 pom 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId></dependency>config 配置 # activemqspring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=admin#默认为true表示使用内存的activeMQ,不需要安装activeMQ serverspring.activemq.in-memory=true #如果此处设置为true,需要加如下的依赖包# <groupId>org.apache.activemq</groupId># <artifactId>activemq-pool</artifactId># 否则会自动配置失败,报JmsMessagingTemplate注入失败spring.activemq.pool.enabled=false队列模式创建 消息提供者:Producer.java import javax.jms.Destination; import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service; @Servicepublic class Producer { @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装 private JmsMessagingTemplate jmsTemplate; // 发送消息,destination是发送到的队列,message是待发送的消息 public void sendMessage(Destination destination, final String message){ jmsTemplate.convertAndSend(destination, message); }}创建消费者一: Consumer.java import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component; @Componentpublic class Consumer { // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息 @JmsListener(destination = "mytest.queue") public void receiveQueue(String text) { System.out.println("Consumer收到的报文为:"+text); }}创建消费者二:Consumer1 .java ...

June 13, 2019 · 2 min · jiezi

五分钟快速了解ActiveMQ案例简单且详细

最近得闲,探索了一下ActiveMQ。 ActiveMQ消息队列,信息收发的容器,作用有异步消息,流量削锋,应用耦合。同行还有 Kafka、RabbitMQ、RocketMQ、ZeroMQ、MetaMQ 。安装下载地址:http://activemq.apache.org/co... window版本的解压后双击/bin/activemq.bat 即可启动 它有自己的可视化页面:http://localhost:8161/admin/ 默认访问密码是:admin/admin如果需要修改在:/conf/jetty-realm.properties 中修改 JmsTemplate在springboot上整合的,使用spring 的JmsTemplate来操作ActiveMQ一、首先在pom文件中导入所需的jar包坐标: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>二、新增一个ActiveMQ的配置文件spring-jms.xml <?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- 配置JMS连接工厂 --> <bean id="innerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${spring.activemq.broker-url}" /> </bean> <!--配置连接池--> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory" ref="innerConnectionFactory" /> <property name="maxConnections" value="100"></property> </bean> <!-- 配置JMS模板,Spring提供的JMS工具类 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="pooledConnectionFactory" /> <property name="defaultDestination" ref="JmsSenderDestination" /> <property name="receiveTimeout" value="10000" /> </bean></beans>三、在启动类上配置以生效 @ImportResource(locations={"classpath:/config/spring-jms.xml"})四、在application.properties中配置ActiveMQ 的连接地址 spring.activemq.broker-url=tcp://localhost:61616准备就绪;开始写生产者和消费者,我这里把生产者和消费者写在一个项目里面。在这之前需要明白两个概念队列(Queue)和主题(Topic)传递模型队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型: 点对点(point-to-point,简称PTP)Queue消息传递模型:一个消息生产者对应一个消费者发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型:一个消息生产者对应多个个消费者QUEUE先在spring-jms.xml里添加配置一个队列名称Queue_love <bean id="JmsSenderDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg> <value>Queue_love</value></constructor-arg></bean> 创建一个生产者来发送消息;@Qualifier("JmsSenderDestination")指定了发送到上面配置的Queue_love队列 @Componentpublic class JmsSender {@Autowiredprivate JmsTemplate jmsTemplate;@Qualifier("JmsSenderDestination")@Autowiredprotected Destination destination;public void sendMessage(final String msg) { logger.info("QUEUE destination :" + destination.toString() + ", 发送消息:" + msg); jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(final Session session) throws JMSException { return session.createTextMessage(msg); } });}}创建一个消费者来消费消息: ...

April 29, 2019 · 2 min · jiezi

ActiveMQrabbitmq

JMS模型Java消息服务应用程序结构支持两种模型: 点对点或队列模型发布/订阅模型在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:只有一个消费者将获得消息生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。每一个成功处理的消息都由接收者签收 发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息在发布者和订阅者之间存在时间依赖性。发布者需要创建一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者创建了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。 ActiveMQ基本组件:Broker,消息代理,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务。Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker 。Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。Topic,主题,发布订阅模式下的消息统一汇集地,不同生产者向 Topic 发送消息,由 Broker 分发到不同的订阅者,实现消息的广播。Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。Message,消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。connector(broker与client,broker与broker)的协议:vm,tcp,xxxx持久消息的存储:KahaDB(基于文件),journaal+JDBC,你内存呢,levelDb(leveldb+zk作为M-S复制方案)集群:M-S:数据单独HA方案共享+锁获取master/slave切换。leveldb+zk/KahaDB+SAN类似分片:networkConnector。对于订阅,一直在brocker中转发,直到消费。对于queue顺序保证比较困难。https://shift-alt-ctrl.iteye.... AMQPhttp://docs.oasis-open.org/am... TYPE - type system and encodingTransport - AMQP transport layer(全双工可靠递交对等,connection=>多channel的session=>links)Messaging - AMQP Messaging Layer (对消息确认,拒绝,持久化等规定)Transactions - AMQP Transactions Layer(事务提交等)Security - AMQP Security Layers rabbitmq 组件Message消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。Publisher消息的生产者,也是一个向交换器发布消息的客户端应用程序。Exchange交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange 类型Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers(基本不用)Binding绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Queue消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。Connection网络连接,比如一个TCP连接。Channel信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。Virtual Host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。Broker表示消息队列服务器实体。交换机:1.直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:1)将一个队列(一个消费者一个队列)绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)2.生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue3.topic集群与持久化rabbitmq不支持动态扩展,erlang性能高可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,队列例外,只在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容,RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用)(猜测:消息的持久应该包含交换机上的和队列中的,发给所有队列后消息应该可删除了,消费后队列中的可删除了)磁盘节点+内存节点,要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入火离开集群时,它们必须要将该变更通知到至少一个磁盘节点,如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。 ...

April 29, 2019 · 1 min · jiezi

高级开发人员必备技术:MQ

也许在你们公司从没有使用过MQ,也不知道这东西是用来干什么的,但是一旦你进入大公司你就会发现,这东西处处可见。今天就来说说MQ方面的东西,我公众号有activemq的 demo,大家可以自己去看看。什么是MQMessage Queue简称MQ,中文消息队列。“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。你可以把它理解为一个中间件,帮你完成多个系统或应用间的消息传递。为什么要使用MQ首先它有3个核心,解耦,异步,削峰,因此我们可以想到以下使用场景:你的系统要和多个系统发生关系,别的系统要从你这获取一些数据,今天A系统和你要这样的数据,明天B系统说你的数据有问题,后天C系统让你加个别的数据。你一个人要维护解决很多问题,忙得不可开交。而有了MQ,你就可以通过Pub/Sub 发布订阅消息这么一个模型,系统就跟其它系统彻底解耦了。只要把消息放到队列里,其它系统就自己去处理自己需要的数据,自己不再考虑该给谁发送数据。比如:下完订单,不再去通知库存做同步处理。把该物品的信息放在队列中,库存自己去选择什么时候去处理计算自己的库存。还是上面的例子,之前的流程,user浏览器端发起购物请求3ms,订单系统处理数据库300ms,之后库存系统处理数据库300ms,这样同步情况下加起来就要603ms。即使前端有加载提示框,等待时间超过300ms,人眼是能感受到这种延迟的,体验很不好,速度越快才能留住user。现在使用MQ采用异步消息处理,假如消息放进队列需要3ms,那么最终的响应时间是3+3=6ms,对于创建订单和库存计算user并不关心,这样极大的提高了响应时间。一个大的网站或是应用,总会迎来访问量的高峰,可能是营销活动突然带来的大流量,或是节假日。比如双十一,购物人数突然猛增,并发数提高,数据库的压力突然增大,超出了每秒钟的处理能力,就会导致网站瘫痪。使用mq后,数据库可以不必立马处理这么多的请求,可以自己选择能承受的消息慢慢去处理。所有的消息积压在队列中,而不是同时积压到数据库。加入队列中积压了1亿条数据,而我的数据库只能每秒处理100万条数据,那我就每秒从队列中取出100万条来处理,始终不会超出阈值,这样数据库就不会被挤垮。把峰值慢慢消耗。现在想想你为什么没有使用到mq吧?或是考略使用mq使用后带来的威胁任何事物都有它的两面性,既然有优点那也有缺点:系统可用性降低万一mq挂了,队列里面的数据没有了,其它系统数据还没处理完,这可咋整?系统的复杂度提高了你用个mq是爽了,其它系统也要对应的修改自己的系统,来消费队列中的消息。硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。一致性问题你订单系统操作成功了,但是库存系统却失败了,这样导致了数据的不一致。所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。主流的MQ产品KafkaActiveMQRabbitMQ特性ActiveMQRabbitMQRocketMQKafka单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用上面表格来自:https://github.com/doocs/adva…推荐使用早期大家都在使用ActiveMQ ,适合小型项目,如果你尝试使用MQ,你可以选择。RabbitMQ社区活跃度比较高,开源,有问题可以在社区寻求帮助。但是底层使用了erlang 语言,不是小公司又能力掌控的 。RocketMQ 阿里出品,是用的中国公司比较多,经历过使用场景的考验,且自家产品也在用,不用担心。但是社区活跃度不高。推荐使用。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

January 25, 2019 · 1 min · jiezi

SpringBoot ActiveMq JmsTemplate 异步发送、非持久化

ActiveMq事务ActiveMq事务的作用就是在发送、接收处理消息过程中,如果出现问题,可以回滚。ActiveMq异步/同步发送以下摘抄自https://blog.csdn.net/songhai… 同步发送:消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。异步发送如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法。当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步 1) 当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送” 2) 当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。总结:默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。 jms.sendTimeout:发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送!官方连接:http://activemq.apache.org/as…配置使用异步发送方式1.在连接上配置cf = new ActiveMQConnectionFactory(“tcp://locahost:61616?jms.useAsyncSend=true”);2.通过ConnectionFactory((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);3.通过connection((ActiveMQConnection)connection).setUseAsyncSend(true);SpringBoot JMS实现异步发送1.如果在配置中使用了连接池,那么SpringBoot默认会使用PooledConnectionFactory,ActiveMQConnectionFactory的useAsyncSend默认会true。使用连接池配置如下activemq: in-memory: true broker-url: tcp://127.0.0.1:61616 pool: enabled: true max-connections: 5 user: password:2.修改JmsTemplate 默认参数JmsTemplate template = new JmsTemplate(pooledConnectionFactory);//设备为true,deliveryMode, priority, timeToLive等设置才会起作用template.setExplicitQosEnabled(true);//设为非持久化模式template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);完整代码如下:@Slf4j@Configurationpublic class ActiveConfig { /** * 配置用于异步发送的非持久化JmsTemplate / @Autowired @Bean @Primary public JmsTemplate asynJmsTemplate(PooledConnectionFactory pooledConnectionFactory) { JmsTemplate template = new JmsTemplate(pooledConnectionFactory); template.setExplicitQosEnabled(true); template.setDeliveryMode(DeliveryMode.NON_PERSISTENT); log.info(“jsmtemplate ————->sessionTransacted:{}",template.isSessionTransacted()); log.info(“jsmtemplate ————->ExplicitQosEnabled:{}",template.isExplicitQosEnabled()); return template; } /* * 配置用于同步发送的持久化JmsTemplate */ @Autowired @Bean public JmsTemplate synJmsTemplate(PooledConnectionFactory pooledConnectionFactory) { JmsTemplate template = new JmsTemplate(pooledConnectionFactory); log.info(“jsmtemplate ————->sessionTransacted:{}",template.isSessionTransacted()); log.info(“jsmtemplate ————->ExplicitQosEnabled:{}",template.isExplicitQosEnabled()); return template; }//如果对于SpringBoot自动生成的PooledConnectionFactory需要调优,可以自己生PooledConnectionFactory调优参数// private PooledConnectionFactory getPooledConnectionFactory(String userName,String password,String brokerURL) {// ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);// ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();// activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);// PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);// pooledConnectionFactory.setMaxConnections(5);// return pooledConnectionFactory;// } ...

December 19, 2018 · 1 min · jiezi