关于java:Java开发中常用的消息队列工具-ActiveMQ

32次阅读

共计 4478 个字符,预计需要花费 12 分钟才能阅读完成。

1、简介
ActiveMQ

同类产品:RabbitMQ、Kafka、Redis(List)

1.1 比照 RabbitMQ
最靠近的同类型产品,常常拿来比拟,性能伯仲之间,基本上能够相互代替。最次要区别是二者的协定不同 RabbitMQ 的协定是 AMQP(Advanced Message Queueing Protoco),而 ActiveMQ 应用的是 JMS(Java Messaging Service)协定。顾名思义 JMS 是针对 Java 体系的传输协定,队列两端必须有 JVM,所以如果开发环境都是 java 的话举荐应用 ActiveMQ,能够用 Java 的一些对象进行传递比方 Map、BLob、Stream 等。而 AMQP 通用行较强,非 java 环境常常应用,传输内容就是规范字符串。

另外一点就是 RabbitMQ 用 Erlang 开发,装置前要装 Erlang 环境,比拟麻烦。ActiveMQ 解压即可用不必任何装置。

1.2 比照 KafKa
Kafka 性能超过 ActiveMQ 等传统 MQ 工具,集群扩展性好。

弊病是:

在传输过程中可能会呈现音讯反复的状况,

不保障发送程序

一些传统 MQ 的性能没有,比方音讯的事务性能。

所以通常用 Kafka 解决大数据日志。

1.3 比照 Redis
其实 Redis 自身利用 List 能够实现音讯队列的性能,然而性能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简略的场景能够应用。

2 装置 ActiveMQ
拷贝 apache-activemq-5.14.4-bin.tar.gz 到 Linux 服务器的 /opt 下

解压缩 tar -zxvf apache-activemq-5.14.4-bin.tar.gz

重命名 mv apache-activemq-5.14.4 activemq

vim /opt/activemq/bin/activemq

查看 java 环境:vim /etc/profile 或者 echo $JAVA_HOME

减少两行

JAVA_HOME=”/opt/jdk1.8.0_152″

JAVA_CMD=”/opt/jdk1.8.0_152/bin”

注册服务

ln -s /opt/activemq/bin/activemq /etc/init.d/activemq(软连贯必须应用绝对路径)

chkconfig –add activemq

禁止应用

cp /opt/activemq/bin/activemq /etc/init.d/activemq

启动服务

service activemq start

敞开服务

service activemq stop

通过 netstat 查看端口

netstat -tlnp

t: 示意 tcp

l: 示意监听

n: 将 ip 和端口转换成域名和服务名

p: 显示的程序名

activemq 两个重要的端口,一个是提供音讯队列的默认端口:61616

另一个是控制台端口 8161

通过控制台测试

启动生产端

进入网页控制台

账号 / 明码默认:admin/admin

点击 Queues

察看客户端

3、在 Java 中应用音讯队列
3.1 在 gmall-service-util 中导入依赖坐标

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

3.2 在 payment 我的项目中增加 producer 端

public class ProducerTest {

public static void main(String[] args) throws JMSException {
    // 创立连贯工厂
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(“tcp://192.168.67.201:61616”);
    Connection connection = connectionFactory.createConnection();
    connection.start();
    // 创立 session 第一个参数示意是否反对事务,false 时,第二个参数 Session.AUTO_ACKNOWLEDGE 主动签收,Session.CLIENT_ACKNOWLEDGE 手动签收,DUPS_OK_ACKNOWLEDGE 订阅时签收其中一个
    // 第一个参数设置为 true 时,第二个参数能够疏忽 服务器设置为 SESSION_TRANSACTED
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 创立队列
    Queue queue = session.createQueue(“Atguigu”);

    MessageProducer producer = session.createProducer(queue);
    // 创立音讯对象
    ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
    activeMQTextMessage.setText(“hello ActiveMq!”);
    // 发送音讯
    producer.send(activeMQTextMessage);
    producer.close();
    connection.close();}

}

留神:如果有事务须要先提交事务 session.commit();

Number Of Pending Messages 期待生产的音讯 这个是以后未出队列的数量。能够了解为总接管数 - 总出队列数

Number Of Consumers 消费者 这个是消费者端的消费者数量

Messages Enqueued 进入队列的音讯 进入队列的总数量, 包含出队列的。这个数量只增不减

Messages Dequeued 出了队列的音讯 能够了解为是消费者生产掉的数量

总结:

当有一个音讯进入这个队列时,期待生产的音讯是 1,进入队列的音讯是 1。
当音讯生产后,期待生产的音讯是 0,进入队列的音讯是 1,出队列的音讯是 1.
在来一条音讯时,期待生产的音讯是 1,进入队列的音讯就是 2.

3.3 在 payment 我的项目中增加 consumer 端
public class ConsumerTest {

public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,“tcp://192.168.67.201:61616”);
    // 创立连贯
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    // 创立会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 创立队列
    Queue queue = session.createQueue(“Atguigu”);
    // 创立 Consumer
    MessageConsumer consumer = session.createConsumer(queue);
    // 接管音讯
    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            // 参数就是收到的音讯
            if (message instanceof  TextMessage){
                try {String text = ((TextMessage) message).getText();
                    System.out.println(text+“接管的音讯!”);
                } catch (JMSException e) {e.printStackTrace();
                }
            }
        }
    });
    consumer.close();
    session.close();}

}

3.4 对于事务管制
producer 提交时的事务

事务开启 true

只执行 send 并不会提交到队列中,只有当执行 session.commit()时,音讯才被真正的提交到队列中。

事务不开启 false

只有执行 send,就进入到队列中。

consumer 接管时的事务

事务开启,签收必须写

Session.SESSION_TRANSACTED

收到音讯后,音讯并没有真正的被生产。音讯只是被锁住。一旦呈现该线程死掉、抛异样,或者程序执行了 session.rollback()那么音讯会开释,从新回到队列中被别的生产端再次生产。java 培训

事务不开启,签收形式抉择

Session.AUTO_ACKNOWLEDGE

只有调用 comsumer.receive 办法,主动确认。

事务不开启,签收形式抉择

Session.CLIENT_ACKNOWLEDGE

须要客户端执行 message.acknowledge(), 否则视为未提交状态,线程完结后,其余线程还能够接管到。

这种形式跟事务模式很像,区别是不能手动回滚, 而且能够独自确认某个音讯。

手动签收

事务不开启,签收形式抉择

Session.DUPS_OK_ACKNOWLEDGE

在 Topic 模式下做批量签收时用的,能够进步性能。然而某些状况音讯可能会被反复提交,应用这种模式的 consumer 要能够解决反复提交的问题。

3.5 长久化与非长久化
通过 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

长久化的益处就是当 activemq 宕机的话,音讯队列中的音讯不会失落。非长久化会失落。然而会耗费肯定的性能。

长久化:当服务器宕机,音讯仍然存在。

非长久化:当服务器宕机,音讯不存在。

在 zookeeper 中,有长久化 - 非长久化。

正文完
 0