1 、简介
ActiveMQ

同类产品: RabbitMQ 、 Kafka、Redis(List)

1.1 比照RabbitMQ
最靠近的同类型产品,常常拿来比拟,性能伯仲之间,基本上能够相互代替。最次要区别是二者的协定不同RabbitMQ的协定是AMQP(Advanced Message Queueing Protoco),而ActiveMQ应用的是JMS(Java Messaging Service )协定。顾名思义JMS是针对Java体系的传输协定,队列两端必须有JVM,所以如果开发环境都是java的话举荐应用ActiveMQ,能够用Java的一些对象进行传递比方Map、BLob、Stream等。而AMQP通用行较强,非java环境常常应用,传输内容就是规范字符串。

另外一点就是RabbitMQ用Erlang开发,装置前要装Erlang环境,比拟麻烦。ActiveMQ解压即可用不必任何装置。

1.2 比照KafKa
Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好。

弊病是:

在传输过程中可能会呈现音讯反复的状况,

不保障发送程序

一些传统MQ的性能没有,比方音讯的事务性能。

所以通常用Kafka解决大数据日志。

1.3 比照Redis
其实Redis自身利用List能够实现音讯队列的性能,然而性能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简略的场景能够应用。

2 装置 ActiveMQ
拷贝apache-activemq-5.14.4-bin.tar.gz到Linux服务器的/opt下

解压缩 tar -zxvf apache-activemq-5.14.4-bin.tar.gz

重命名 mv apache-activemq-5.14.4 activemq

vim /opt/activemq/bin/activemq

查看java环境:vim /etc/profile 或者 echo $JAVA_HOME

减少两行

JAVA_HOME=”/opt/jdk1.8.0_152″

JAVA_CMD=”/opt/jdk1.8.0_152/bin”

注册服务

ln -s /opt/activemq/bin/activemq /etc/init.d/activemq(软连贯必须应用绝对路径)

chkconfig –add activemq

禁止应用

cp /opt/activemq/bin/activemq /etc/init.d/activemq

启动服务

service activemq start

敞开服务

service activemq stop

通过netstat 查看端口

netstat -tlnp

t:示意tcp

l:示意监听

n: 将ip和端口转换成域名和服务名

p:显示的程序名

activemq两个重要的端口,一个是提供音讯队列的默认端口:61616

另一个是控制台端口8161

通过控制台测试

启动生产端

进入网页控制台

账号/明码默认: admin/admin

点击Queues

察看客户端

3、在Java中应用音讯队列
3.1 在gmall-service-util中导入依赖坐标

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-activemq</artifactId>   <exclusions>      <exclusion>         <groupId>org.slf4j</groupId>         <artifactId>slf4j-log4j12</artifactId>      </exclusion>   </exclusions></dependency>
<dependency>   <groupId>org.apache.activemq</groupId>   <artifactId>activemq-pool</artifactId>   <version>5.15.2</version>   <exclusions>      <exclusion>         <groupId>org.slf4j</groupId>         <artifactId>slf4j-log4j12</artifactId>      </exclusion>   </exclusions></dependency>

3.2 在payment我的项目中增加producer端

public class ProducerTest {

public static void main(String[] args) throws JMSException {    // 创立连贯工厂    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(“tcp://192.168.67.201:61616”);    Connection connection = connectionFactory.createConnection();    connection.start();    // 创立session 第一个参数示意是否反对事务,false时,第二个参数Session.AUTO_ACKNOWLEDGE主动签收,Session.CLIENT_ACKNOWLEDGE手动签收,DUPS_OK_ACKNOWLEDGE 订阅时签收其中一个    // 第一个参数设置为true时,第二个参数能够疏忽 服务器设置为SESSION_TRANSACTED    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    // 创立队列    Queue queue = session.createQueue(“Atguigu”);    MessageProducer producer = session.createProducer(queue);    // 创立音讯对象    ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();    activeMQTextMessage.setText(“hello ActiveMq!”);    // 发送音讯    producer.send(activeMQTextMessage);    producer.close();    connection.close();}

}

留神:如果有事务须要先提交事务session.commit();

Number Of Pending Messages 期待生产的音讯 这个是以后未出队列的数量。能够了解为总接管数-总出队列数

Number Of Consumers 消费者 这个是消费者端的消费者数量

Messages Enqueued 进入队列的音讯 进入队列的总数量,包含出队列的。 这个数量只增不减

Messages Dequeued 出了队列的音讯 能够了解为是消费者生产掉的数量

总结:

当有一个音讯进入这个队列时,期待生产的音讯是1,进入队列的音讯是1。
当音讯生产后,期待生产的音讯是0,进入队列的音讯是1,出队列的音讯是1.
在来一条音讯时,期待生产的音讯是1,进入队列的音讯就是2.

3.3 在payment我的项目中增加consumer端
public class ConsumerTest {

public static void main(String[] args) throws JMSException {    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, “tcp://192.168.67.201:61616”);    // 创立连贯    Connection connection = activeMQConnectionFactory.createConnection();    connection.start();    // 创立会话    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    // 创立队列    Queue queue = session.createQueue(“Atguigu”);    // 创立Consumer    MessageConsumer consumer = session.createConsumer(queue);    // 接管音讯    consumer.setMessageListener(new MessageListener() {        @Override        public void onMessage(Message message) {            // 参数就是收到的音讯            if (message instanceof  TextMessage){                try {                    String text = ((TextMessage) message).getText();                    System.out.println(text+“接管的音讯!”);                } catch (JMSException e) {                    e.printStackTrace();                }            }        }    });    consumer.close();    session.close();}

}

3.4 对于事务管制
producer提交时的事务

事务开启 true

只执行send并不会提交到队列中,只有当执行session.commit()时,音讯才被真正的提交到队列中。

事务不开启 false

只有执行send,就进入到队列中。

consumer 接管时的事务

事务开启,签收必须写

Session.SESSION_TRANSACTED

收到音讯后,音讯并没有真正的被生产。音讯只是被锁住。一旦呈现该线程死掉、抛异样,或者程序执行了session.rollback()那么音讯会开释,从新回到队列中被别的生产端再次生产。java培训

事务不开启,签收形式抉择

Session.AUTO_ACKNOWLEDGE

只有调用comsumer.receive办法 ,主动确认。

事务不开启,签收形式抉择

Session.CLIENT_ACKNOWLEDGE

须要客户端执行 message.acknowledge(),否则视为未提交状态,线程完结后,其余线程还能够接管到。

这种形式跟事务模式很像,区别是不能手动回滚,而且能够独自确认某个音讯。

手动签收

事务不开启,签收形式抉择

Session.DUPS_OK_ACKNOWLEDGE

在Topic模式下做批量签收时用的,能够进步性能。然而某些状况音讯可能会被反复提交,应用这种模式的consumer要能够解决反复提交的问题。

3.5 长久化与非长久化
通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

长久化的益处就是当activemq宕机的话,音讯队列中的音讯不会失落。非长久化会失落。然而会耗费肯定的性能。

长久化:当服务器宕机,音讯仍然存在。

非长久化:当服务器宕机,音讯不存在。

在zookeeper中,有长久化-非长久化。