RocketMQ初入门踩坑记

64次阅读

共计 4736 个字符,预计需要花费 12 分钟才能阅读完成。

本文主要是讲在 Centos 中安装 RocketMQ 并做简单的示例。如果你按照本文安装 100% 是可以成功的,如果按照阿里官方的说明,那只能呵呵了~

安装

官方地址为:https://rocketmq.apache.org/d…
本人安装如下:

// 下载最新的 rocketmq
wget http://apache-mirror.8birdsvideo.com/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

// 解压
unzip rocketmq-all-4.4.0-bin-release.zip

// 切换到 mq 目录
cd rocketmq-all-4.4.0-bin-release

//name server 启动
nohup ./bin/mqnamesrv -n 111.231.XX.XX:9876 &

//-c conf/broker.conf autoCreateTopicEnable=true 参数需要带上,不然 topic 需要手动创建
nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &

配置, 切换到 mq 的 bin 目录下

cd rocketmq-all-4.4.0-bin-release/bin

rocketmq 默认最低内存为 4g, 机器内存不够用的话,找到 runserver.sh 和 runbroker.sh 编辑如下:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

运行

运行官方 demo,发现如下错误:

21:20:22.249 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:640)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)
    at org.apache.rocketmq.example.simple.Producer.main(Producer.java:40)

运行以下命令查看 broker 配置并写入远程 ip 地址:

// 查看 broker 配置
sh ./bin/mqbroker -m

// 关闭 broker
sh bin/mqshutdown broker

// 将本机远程 ip 写入配置文件中
echo 'brokerIP1=111.231.XX.XX' > conf/broker.properties 

// 重新启动 broker
nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &

管理控制台安装

Git 地址:https://github.com/apache/roc…

git clone git@github.com:apache/rocketmq-externals.git
cd  rocketmq-external/rocketmq-console/
mvn clean package -Dmaven.test.skip=true

打完包后,运行以下命令

java -jar rocketmq-console-ng-1.0.1.jar --server.port=12181 --rocketmq.config.namesrvAddr=111.231.XX.XX:9876

打开 http://localhost:12181 访问控制台,像如下

在 Procuder 这个页面查询时会出现如下异常:

java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1  DESC: the producer group[] not exist
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
        at com.google.common.base.Throwables.propagate(Throwables.java:160)
        at org.apache.rocketmq.console.service.impl.ProducerServiceImpl.getProducerConnection(ProducerServiceImpl.java:38)
        at org.apache.rocketmq.console.controller.ProducerController.producerConnection(ProducerController.java:39)

请把代码中 producer.shutdown()这句注掉,生产环境中请加上。

 //producer.shutdown();

代码示例(官方)

生产者

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

        producer.setNamesrvAddr("111.231.XX.XX:9876");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {e.printStackTrace();
            }

        //producer.shutdown();}
}

消费者

package org.apache.rocketmq.example.simple;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        //consumer.setConsumeTimestamp("20181109221800");
        consumer.setNamesrvAddr("111.231.XX.XX:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

有更多的文章,请关注查看,更有面试宝典相送

正文完
 0