踩的一点点小坑
如果你没配置环境变量,那么请到 RocketMQ 的 bin 目录下启动 nameServer 和 broker。尽管命令可能没报错,然而你发现就是启动不起来,说来惭愧,搞这个问题,钻研了一个下午。
另外 RocketMq 所依赖的 jdk 门路是 Oralce 的 JDK,之前我的 linux 服务器装的是 OpenJDK,试了良久,发现就是启动不起来,临时没找到解决方案,只好卸载 OpenJDK。
主从同步
主从同步和集群的概念很相似,主从同步就能够算到集群中,最终的目标也是进步零碎的承载能力和可靠性。
在 RocketMQ 学习笔记(一) 初遇篇咱们曾经搭建了一个 RocketMQ 了,是主节点,上面咱们要搭建的就是从节点。RocketMQ 的主从同步绝对于 MySQL 的主从同步有点不一样,个别状况下 MySQL 中主从同步,只有主节点负责写,从负责读。然而 RocketMQ 有点不按套路出牌,写是向主节点上写,读却不是只读从,
生产位点 CommitLog
每个 topic 有多个队列,消费者从 以后什么地位拉去音讯进行生产,哪个地位就是生产点。
CommitLog: 音讯主体以及元数据的存储主体,存储 Producer 端写入的音讯主体内容, 音讯内容不是定长的。
当 Master Broker 发现 Consumer 的生产位点与 CommitLog 的最新值的差值的容量超过该机器内存的百分比(accessMessageInMemoryMaxRatio=40%),会举荐 Consumer 从 Slave Broker 中去读取数据,升高 Master Broker 的 IO。
主从同步的配置
个别状况下,不加非凡阐明,都是在 Linux 下进行配置。
- hosts 中配置从节点的域名映射 主节点和从节点做雷同的配置
vi /etc/hosts -- 关上并编辑 hosts 文件, 增加如下配置:
192.168.2.128 mqnameserver1
192.168.2.128 mqmaster1
192.168.2.129 mqnameserver2
192.168.2.129 mqmaster1slaver1
nameServer 是均等的,不存在主从,192.168.2.128 这台计算机是主节点,192.168.2.129 是从节点。
- 批改主节点的配置文件
- 配置从节点
其实能够在主节点上配置完之后,通过近程复制,复制到从节点。这里咱们只展现从节点须要配置的。这里咱们配置的是两主两从异步刷新,配置文件门路是 /usr/rocketmq/conf/2m-2s-async。上面有如下配置文件:
咱们批改的是 broker-a-s.properties 这个配置文件。
- 须要配置的:
brokerId=1 -- 大于 0 是从
deleteWhen=04
fileReservedTime=48
flushDiskType=ASYNC_FLUSH
brokerRole=SLAVE -- 角色 是从
autoCreateTopicEnable =true
defaultTopicQueueNums = 4
listenPort = 10911
storePathRootDir=/usr/rocketmq/mqstore
storePathCommitLog=/usr/rocketmq/mqstore/commitlog
storePathConsuQueue=/usr/rocketmq/mqstore/commitlog/consumequeue
storePathIndex=/usr/rocketmq/mqstore/commitlog/consumequeue
maxMessageSize=65535
namesrvAddr = mqnameserver1:9876;mqnameserver2:9876 -- 配置 nameserver 的地址
而后就配置实现了,接下来咱们来启动一下,先主后从。
记得改一下从节点的 RocektMQ 的启动配置改为 1G,防火墙放行,这都是在初遇篇讲过的,遗记了再看下。
主节点启动命令:
nohup sh mqnamesrv &
nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties&
从节点启动命令:
nohup sh mqnamesrv &
-- 留神从节点的 broker 用 broker-a-s.properties
nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/ broker-a-s.properties&
-- 启动胜利的标记,输出 JPS,呈现上面这个:
2209 BrokerStartup
5527 Jps
2173 NamesrvStartup
而后咱们上监控看下:
当初能够再做下测试,向音讯队列再发送音讯,而后尝试干掉主节点,再启用消费者生产。
同步音讯 异步音讯 延时音讯 单向音讯
这里又碰到了同步和异步,这是个在开发畛域很常见的名词,咱们来解释一下:
同步和异步关注的是音讯通信机制 (synchronous communication/ asynchronous communication)。
所谓同步,就是在发动一个调用时,在没失去后果之前,该调用就不返回,也就是说调用者在被动的期待这个调用的后果。
相似于烧水壶,烧水壶在没有揭示性能的时候,咱们不晓得水什么时候烧开,所以就在烧水的时候就始终在烧水壶旁边待着,看烧水壶开了没,
异步就是,在发动一个调用时,间接返回了,后续被调用方通过回调函数将调用后果发送给调用方。
相似于升级版的烧水壶,放上火之后,间接返回,烧开的时候会有提醒。
比拟通俗易懂的介绍:
怎么了解阻塞非阻塞与同步异步的区别?
提早音讯: 就是等一段时间之后再发送。目前不反对自定义,只反对几个级别。
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
单向音讯就是发送完音讯,没返回后果。罕用于日志发送。
这里其实也就是介绍 API 的应用:
我比拟喜爱 RocketMQ 的一个重要起因就是这个是文档写的比拟好,而且是中文。正文曾经打的很具体了,如果你不想看我写的 demo 也能够间接去看官网文档, 上面是官网示例的地址:
https://github.com/apache/roc…
可靠性有多高
之前咱们在介绍音讯队列引论的时候,咱们说过 RocketMQ 是高牢靠的,那么有多高呢?
影响音讯可靠性的几种状况:
- Broker 非正常敞开
- Broker 异样 Crash
- OS Crash(操作系统解体)
- 机器掉电,然而能立刻复原供电状况
- 机器无奈开机(可能是 cpu、主板、内存等关键设备损坏)
- 磁盘设施损坏
而后 1、2、3、4 四种状况都属于硬件资源可立刻复原状况,RocketMQ 在这四种状况下能保障音讯不丢,或者失落大量数据(依赖刷盘形式是同步还是异步)。
5、6 属于单点故障,且无奈复原,一旦产生,在此单点上的音讯全副失落。RocketMQ 在这两种状况下,通过异步复制,可保障 99% 的音讯不丢,然而依然会有极少量的音讯可能失落。通过同步双写技术能够完全避免单点,同步双写势必会影响性能,适宜对音讯可靠性要求极高的场合,例如与 Money 相干的利用。注:RocketMQ 从 3.0 版本开始反对同步双写。
偏移量
偏移量: 消费者将要生产的地位。
那么这个偏移量存储在哪里呢?不同的生产形式,存储地位不同。
如果是 push 模式,即音讯队列被动的向消费者推送数据,那么偏移量就存储在服务器,因为此时音讯队列是晓得消费者生产到哪里了。
如果是 pull 模式,即消费者被动的向音讯队列拉取数据,那么偏移量就应该存储在本地,因为消费者通常不止一个,通常状况下,咱们会搭建消费者集群 (后文会讲),每个消费者生产的进度不同。
偏移量次要与三个类无关: OffsetStore(接口)、LocalFileOffsetStore(实现类)、RemoteBrokerOffsetStore(实现类)
生产模式
push 生产模式
播送模式: 将音讯向所有的消费者发送,每个消费者领有残缺的生产。
集群模式: 也就是消费者集群,消费者在一个小组,即小组名称雷同,那么该小组内的消费者即为一个集群。
设置也非常简单 , 向上面这样:
-- 这是播送模式 MessageModel 是一个枚举类型的
consumer.setMessageModel(MessageModel.BROADCASTING);
-- 这是集群,不写也行,默认即为集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
pull 生产模式
相比拟 push,pull(拉取模式)须要留神的就是,本来由服务器保护的偏移量,当初须要咱们来保护。
- 无调度拉取
public class PullMsgDemo {private static final Map<MessageQueue, Long> offSetMap = new HashMap<>();
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("pullGroup");
// 设置 nameServer 的地址
defaultMQPullConsumer.setNamesrvAddr(Constant.NAMESRV_ADDR.getEnumValue());
// 开启消费者
defaultMQPullConsumer.start();
// 拉取主题对应的队列
Set<MessageQueue> queues = defaultMQPullConsumer.fetchSubscribeMessageQueues("MyTopic4");
// 遍历音讯
for (MessageQueue queue : queues) {
// 因为音讯的数量是不确定的, 所以要用死循环
while (true) {
// mq, 一个主题领有许多队列, 每个队列的音讯又附带标签
// subExpression,
// offset, 偏移量
// maxNums 一次拉取多少音讯
// pullBlockIfNotFound 一共四个参数
// 从名字能够推断, 如果拉不到就陷入阻塞
PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(queue, null, getOffset(queue), 20);
// 将偏移量重设
setOffset(queue, pullResult.getNextBeginOffset());
// 获取音讯
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
// 不能无限度的拉下去, 通过 pullResult 来获取拉取状态,
// 这是一个枚举值
PullStatus pullStatus = pullResult.getPullStatus();
// 阐明有音讯
if (pullStatus == PullStatus.FOUND){
// 遍历
for (MessageExt messageExt : msgFoundList) {System.out.println(messageExt);
}
}
// 没有新音讯了
if (pullStatus == PullStatus.NO_NEW_MSG){break;}
}
}
defaultMQPullConsumer.shutdown();
System.out.println("拉取完结");
}
private static void setOffset(MessageQueue queue, long nextBeginOffset) {offSetMap.put(queue,nextBeginOffset);
}
// 首次获取必定为 0
private static Long getOffset(MessageQueue queue) {return offSetMap.get(queue) == null ? 0L : offSetMap.get(queue);
}
}
- 调度拉取 工夫调度(每隔 3 秒获取)
private static void SchedulePullMsg() throws MQClientException {MQPullConsumerScheduleService consumerScheduleService = new MQPullConsumerScheduleService("pullGroup");
// 设置 nameServer 的地址
consumerScheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Constant.NAMESRV_ADDR.getEnumValue());
//
// 第二个是回调函数
consumerScheduleService.registerPullTaskCallback("MyTopic4", (mq, pullTaskContext) -> {
// mq 是音讯队列 获取消费者
MQPullConsumer pullConsumer = pullTaskContext.getPullConsumer();
// fromStore 是在本地进行保护偏移量
try {
// 获取偏移量
long offSet = pullConsumer.fetchConsumeOffset(mq, false);
PullResult pullResult = pullConsumer.pull(mq, "MyTags3", offSet, 20);
PullStatus pullStatus = pullResult.getPullStatus();
if (pullStatus == PullStatus.FOUND) {List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
// 遍历
for (MessageExt messageExt : msgFoundList) {System.out.println(messageExt);
}
}
// 没有新音讯了
// if (pullStatus == PullStatus.NO_NEW_MSG) {// System.out.println("音讯拉取结束");
// }
pullConsumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();
}
// 设置拉取的频率 每 3 秒一次
pullTaskContext.setPullNextDelayTimeMillis(3000);
});
// 这里跟
consumerScheduleService.start();}
音讯部分有序
(轮询调度算法)Round-Robin Scheduling: 轮询调度算法的原理是每一次把来自用户的申请轮流调配给外部中的服务器,从 1 开始,直到 N(外部服务器个数),而后从新开始循环。
很多时候两数相除的时候,咱们只关怀当一个整数除以一个正整数所得的余数,如果你还记得离散数学的话,离散数学中对于求余的实践咱们称之为模算术,这也是数论的内容。求余在 java 中的符号是 % , a 和 b 都是整数,不言而喻,a % b 的余数肯定小于 b。留神这个简略的定理,咱们将在音讯的部分有序中应用到它。
默认状况下,RocketMQ 会采取 Round Robin 轮询,将音讯发送到不同的队列,记得咱们在刚配置的时候吗?咱们配置了 4 个队列。而生产音讯的时候是从多个队列上拉取音讯,这种状况下发送和生产是不能保障有序的。然而如果管制发送的程序音讯只顺次发送到一个队列中,生产的时候只从这个 queue 上生产,那么这就保障了程序。当发送和生产的队列只有一个,咱们称之为全局有序; 如果有多个 queue 参加则是分区有序,即绝对于每个对了都是有序。上面咱们联合一个例子来介绍分区有序。
一个订单的程序流程是: 创立、付款、推送、实现。对于订单场景来说,生产者必然是很多的,消费者可能也不止一个,咱们须要保障订单号雷同的音讯放入一个队列,在不应用全局有序的状况下。生产时,同一个 OrderId 获取到的必定是同一个队列。咱们默认是配置了 4 个队列。看到这里你可能曾经想到了,对的,咱们用 OrderId 对 队列的大小进行模运算,也就是取余,这样咱们就能保障雷同订单的音讯放入一个队列中
原本实例代码是打算放在 GitHub 上的,奈何这段时间拜访 GitHub 的速度很慢,码云又要注册能力下载,所以就只好放在了文章中,例子来自于 RocketMq 官网示例的改装,这一点我感觉 RocketMQ 的官网示例做的很好。
public class OrderProducer {
public static class OrderStep {
private Long orderId;
private String desc;
public OrderStep() {}
public Long getOrderId() {return orderId;}
public void setOrderId(Long orderId) {this.orderId = orderId;}
public String getDesc() {return desc;}
public void setDesc(String desc) {this.desc = desc;}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 制作模仿数据
*
* @return
*/
private List<OrderStep> BuilderOrders() {List<OrderStep> orderStepList = new ArrayList<>();
OrderStep orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103111039L);
orderStepDemo.setDesc("创立");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103111065L);
orderStepDemo.setDesc("创立");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103111039L);
orderStepDemo.setDesc("付款");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103117235L);
orderStepDemo.setDesc("创立");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103111065L);
orderStepDemo.setDesc("付款");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103117235L);
orderStepDemo.setDesc("付款");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103111065L);
orderStepDemo.setDesc("实现");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103111039L);
orderStepDemo.setDesc("推送");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103117235L);
orderStepDemo.setDesc("实现");
orderStepList.add(orderStepDemo);
orderStepDemo = new OrderStep();
orderStepDemo.setOrderId(15103111039L);
orderStepDemo.setDesc("实现");
orderStepList.add(orderStepDemo);
return orderStepList;
}
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("myProducer");
defaultMQProducer.setNamesrvAddr(Constant.NAMESRV_ADDR.getEnumValue());
// 开启音讯队列
defaultMQProducer.start();
// 设置 nameServer 的地址
List<OrderStep> orderList = new OrderProducer().BuilderOrders();
// 生产音讯
for (int i = 0; i < 10 ; i++) {Message msg = new Message("MyTopic3","tagsA", orderList.get(i).toString().getBytes());
// 不必 Lambda 会更清晰一点
SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
orderList.get(i).toString()));
}
defaultMQProducer.shutdown();}
}
消费者:
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
consumer.setNamesrvAddr(Constant.NAMESRV_ADDR.getEnumValue());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("MyTopic3" , "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);
for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
MQ 事务简述
咱们联合下面的示意图来介绍事务,RocketMQ 中事务的状态由三种: 提交状态、回滚状态、中间状态.
- TransactionStatus.CommitTransaction: 提交事务,它容许消费者生产此音讯。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该音讯将被删除,不容许被生产。
- TransactionStatus.Unknown: 中间状态,它代表须要查看音讯队列来确定状态。
通信的过程大抵是相似的,生产者向 MQ 投放音讯相似于邮寄函件,收回去之后,通常必要的随同过程是收到回信,或者收到信已胜利送到。同样的生产者在发送音讯也是一个双向的,生产者在向音讯对了发送音讯之后,并不分明,本人是否胜利发送音讯,这通常须要 MQ Server 发送一个状态,来告知生产者是否是收到了音讯。在生产者是事务生产者的状况下,生产者在收到 MQ Server 的响应之后,先去更新本地事务的状态,而后在进行提交或者回滚。咱们都晓得通信是不稳固的,生产者提交的 commit/Rollback,MQ Server 可能没收到。没收到之后,MQ Server 就会申请生产者查看本地事务状态,生产者曾经确认服务端收到了音讯,那么生产者就会再发一次 commit/Rollback, 个别咱们称之为 RocketMQ 的弥补机制。
示例:
TransactionMQProducer txProducer = new TransactionMQProducer("txPro");
txProducer.setNamesrvAddr(Constant.NAMESRV_ADDR.getEnumValue());
// 开启事务
txProducer.setTransactionListener(new TransactionListener() {
// 执行本地事务 将 MQ Server 的响应更新到本地事务中
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println(o);
if ("msg0".equals(message.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if ("msg1".equals(message.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;}else{return LocalTransactionState.UNKNOW;}
}
// 查看本地事务的状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("的确回查了");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
txProducer.start();
for (int i = 0; i < 3; i++) {Message message = new Message("txTopic", "msg" + i, ("msg" + i + "content").getBytes());
// null 代表对每个音讯都进行事务管制
TransactionSendResult sendResult = txProducer.sendMessageInTransaction(message, null);
System.out.println("发送:" + sendResult);
}
// 别把 producer shutdown shutdown 怎么重试
当初去生产的话,是只会生产到两条音讯。
留神: 提早音讯、批量音讯不反对事务机制。
MQ 概念补充
咱们晓得 RocketMQ 是保留在内存中的,那么机器总会有重启或者意外情况,那么咱们就须要复原数据,对于 RocketMq 来说,数据长久化到磁盘有两种形式: 同步刷盘、异步刷盘。主从之间的复制形式有: 同步复制、异步复制。异步速度快,同步则牢靠高。
RocketMQ 的 nameserver 是否用 zookeeper 代替?
能够,然而 nameServer 曾经足够用了,zookeeper 提供的性能很多,RocketMQ 用不上,比如说一个挂掉之后,再选举一个。每个 nameServe 都是平等的,RocketMQ 会向所有的 nameServer 注册。
nameServer 须要实现的局部工作: topicQueue(主题队列)、brokerAddress(音讯队列的地址)、brokerLive(哪些 broker 是存活的)。
参考资料
RocketMQ 生产位点
Apache RocketMQ 开发者指南