A look at rocketmq-client-go's strategy

Preface: This post takes a look at rocketmq-client-go's strategy.

AllocateStrategy — rocketmq-client-go-v2.0.0/consumer/strategy.go

```go
type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
```

AllocateStrategy is defined as a func type.

AllocateByAveragely — rocketmq-client-go-v2.0.0/consumer/strategy.go

```go
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
	if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
		return nil
	}
	var (
		find  bool
		index int
	)
	for idx := range cidAll {
		if cidAll[idx] == currentCID {
			find = true
			index = idx
			break
		}
	}
	if !find {
		rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
			rlog.LogKeyConsumerGroup: consumerGroup,
			"consumerId":             currentCID,
			"cidAll":                 cidAll,
		})
		return nil
	}

	mqSize := len(mqAll)
	cidSize := len(cidAll)
	mod := mqSize % cidSize

	var averageSize int
	if mqSize <= cidSize {
		averageSize = 1
	} else {
		if mod > 0 && index < mod {
			averageSize = mqSize/cidSize + 1
		} else {
			averageSize = mqSize / cidSize
		}
	}

	var startIndex int
	if mod > 0 && index < mod {
		startIndex = index * averageSize
	} else {
		startIndex = index*averageSize + mod
	}

	num := utils.MinInt(averageSize, mqSize-startIndex)
	result := make([]*primitive.MessageQueue, 0)
	for i := 0; i < num; i++ {
		result = append(result, mqAll[(startIndex+i)%mqSize])
	}
	return result
}
```

AllocateByAveragely first computes averageSize, then derives startIndex from it, and finally collects mqAll[(startIndex+i)%mqSize].

AllocateByAveragelyCircle — rocketmq-client-go-v2.0.0/consumer/strategy.go ...

July 13, 2020 · 3 min · jiezi

A look at rocketmq-client-go's remoteBrokerOffsetStore

Preface: This post takes a look at rocketmq-client-go's remoteBrokerOffsetStore.

remoteBrokerOffsetStore — rocketmq-client-go-v2.0.0/consumer/offset_store.go

```go
type remoteBrokerOffsetStore struct {
	group       string
	OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`
	client      internal.RMQClient
	namesrv     internal.Namesrvs
	mutex       sync.RWMutex
}
```

remoteBrokerOffsetStore defines the group, OffsetTable, client, namesrv and mutex fields.

NewRemoteOffsetStore — rocketmq-client-go-v2.0.0/consumer/offset_store.go

```go
func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {
	return &remoteBrokerOffsetStore{
		group:       group,
		client:      client,
		namesrv:     namesrv,
		OffsetTable: make(map[primitive.MessageQueue]int64),
	}
}
```

NewRemoteOffsetStore instantiates a remoteBrokerOffsetStore.

persist — rocketmq-client-go-v2.0.0/consumer/offset_store.go

```go
func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	if len(mqs) == 0 {
		return
	}

	used := make(map[primitive.MessageQueue]struct{}, 0)
	for _, mq := range mqs {
		used[*mq] = struct{}{}
	}

	for mq, off := range r.OffsetTable {
		if _, ok := used[mq]; !ok {
			delete(r.OffsetTable, mq)
			continue
		}
		err := r.updateConsumeOffsetToBroker(r.group, mq, off)
		if err != nil {
			rlog.Warning("update offset to broker error", map[string]interface{}{
				rlog.LogKeyConsumerGroup: r.group,
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err.Error(),
				"offset":                 off,
			})
		} else {
			rlog.Info("update offset to broker success", map[string]interface{}{
				rlog.LogKeyConsumerGroup: r.group,
				rlog.LogKeyMessageQueue:  mq.String(),
				"offset":                 off,
			})
		}
	}
}
```

persist iterates over OffsetTable, dropping entries for queues that are no longer assigned and calling r.updateConsumeOffsetToBroker for the rest.

remove — rocketmq-client-go-v2.0.0/consumer/offset_store.go ...
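The first half of persist is a common "prune to the assigned set" pattern: build a set from the current assignment, then delete every table entry not in it. A minimal sketch of just that step, keyed by plain strings instead of primitive.MessageQueue (names are illustrative; the real method then pushes each surviving offset to the broker):

```go
package main

import "fmt"

// pruneOffsets deletes offset entries whose queue is no longer in the
// assigned slice, mimicking the pruning half of
// remoteBrokerOffsetStore.persist.
func pruneOffsets(offsets map[string]int64, assigned []string) {
	used := make(map[string]struct{}, len(assigned))
	for _, q := range assigned {
		used[q] = struct{}{}
	}
	for q := range offsets {
		if _, ok := used[q]; !ok {
			delete(offsets, q) // deleting during range is safe in Go
		}
	}
}

func main() {
	offsets := map[string]int64{"q0": 100, "q1": 200, "q2": 300}
	// After rebalance this consumer only owns q0 and q2.
	pruneOffsets(offsets, []string{"q0", "q2"})
	fmt.Println(len(offsets), offsets["q0"], offsets["q2"])
}
```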

July 12, 2020 · 3 min · jiezi

A look at rocketmq-client-go's localFileOffsetStore

Preface: This post takes a look at rocketmq-client-go's localFileOffsetStore.

OffsetStore — rocketmq-client-go-v2.0.0/consumer/offset_store.go

```go
type OffsetStore interface {
	persist(mqs []*primitive.MessageQueue)
	remove(mq *primitive.MessageQueue)
	read(mq *primitive.MessageQueue, t readType) int64
	update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
}
```

OffsetStore defines the persist, remove, read and update methods.

localFileOffsetStore — rocketmq-client-go-v2.0.0/consumer/offset_store.go

```go
type localFileOffsetStore struct {
	group       string
	path        string
	OffsetTable map[MessageQueueKey]int64

	// mutex for offset file
	mutex sync.Mutex
}
```

localFileOffsetStore defines the group, path, OffsetTable and mutex fields.

NewLocalFileOffsetStore — rocketmq-client-go-v2.0.0/consumer/offset_store.go

```go
func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
	store := &localFileOffsetStore{
		group:       group,
		path:        filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"),
		OffsetTable: make(map[MessageQueueKey]int64),
	}
	store.load()
	return store
}
```

NewLocalFileOffsetStore creates a localFileOffsetStore and then calls store.load().

load — rocketmq-client-go-v2.0.0/consumer/offset_store.go

```go
func (local *localFileOffsetStore) load() {
	local.mutex.Lock()
	defer local.mutex.Unlock()
	data, err := utils.FileReadAll(local.path)
	if os.IsNotExist(err) {
		return
	}
	if err != nil {
		rlog.Info("read from local store error, try to use bak file", map[string]interface{}{
			rlog.LogKeyUnderlayError: err,
		})
		data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
	}
	if err != nil {
		rlog.Info("read from local store bak file error", map[string]interface{}{
			rlog.LogKeyUnderlayError: err,
		})
		return
	}

	datas := make(map[MessageQueueKey]int64)
	wrapper := OffsetSerializeWrapper{
		OffsetTable: datas,
	}
	err = jsoniter.Unmarshal(data, &wrapper)
	if err != nil {
		rlog.Warning("unmarshal local offset error", map[string]interface{}{
			"local_path":             local.path,
			rlog.LogKeyUnderlayError: err.Error(),
		})
		return
	}
	if datas != nil {
		local.OffsetTable = datas
	}
}
```

load reads the data via utils.FileReadAll(local.path), falling back to a .bak file on error, then unmarshals it into local.OffsetTable via jsoniter.Unmarshal(data, &wrapper).

read — rocketmq-client-go-v2.0.0/consumer/offset_store.go ...
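The on-disk format is just a JSON object with an OffsetTable field. A minimal sketch of the unmarshal step using the standard encoding/json instead of jsoniter, with a simplified wrapper keyed by strings rather than MessageQueueKey (both names here are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// offsetWrapper mirrors the shape of OffsetSerializeWrapper: a single
// OffsetTable field mapping queue keys to committed offsets.
type offsetWrapper struct {
	OffsetTable map[string]int64 `json:"OffsetTable"`
}

// loadOffsets parses the contents of an offset.json-style file into a map.
func loadOffsets(data []byte) (map[string]int64, error) {
	var w offsetWrapper
	if err := json.Unmarshal(data, &w); err != nil {
		return nil, err
	}
	return w.OffsetTable, nil
}

func main() {
	data := []byte(`{"OffsetTable":{"topicA-q0":42,"topicA-q1":7}}`)
	table, err := loadOffsets(data)
	fmt.Println(err, table["topicA-q0"], table["topicA-q1"])
}
```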

July 11, 2020 · 2 min · jiezi

A look at rocketmq-client-go's pushConsumer

Preface: This post takes a look at rocketmq-client-go's pushConsumer.

pushConsumer — rocketmq-client-go-v2.0.0/consumer/push_consumer.go

```go
type pushConsumer struct {
	*defaultConsumer
	queueFlowControlTimes        int
	queueMaxSpanFlowControlTimes int
	consumeFunc                  utils.Set
	submitToConsume              func(*processQueue, *primitive.MessageQueue)
	subscribedTopic              map[string]string
	interceptor                  primitive.Interceptor
	queueLock                    *QueueLock
	done                         chan struct{}
	closeOnce                    sync.Once
}
```

pushConsumer embeds defaultConsumer and defines the queueFlowControlTimes, queueMaxSpanFlowControlTimes, consumeFunc, submitToConsume, subscribedTopic, interceptor, queueLock, done and closeOnce fields.

NewPushConsumer — rocketmq-client-go-v2.0.0/consumer/push_consumer.go

```go
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
	defaultOpts := defaultPushConsumerOptions()
	for _, apply := range opts {
		apply(&defaultOpts)
	}
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	if defaultOpts.Namespace != "" {
		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
	}

	dc := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup:  defaultOpts.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          defaultOpts.ConsumerModel,
		consumeOrderly: defaultOpts.ConsumeOrderly,
		fromWhere:      defaultOpts.FromWhere,
		allocate:       defaultOpts.Strategy,
		option:         defaultOpts,
		namesrv:        srvs,
	}

	p := &pushConsumer{
		defaultConsumer: dc,
		subscribedTopic: make(map[string]string, 0),
		queueLock:       newQueueLock(),
		done:            make(chan struct{}, 1),
		consumeFunc:     utils.NewSet(),
	}
	dc.mqChanged = p.messageQueueChanged
	if p.consumeOrderly {
		p.submitToConsume = p.consumeMessageOrderly
	} else {
		p.submitToConsume = p.consumeMessageCurrently
	}

	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

	return p, nil
}
```

NewPushConsumer instantiates the defaultConsumer and the pushConsumer, choosing submitToConsume based on consumeOrderly.

Start — rocketmq-client-go-v2.0.0/consumer/push_consumer.go ...
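Note how the orderly/concurrent decision is made once, at construction time, by storing a method value in the submitToConsume field instead of branching on every message. A minimal sketch of that function-field dispatch (all names here are illustrative, not the library's):

```go
package main

import "fmt"

// consumer mimics pushConsumer's shape: a boolean option selects which
// method value is stored in submitToConsume at construction time.
type consumer struct {
	consumeOrderly  bool
	submitToConsume func(msgs []string) string
}

func newConsumer(orderly bool) *consumer {
	c := &consumer{consumeOrderly: orderly}
	if c.consumeOrderly {
		c.submitToConsume = c.consumeOrderlyImpl
	} else {
		c.submitToConsume = c.consumeConcurrentlyImpl
	}
	return c
}

func (c *consumer) consumeOrderlyImpl(msgs []string) string {
	return fmt.Sprintf("orderly:%d", len(msgs))
}

func (c *consumer) consumeConcurrentlyImpl(msgs []string) string {
	return fmt.Sprintf("concurrently:%d", len(msgs))
}

func main() {
	fmt.Println(newConsumer(true).submitToConsume([]string{"a", "b"}))
	fmt.Println(newConsumer(false).submitToConsume([]string{"a"}))
}
```

The hot path then just calls c.submitToConsume(...) with no per-message branch, which is the design choice the real constructor makes.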

July 10, 2020 · 7 min · jiezi

A look at rocketmq-client-go's PullConsumer

Preface: This post takes a look at rocketmq-client-go's PullConsumer.

PullConsumer — rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

```go
type PullConsumer interface {
	// Start
	Start()

	// Shutdown refuse all new pull operation, finish all submitted.
	Shutdown()

	// Pull pull message of topic, selector indicate which queue to pull.
	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

	// PullFrom pull messages of queue from the offset to offset + numbers
	PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

	// updateOffset update offset of queue in mem
	UpdateOffset(queue *primitive.MessageQueue, offset int64) error

	// PersistOffset persist all offset in mem.
	PersistOffset(ctx context.Context) error

	// CurrentOffset return the current offset of queue in mem.
	CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}
```

PullConsumer defines the Start, Shutdown, Pull, PullFrom, UpdateOffset, PersistOffset and CurrentOffset methods.

defaultPullConsumer — rocketmq-client-go-v2.0.0/consumer/pull_consumer.go ...

July 9, 2020 · 3 min · jiezi

A look at rocketmq-client-go's TraceInterceptor

Preface: This post takes a look at rocketmq-client-go's TraceInterceptor.

TraceInterceptor — rocketmq-client-go-v2.0.0/producer/interceptor.go

```go
// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithTrace(traceCfg *primitive.TraceConfig) Option {
	return func(options *producerOptions) {
		ori := options.Interceptors
		options.Interceptors = make([]primitive.Interceptor, 0)
		options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg))
		options.Interceptors = append(options.Interceptors, ori...)
	}
}
```

WithTrace prepends the trace interceptor ahead of the existing options.Interceptors.

newTraceInterceptor — rocketmq-client-go-v2.0.0/producer/interceptor.go

```go
func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
	dispatcher := internal.NewTraceDispatcher(traceCfg)
	dispatcher.Start()

	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		beginT := time.Now()
		err := next(ctx, req, reply)

		producerCtx := primitive.GetProducerCtx(ctx)
		if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
			return next(ctx, req, reply)
		}

		// SendOneway && SendAsync has no reply.
		if reply == nil {
			return err
		}

		result := reply.(*primitive.SendResult)
		if result.RegionID == "" || !result.TraceOn {
			return err
		}

		sendSuccess := result.Status == primitive.SendOK
		costT := time.Since(beginT).Nanoseconds() / int64(time.Millisecond)
		storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2

		traceBean := internal.TraceBean{
			Topic:       producerCtx.Message.Topic,
			Tags:        producerCtx.Message.GetTags(),
			Keys:        producerCtx.Message.GetKeys(),
			StoreHost:   producerCtx.BrokerAddr,
			ClientHost:  utils.LocalIP,
			BodyLength:  len(producerCtx.Message.Body),
			MsgType:     producerCtx.MsgType,
			MsgId:       result.MsgID,
			OffsetMsgId: result.OffsetMsgID,
			StoreTime:   storeT,
		}

		traceCtx := internal.TraceContext{
			RequestId: primitive.CreateUniqID(), // set id
			TimeStamp: time.Now().UnixNano() / int64(time.Millisecond),

			TraceType:  internal.Pub,
			GroupName:  producerCtx.ProducerGroup,
			RegionId:   result.RegionID,
			TraceBeans: []internal.TraceBean{traceBean},
			CostTime:   costT,
			IsSuccess:  sendSuccess,
		}
		dispatcher.Append(traceCtx)
		return err
	}
}
```

newTraceInterceptor first creates a dispatcher via internal.NewTraceDispatcher(traceCfg) and calls dispatcher.Start; it then returns a func that builds a traceCtx and hands it to dispatcher.Append(traceCtx).

Summary: WithTrace prepends the TraceInterceptor to options.Interceptors, while newTraceInterceptor creates the TraceInterceptor itself. ...

July 7, 2020 · 1 min · jiezi

A look at rocketmq-client-go's transactionProducer

Preface: This post takes a look at rocketmq-client-go's transactionProducer.

transactionProducer — rocketmq-client-go-v2.0.0/producer/producer.go

```go
type transactionProducer struct {
	producer *defaultProducer
	listener primitive.TransactionListener
}
```

transactionProducer defines the producer and listener fields.

NewTransactionProducer — rocketmq-client-go-v2.0.0/producer/producer.go

```go
func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
	producer, err := NewDefaultProducer(opts...)
	if err != nil {
		return nil, errors.Wrap(err, "NewDefaultProducer failed.")
	}
	return &transactionProducer{
		producer: producer,
		listener: listener,
	}, nil
}
```

NewTransactionProducer instantiates a transactionProducer.

Start — rocketmq-client-go-v2.0.0/producer/producer.go

```go
func (tp *transactionProducer) Start() error {
	go primitive.WithRecover(func() {
		tp.checkTransactionState()
	})
	return tp.producer.Start()
}
```

Start first runs tp.checkTransactionState() asynchronously, then calls tp.producer.Start().

checkTransactionState — rocketmq-client-go-v2.0.0/producer/producer.go

```go
func (tp *transactionProducer) checkTransactionState() {
	for ch := range tp.producer.callbackCh {
		switch callback := ch.(type) {
		case *internal.CheckTransactionStateCallback:
			localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg)
			uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
			if uniqueKey == "" {
				uniqueKey = callback.Msg.MsgId
			}
			header := &internal.EndTransactionRequestHeader{
				CommitLogOffset:      callback.Header.CommitLogOffset,
				ProducerGroup:        tp.producer.group,
				TranStateTableOffset: callback.Header.TranStateTableOffset,
				FromTransactionCheck: true,
				MsgID:                uniqueKey,
				TransactionId:        callback.Header.TransactionId,
				CommitOrRollback:     tp.transactionState(localTransactionState),
			}
			req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
			req.Remark = tp.errRemark(nil)
			err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req,
				tp.producer.options.SendMsgTimeout)
			if err != nil {
				rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
					"callback":               callback.Addr.String(),
					"request":                req.String(),
					rlog.LogKeyUnderlayError: err,
				})
			}
		default:
			rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
		}
	}
}
```

checkTransactionState ranges over tp.producer.callbackCh and dispatches on the event type; currently only CheckTransactionStateCallback is handled, for which it builds an EndTransactionRequestHeader and calls tp.producer.client.InvokeOneWay.

Shutdown — rocketmq-client-go-v2.0.0/producer/producer.go ...

July 7, 2020 · 2 min · jiezi

Impress the Alibaba interviewer: how RocketMQ and Kafka implement transactions

How are RocketMQ's transactions implemented? Let's start with RocketMQ. I covered the rough flow of RocketMQ transactions in an earlier lesson; here we walk through it again in code.

```java
public class CreateOrderService {

    @Inject
    private OrderDao orderDao;               // DAO for the order table
    @Inject
    private ExecutorService executorService; // an injected ExecutorService
    private TransactionMQProducer producer;

    // initialize the transactionListener and the producer
    @Init
    public void init() throws MQClientException {
        TransactionListener transactionListener = createTransactionListener();
        producer = new TransactionMQProducer("myGroup");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
    }

    // entry point of the create-order service
    @PUT
    @RequestMapping(...)
    public Boolean createOrder(@RequestBody CreateOrderRequest request) {
        // build a message from the create-order request
        Message msg = createMessage(request);
        // send the transactional message
        SendResult sendResult = producer.sendMessageInTransaction(msg, request);
        // return whether the transaction succeeded
        return sendResult.getSendStatus() == SendStatus.SEND_OK;
    }

    private TransactionListener createTransactionListener() {
        return new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                CreateOrderRequest request = (CreateOrderRequest) arg;
                try {
                    // execute the local transaction: create the order in the DB
                    orderDao.createOrderInDB(request);
                    // no exception means success, so commit the transactional message
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Throwable t) {
                    // on failure, roll back the transactional message
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            // reverse-check the local transaction
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // get the order ID from the message
                String orderId = msg.getUserProperty("orderId");
                // query the DB for the order ID: if it exists, commit;
                // if not, the local transaction may have failed or may still
                // be running, so return UNKNOW
                // (note: RocketMQ really does misspell this constant as UNKNOW)
                return orderDao.isOrderIdExistsInDB(orderId) ?
                    LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
            }
        };
    }
    // ....
}
```

In this flow we expose a create-order service whose job is to insert an order row into the database and send a create-order message, with the requirement that the database write and the message send execute in one transaction: either both succeed or both fail. The code first initializes transactionListener and producer, the variable that sends RocketMQ transactional messages, in init(). The method that actually provides the create-order service is createOrder(): it builds a message from the request parameters, sends it via the RocketMQ producer as a transactional message, and returns the transaction's outcome. ...
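The reverse-check boils down to one decision: if the local order row exists, commit the half message; otherwise report unknown so the broker asks again later. For consistency with the other examples, here is that decision sketched in Go (all names are illustrative, not from either client library):

```go
package main

import "fmt"

type txState string

const (
	commitMessage txState = "COMMIT_MESSAGE"
	unknown       txState = "UNKNOW" // matching RocketMQ's (misspelled) constant
)

// checkLocalTransaction mirrors the reverse-check logic: if the local order
// row exists the half message is committed; otherwise the state stays
// unknown and the broker will retry the check later.
func checkLocalTransaction(orderExists func(orderID string) bool, orderID string) txState {
	if orderExists(orderID) {
		return commitMessage
	}
	return unknown
}

func main() {
	db := map[string]bool{"order-1": true} // stand-in for the order table
	exists := func(id string) bool { return db[id] }
	fmt.Println(checkLocalTransaction(exists, "order-1"))
	fmt.Println(checkLocalTransaction(exists, "order-2"))
}
```

Returning unknown rather than rollback when the row is missing is the key design choice: the local transaction might still be in flight, so the only safe answer is "ask me again".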

July 6, 2020 · 3 min · jiezi

A look at rocketmq-client-go's defaultProducer

Preface: This post takes a look at rocketmq-client-go's defaultProducer.

defaultProducer — rocketmq-client-go-v2.0.0/producer/producer.go

```go
type defaultProducer struct {
	group       string
	client      internal.RMQClient
	state       int32
	options     producerOptions
	publishInfo sync.Map
	callbackCh  chan interface{}

	interceptor primitive.Interceptor
}
```

defaultProducer defines the group, client, state, options, publishInfo, callbackCh and interceptor fields.

NewDefaultProducer — rocketmq-client-go-v2.0.0/producer/producer.go

```go
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
	defaultOpts := defaultProducerOptions()
	for _, apply := range opts {
		apply(&defaultOpts)
	}
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	producer := &defaultProducer{
		group:      defaultOpts.GroupName,
		callbackCh: make(chan interface{}),
		options:    defaultOpts,
	}
	producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)

	producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)

	return producer, nil
}
```

NewDefaultProducer creates the name-server client via internal.NewNamesrv from NameServerAddrs, instantiates defaultProducer, then wires in internal.GetOrNewRocketMQClient and primitive.ChainInterceptors.

Start — rocketmq-client-go-v2.0.0/producer/producer.go ...
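The constructor's prologue is the classic functional-options pattern: start from defaults, then apply each caller-supplied Option in order. A minimal self-contained sketch of that pattern (the option names and defaults here are illustrative, not the library's):

```go
package main

import "fmt"

type options struct {
	groupName string
	retry     int
}

// Option mutates an options struct, mirroring the library's Option funcs.
type Option func(*options)

func WithGroupName(name string) Option { return func(o *options) { o.groupName = name } }
func WithRetry(n int) Option           { return func(o *options) { o.retry = n } }

// newProducerOptions mirrors NewDefaultProducer's prologue: start from
// defaults, then apply each Option in the order it was passed.
func newProducerOptions(opts ...Option) options {
	o := options{groupName: "DEFAULT", retry: 2}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newProducerOptions(WithGroupName("myGroup"), WithRetry(3))
	fmt.Println(o.groupName, o.retry)
	// With no options, the defaults survive untouched.
	fmt.Println(newProducerOptions().groupName)
}
```

Because later options win, callers can layer overrides without the constructor needing a config struct with dozens of exported fields.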

July 5, 2020 · 3 min · jiezi

A look at rocketmq-client-go's api.go

Preface: This post takes a look at rocketmq-client-go's api.go.

Producer — rocketmq-client-go-v2.0.0/api.go

```go
type Producer interface {
	Start() error
	Shutdown() error
	SendSync(ctx context.Context, mq ...*primitive.Message) (*primitive.SendResult, error)
	SendAsync(ctx context.Context, mq func(ctx context.Context, result *primitive.SendResult, err error),
		msg ...*primitive.Message) error
	SendOneWay(ctx context.Context, mq ...*primitive.Message) error
}

func NewProducer(opts ...producer.Option) (Producer, error) {
	return producer.NewDefaultProducer(opts...)
}
```

Producer defines the Start, Shutdown, SendSync, SendAsync and SendOneWay methods; NewProducer creates a Producer via producer.NewDefaultProducer.

TransactionProducer — rocketmq-client-go-v2.0.0/api.go

```go
type TransactionProducer interface {
	Start() error
	Shutdown() error
	SendMessageInTransaction(ctx context.Context, mq *primitive.Message) (*primitive.TransactionSendResult, error)
}

func NewTransactionProducer(listener primitive.TransactionListener, opts ...producer.Option) (TransactionProducer, error) {
	return producer.NewTransactionProducer(listener, opts...)
}
```

TransactionProducer defines the Start, Shutdown and SendMessageInTransaction methods; NewTransactionProducer creates a TransactionProducer via producer.NewTransactionProducer.

PushConsumer — rocketmq-client-go-v2.0.0/api.go

```go
type PushConsumer interface {
	// Start the PullConsumer for consuming message
	Start() error

	// Shutdown the PullConsumer, all offset of MessageQueue will be sync to broker before process exit
	Shutdown() error

	// Subscribe a topic for consuming
	Subscribe(topic string, selector consumer.MessageSelector,
		f func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error

	// Unsubscribe a topic
	Unsubscribe(topic string) error
}

func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
	return consumer.NewPushConsumer(opts...)
}
```

PushConsumer defines the Start, Shutdown, Subscribe and Unsubscribe methods; NewPushConsumer creates a PushConsumer via consumer.NewPushConsumer.

PullConsumer — rocketmq-client-go-v2.0.0/api.go ...

July 4, 2020 · 3 min · jiezi

canal in practice: a MySQL incremental binlog parsing tool

Introduction: canal is an open-source tool from Alibaba whose main purpose is parsing MySQL incremental binlogs to provide incremental data subscription and consumption.

Use cases:
- real-time database backup
- refreshing business caches
- building and maintaining search indexes in real time, e.g. pushing product data into Elasticsearch to build an inverted index
- incremental data processing with business logic, e.g. pushing incremental data to third-party platforms

Official site: https://github.com/alibaba/canal

How it works:
1. The MySQL master writes changes to its binlog.
2. canal sends the dump protocol to the master.
3. The master receives the dump request and pushes binlog events to canal.
4. canal parses the binlog and can deliver the data to an MQ system; Kafka and RocketMQ are currently supported.

Installing and configuring MySQL: MySQL 5.7.x is recommended; for MySQL 8.0.x see the official notes at https://github.com/alibaba/ca...

Edit the MySQL config file my.cnf:

```ini
[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW mode
server_id=1
```

Create a user and grant privileges (for testing you can simply use root):

```sql
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```

Configuring canal: first check whether Java is installed:

```shell
➜ canal-admin java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)
```

If it is not, download version 1.8 from the Oracle site (do not pick a newer major version) for your OS and install it; on macOS the download is a dmg you simply click through.

Visit the releases page and download the latest stable version, 1.1.4:

```shell
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
```

Extract it to a directory of your choice:

```shell
mkdir /tmp/canal
tar zxvf canal.deployer-1.1.4.tar.gz -C /tmp/canal
```

Enter the canal directory and inspect the files ...

June 3, 2020 · 4 min · jiezi

Setting up an environment for reading the RocketMQ source code

Notes on setting up an environment for reading the RocketMQ source code.

1. Clone the code:

```shell
git clone git@github.com:apache/rocketmq.git
```

2. Import it into IDEA.

3. Pick a directory as RocketMQ's working directory, e.g. D:\rocketmq-home, and create three folders in it: conf (config files), logs (log files) and store (the broker persistence directory). In IDEA, locate the distribution directory and copy logback_namesrv.xml, logback_broker.xml and broker.conf from its conf directory into D:\rocketmq-home\conf. Then add to broker.conf:

```ini
namesrvAddr=127.0.0.1:9876
storePathRootDir=D:\\rocketmq-home\\store
storePathCommitLog=D:\\rocketmq-home\\store\\commitLog
storePathConsumeQueue=D:\\rocketmq-home\\store\\consumequeue
storePathIndex=D:\\rocketmq-home\\store\\index
storeCheckpoint=D:\\rocketmq-home\\store\\checkpoint
abortFile=D:\\rocketmq-home\\store\\abort
```

4. Find the namesrv startup class NamesrvStartup and set the environment variable ROCKETMQ_HOME=D:\rocketmq-home.

5. Find the broker startup class BrokerStartup and set the environment variable ROCKETMQ_HOME=D:\rocketmq-home.

June 2, 2020 · 1 min · jiezi

RocketMQ 4.6.1 tutorial series: the distilled edition

1. namesrv and broker

1.1 Architecture: looking at the architecture diagram, namesrv plays the role of a registry, with one peculiarity: namesrv instances do not communicate with each other.

1.2 namesrv/broker communication: every broker registers its own information with namesrv every 30s; namesrv checks every 10s for brokers that have been silent for 120s and evicts them. This communication model has a consequence: namesrv needs roughly 120s before it notices that a broker has died.

1.3 RocketMQ's message model has the following main roles: producers, consumers and queues, which interact as shown in the diagram. The diagram conveys three points:
1. A topic is a logical structure; the queue is the physical one. In other words, RocketMQ consumes on a per-queue basis.
2. Consumers consume as a group.
3. In the diagram, groupA deliberately subscribes only to topicA and not topicB. This is because RocketMQ was designed on the assumption that a single consumer should not handle multiple kinds of messages: consumers within one consumerGroup should share the same responsibility and not do different jobs (i.e. consume multiple topics).

2. Message producers

2.1 The three ways to send a message: synchronous — the producer thread blocks until the broker has written the message to the commitlog file; asynchronous — the send is submitted as a task to a thread pool whose core and maximum size equal the number of CPU cores (see DefaultMQProducerImpl), and the call returns without waiting for the server's result; one-way — fire and forget, with no concern for success or failure.

2.2 The send flow, roughly: validate the message => find a broker => select a queue => send the message. ...

May 27, 2020 · 2 min · jiezi

A look at RocketMQ's retryTimesWhenSendFailed

Preface: This post takes a look at RocketMQ's retryTimesWhenSendFailed.

sendDefaultImpl — rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

```java
public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private ExecutorService asyncSenderExecutor;

    //......

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC
                ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback,
                            topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException("No name server address, please set it."
                + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null)
                .setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null)
            .setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

    //......
}
```

sendDefaultImpl resolves topicPublishInfo via tryToFindTopicPublishInfo; if it is non-null and ok, it computes timesTotal from communicationMode: for CommunicationMode.SYNC it is 1 + defaultMQProducer.getRetryTimesWhenSendFailed(), otherwise 1. It then calls sendKernelImpl up to timesTotal times; a retry is triggered by a RemotingException, an MQClientException, an MQBrokerException whose responseCode is TOPIC_NOT_EXIST, SERVICE_NOT_AVAILABLE, SYSTEM_ERROR, NO_PERMISSION, NO_BUYER_ID or NOT_IN_CURRENT_UNIT, or by a sendResult.getSendStatus() other than SendStatus.SEND_OK (when retryAnotherBrokerWhenNotStoreOK is enabled).

sendKernelImpl — rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ...

November 5, 2019 · 6 min · jiezi

A look at RocketMQ's retryTimesWhenSendAsyncFailed

Preface: This post takes a look at RocketMQ's retryTimesWhenSendAsyncFailed.

DefaultMQProducerImpl — rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

```java
public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private ExecutorService asyncSenderExecutor;

    //......

    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null
                        || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
                                this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
```
this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } //......}sendKernelImpl方法根据communicationMode做不同的处理,如果是ASYNC,则通过mQClientFactory.getMQClientAPIImpl().sendMessage来发送消息返回sendResult,这里通过defaultMQProducer.getRetryTimesWhenSendAsyncFailed()获取retryTimesWhenSendAsyncFailedMQClientAPIImplrocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ...
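The retry path itself lives inside MQClientAPIImpl's async callback rather than in sendKernelImpl: on an exception, the callback re-sends until an attempt counter passes retryTimesWhenSendAsyncFailed. As a rough illustration of those semantics only, here is a self-contained toy model; the Transport interface, runScenario helper, and counter handling are invented for the sketch and are not RocketMQ APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy model of async send retry: on failure the exception callback re-sends
// until the attempt count exceeds retryTimes, then the error is surfaced.
public class AsyncRetrySketch {
    interface Transport {
        void send(Runnable onSuccess, Consumer<Exception> onException);
    }

    public static void sendWithRetry(Transport transport, int retryTimes,
                                     AtomicInteger attempts,
                                     Runnable onSuccess, Consumer<Exception> onFail) {
        attempts.incrementAndGet();
        transport.send(onSuccess, e -> {
            if (attempts.get() <= retryTimes) {
                // mirrors the idea of onExceptionImpl: counter not exhausted -> resend
                sendWithRetry(transport, retryTimes, attempts, onSuccess, onFail);
            } else {
                onFail.accept(e); // retries exhausted -> report failure
            }
        });
    }

    // returns the attempts used on success, or -attempts on final failure
    public static int runScenario(int failuresBeforeSuccess, int retryTimes) {
        AtomicInteger remainingFailures = new AtomicInteger(failuresBeforeSuccess);
        AtomicInteger attempts = new AtomicInteger();
        boolean[] outcome = new boolean[2]; // [0]=succeeded, [1]=failed
        Transport flaky = (onSuccess, onException) -> {
            if (remainingFailures.getAndDecrement() > 0) {
                onException.accept(new RuntimeException("broker unreachable"));
            } else {
                onSuccess.run();
            }
        };
        sendWithRetry(flaky, retryTimes, attempts, () -> outcome[0] = true, e -> outcome[1] = true);
        return outcome[0] ? attempts.get() : -attempts.get();
    }

    public static void main(String[] args) {
        if (runScenario(2, 3) != 3) throw new AssertionError("should succeed on the 3rd attempt");
        if (runScenario(2, 1) != -2) throw new AssertionError("should give up after 2 attempts");
        System.out.println("ok");
    }
}
```

With retryTimes = 3 a transport that fails twice still delivers on the third attempt; with retryTimes = 1 the same transport exhausts its budget and the failure callback fires.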

November 3, 2019 · 6 min · jiezi

A Look at RocketMQ's MessageQueueSelector

Preface: this article takes a look at RocketMQ's MessageQueueSelector.

MessageQueueSelector

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/MessageQueueSelector.java

```java
public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
```

The MessageQueueSelector interface defines a single select method that returns a MessageQueue. It has several implementations: SelectMessageQueueByHash, SelectMessageQueueByRandom, and SelectMessageQueueByMachineRoom.

SelectMessageQueueByHash

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java

```java
public class SelectMessageQueueByHash implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }
        value = value % mqs.size();
        return mqs.get(value);
    }
}
```

SelectMessageQueueByHash implements MessageQueueSelector; its select method takes the absolute value of the arg parameter's hashCode and mods it by mqs.size() to obtain the target queue's index in mqs.

SelectMessageQueueByRandom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandom.java

```java
public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}
```

SelectMessageQueueByRandom implements MessageQueueSelector; its select method simply draws a random value below mqs.size() as the target queue's index in mqs.

SelectMessageQueueByMachineRoom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java

...
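To see the hash selector's arithmetic in isolation, here is a minimal standalone sketch of the same index computation. HashSelectDemo and indexFor are made-up names for illustration; only the abs-then-mod logic mirrors the class above.

```java
// Standalone copy of the index arithmetic in SelectMessageQueueByHash:
// abs(hashCode(arg)) % queueCount picks the target queue, so the same
// sharding key always lands on the same queue.
public class HashSelectDemo {
    public static int indexFor(Object shardingKey, int queueCount) {
        int value = shardingKey.hashCode();
        if (value < 0) {
            // note: Math.abs(Integer.MIN_VALUE) is still negative; the real
            // class shares this edge case
            value = Math.abs(value);
        }
        return value % queueCount;
    }

    public static void main(String[] args) {
        if (indexFor(10, 4) != 2) throw new AssertionError();  // Integer.hashCode() == value
        if (indexFor(-7, 4) != 3) throw new AssertionError();  // abs(-7) % 4
        if (indexFor("order-1", 8) != indexFor("order-1", 8)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The stable key-to-queue mapping is what makes this selector useful for ordered messages: all messages sharing a sharding key are serialized onto one queue.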

November 2, 2019 · 2 min · jiezi

A Look at RocketMQTemplate

Preface: this article takes a look at RocketMQTemplate.

RocketMQTemplate

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

```java
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
    private DefaultMQProducer producer;
    private ObjectMapper objectMapper;
    private String charset = "UTF-8";
    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!

    //......

    @Override
    public void afterPropertiesSet() throws Exception {
        if (producer != null) {
            producer.start();
        }
    }

    @Override
    protected void doSend(String destination, Message<?> message) {
        SendResult sendResult = syncSend(destination, message);
        log.debug("send message to `{}` finished. result:{}", destination, sendResult);
    }

    @Override
    protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
        String content;
        if (payload instanceof String) {
            content = (String) payload;
        } else {
            // If payload not as string, use objectMapper change it.
            try {
                content = objectMapper.writeValueAsString(payload);
            } catch (JsonProcessingException e) {
                log.error("convert payload to String failed. payload:{}", payload);
                throw new RuntimeException("convert to payload to String failed.", e);
            }
        }
        MessageBuilder<?> builder = MessageBuilder.withPayload(content);
        if (headers != null) {
            builder.copyHeaders(headers);
        }
        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
        Message<?> message = builder.build();
        if (postProcessor != null) {
            message = postProcessor.postProcessMessage(message);
        }
        return message;
    }

    @Override
    public void destroy() {
        if (Objects.nonNull(producer)) {
            producer.shutdown();
        }
        for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
            if (Objects.nonNull(kv.getValue())) {
                kv.getValue().shutdown();
            }
        }
        cache.clear();
    }

    //......
}
```

RocketMQTemplate extends spring-messaging's AbstractMessageSendingTemplate and implements the InitializingBean and DisposableBean interfaces; it provides syncSend, syncSendOrderly, asyncSend, asyncSendOrderly, sendOneWay, sendOneWayOrderly, sendMessageInTransaction, and related methods. afterPropertiesSet calls producer.start(); destroy calls producer.shutdown(), shuts down every cached TransactionMQProducer, and clears the cache. doSend internally calls syncSend and only logs the returned sendResult at debug level. doConvert leaves String payloads untouched; any other type is serialized with objectMapper.writeValueAsString and used as the content, after which the message is built, postProcessor.postProcessMessage is applied, and the result is returned.

syncSend

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

...

November 2, 2019 · 6 min · jiezi

How to Choose a Message Queue

In high-concurrency business scenarios, message queues play an irreplaceable role in traffic shaving and decoupling. The most widely used options today include RabbitMQ, RocketMQ, ActiveMQ, Kafka, ZeroMQ, and Pulsar. With so many message queues, which one should you choose?

Basic criteria for choosing a message queue: although these products each have strengths and weaknesses in features and characteristics, the selection should follow a few baseline criteria.

First, it must be open source. Open source means that if a bug in your message queue one day affects your business, you at least have a chance to fix or work around it quickly by patching the source, instead of waiting for the maintainers' next release to solve your system's problem.

Second, it should be a product that has been popular in recent years and has a reasonably active community. As long as your use case is not too exotic, popularity makes hitting a bug very unlikely, because most bugs have already been hit and fixed by others; problems you run into along the way are also easy to search for, and solutions are quickly found. Another advantage is that popular products tend to integrate well with the surrounding ecosystem.

Finally, a passable message queue must provide at least these properties:

- reliable message delivery: ensure no messages are lost;
- clustering: a single node going down must not make the service unavailable, and of course must not lose messages;
- performance: good enough to satisfy the vast majority of scenarios.

Next, let's look at the open-source message queues that meet these criteria.

RabbitMQ

First, RabbitMQ. Released in 2007 and written in Erlang, it was originally designed for reliable communication between telecom systems, and is one of the few message queues that support the AMQP protocol.

RabbitMQ is lightweight and fast; its slogan, "Messaging that just works," states its character clearly: a message queue that works out of the box. It is a rather lightweight message queue and very easy to deploy and use.

A distinctive RabbitMQ feature is its very flexible routing. Unlike other message queues, it inserts an Exchange module between the producer and the queue, which you can think of as a switchboard. Like a switchboard, the Exchange dispatches messages from producers to different queues according to configured routing rules; the rules are very flexible, and you can even implement your own. If you happen to need this feature, RabbitMQ is a good choice.

RabbitMQ's client support probably covers more programming languages than any other message queue.

Now for RabbitMQ's problems:

- RabbitMQ handles message backlog poorly; a large accumulation of messages causes its performance to drop sharply.
- Its performance is the worst of this group, roughly tens of thousands to a bit over a hundred thousand messages per second. If your application demands very high message-queue performance, don't choose RabbitMQ.
- It is written in Erlang, which makes extension and secondary development expensive.

RocketMQ

RocketMQ is a message queue open-sourced by Alibaba in 2012, implemented in Java. Its design drew on Kafka with improvements of its own; it was later donated to the Apache Software Foundation and graduated in 2017 as a top-level Apache project. Inside Alibaba, RocketMQ is widely used for orders, trading, top-ups, stream computing, message push, streaming log processing, binlog distribution, and more. Having survived many Double 11 events, its performance, stability, and reliability are trustworthy.

RocketMQ has good performance, stability, and reliability, possesses nearly every feature and characteristic a modern message queue should have, and is still growing. ...

October 15, 2019 · 1 min · jiezi

RocketMQ Master-Slave Synchronization: Source Code Analysis

WeChat public account 「后端进阶」, focused on backend technology: Java, Golang, web frameworks, distributed middleware, service governance, and more.

I previously wrote an article on the difference between RocketMQ queues and Kafka partition replicas, which mentioned that RocketMQ achieves message redundancy mainly through master-slave synchronization, unlike Kafka's Leader-Follower replica model. HA (High Available) means high availability, and RocketMQ's HA mechanism makes messages highly available through master-slave synchronization.

HA core classes

The HA implementation lives in the ha directory of the store module. Its core classes:

- HAService: the core implementation of master-slave synchronization
- HAService$AcceptSocketService: the master's listener for slave connections
- HAService$GroupTransferService: master-slave notification, implementing synchronous and asynchronous replication
- HAService$HAClient: the slave's connection to the master
- HAConnection: the master's wrapper around an HA connection; when the master receives data from a slave, the connection is wrapped in an HAConnection, which in turn wraps the read-socket and write-socket implementations:
  - HAConnection$ReadSocketService: the master's read service
  - HAConnection$WriteSocketService: the master's write service

The overall workflow of RocketMQ master-slave synchronization is roughly:

- the slave actively opens a TCP connection to the master, then every 5s sends the max offset of its commitLog file to pull messages not yet synchronized;
- the master listens on a port for data from slaves; on receiving the offset sent by a slave, it parses it and returns the messages found after that offset to the slave;
- the client (slave), after receiving the master's data, writes the batch into its commitLog file, updates the commitLog pull offset, and continues pulling unsynchronized messages from the master.

Slave -> Master flow

From the HA implementation above, there are roughly two flows: the slave reporting its offset, and the master sending unsynchronized messages to the slave.

As the class list shows, the slave's offset reporting lives in HAClient, which extends ServiceThread, i.e. it is a thread service. After the broker starts, it spawns a thread that periodically reports the slave's offset to the master.

org.apache.rocketmq.store.ha.HAService.HAClient#run:

```java
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // actively connect to the master and obtain the socketChannel
            if (this.connectMaster()) {
                if (this.isTimeToReportOffset()) {
                    // report the slave's max offset to the master
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                // poll once per second
                this.selector.select(1000);
                // process messages sent back by the master
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                // ......
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
    log.info(this.getServiceName() + " service end");
}
```

The above is the run logic of the HAClient thread: actively connect to the master, report the offset to the master, process the messages the master sends back, and loop over this continuously. ...
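The report-offset/return-data exchange above can be reduced to a few lines. The following toy model shows one round of the protocol; syncOnce and the byte-array "logs" are illustrative stand-ins, not the real HAService types.

```java
import java.util.Arrays;

// Toy model of the HA exchange: the slave reports its max commitlog offset,
// the master replies with everything after that offset, and the slave
// appends the data and advances its offset.
public class HaSyncDemo {
    public static int syncOnce(byte[] masterLog, byte[] slaveLog, int slaveMaxOffset) {
        // reportSlaveMaxOffset: the slave tells the master how far it has got
        int unsynced = masterLog.length - slaveMaxOffset;
        // the master answers with the unsynced bytes; the slave writes them at its offset
        System.arraycopy(masterLog, slaveMaxOffset, slaveLog, slaveMaxOffset, unsynced);
        return slaveMaxOffset + unsynced; // the slave's new max offset
    }

    public static void main(String[] args) {
        byte[] master = {1, 2, 3, 4, 5};
        byte[] slave = {1, 2, 0, 0, 0}; // slave has synced the first 2 bytes
        int newOffset = syncOnce(master, slave, 2);
        if (newOffset != 5 || !Arrays.equals(master, slave)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

After one round the slave's log matches the master's and the next report would carry offset 5, so the master would have nothing left to send.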

October 14, 2019 · 5 min · jiezi

RocketMQ Distributed Transactions

How it works: the producer sends the broker a transactional message (a half message) that cannot be consumed before commit. If the send succeeds and returns SEND_OK, the local transaction is executed; on success the message is committed, and committed messages can be consumed normally by the server side. On failure it is rolled back, and rolled-back messages are deleted. There is one more case: if the broker never receives the confirmation, it checks back on the local transaction's state to decide whether to commit or roll back.

Demo walkthrough:

- request http://localhost:8080/callback/transaction?text=888&param=1: consumed normally
- request http://localhost:8080/callback/transaction?text=888&param=2: rolled back
- request http://localhost:8080/callback/transaction?text=888&param=7: transaction-state check-back

Project GitHub address: https://github.com/Nirvana010...
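The commit/rollback/check-back flow described above can be sketched as a tiny state machine. TxMessageDemo and its enum are hypothetical names for illustration; the real broker drives this through half-message storage and producer callbacks.

```java
// Toy state machine for RocketMQ-style transactional messages: a half
// message becomes consumable only after COMMIT; ROLLBACK discards it;
// UNKNOWN triggers a check-back against the local transaction state.
public class TxMessageDemo {
    public enum LocalTx { COMMIT, ROLLBACK, UNKNOWN }

    // returns true if the half message ends up visible to consumers
    public static boolean process(LocalTx firstResult, LocalTx checkBackResult) {
        LocalTx state = firstResult;
        if (state == LocalTx.UNKNOWN) {
            state = checkBackResult; // broker asks the producer to re-check
        }
        return state == LocalTx.COMMIT;
    }

    public static void main(String[] args) {
        if (!process(LocalTx.COMMIT, LocalTx.ROLLBACK)) throw new AssertionError();
        if (process(LocalTx.ROLLBACK, LocalTx.COMMIT)) throw new AssertionError();
        if (!process(LocalTx.UNKNOWN, LocalTx.COMMIT)) throw new AssertionError();
        if (process(LocalTx.UNKNOWN, LocalTx.ROLLBACK)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The param=1/2/7 cases of the demo correspond to the three first-result branches: commit, rollback, and unknown followed by a check-back.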

July 11, 2019 · 1 min · jiezi

RocketMQ: Console Setup and Basic Code Usage

RocketMQ is a distributed messaging middleware originally developed by Alibaba's middleware team and applied at large scale in production systems to meet the need for massive online message backlogs. It was donated to the Apache incubator at the end of 2016 and became a top-level Apache project in under a year. Alibaba had earlier built a messaging system on ActiveMQ; as business message volume grew, bottlenecks gradually surfaced. Kafka was also considered, but it fell short on low latency and high reliability, so RocketMQ was developed in-house, outperforming the existing message queues in every respect. RocketMQ and Kafka are very similar in concepts and principles and are often compared; RocketMQ defaults to a long-polling pull model and supports tens of millions of backlogged messages on a single machine, making it a great fit for massive messaging systems. This article has three parts, as shown in the figure below:

1 Installing RocketMQ on Windows

(1) Download the Windows package
Windows download page: http://rocketmq.apache.org/release_notes/ — download and unzip the rocketmq package rocketmq-all-4.5.1-bin-release.zip.

(2) Configure the system environment variable
Set the system variable ROCKETMQ_HOME="D:\soft\rocketmq-all-4.5.1-bin-release", as shown in the figure. Note: everyone's rocketmq directory differs (mine is under D:\soft), so configure the variable for your own environment. The mqnamesrv.cmd started next uses %ROCKETMQ_HOME%, which is why this system variable is needed.

(3) Start namesrv
Enter rocketmq's bin directory and run `start mqnamesrv.cmd`; success looks like the figure below. Note: do not close this window after startup.

(4) Start the broker
Still in the bin directory, run `start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true`; success looks like the figure below. Likewise, do not close this window. Once these steps are complete, your RocketMQ is installed.

2 Installing the console plugin

(1) Download the plugin
Open the link https://github.com/apache/roc... to download the console plugin rocketmq-externals, as shown in the figure: click Download ZIP to download. ...

July 3, 2019 · 2 min · jiezi

A Multi-Dimensional Comparison of 5 Mainstream Distributed MQ Message Queues: No More Worries About Technology Selection

1. Introduction

For instant-messaging systems (IM, message-push systems, and the like), MQ middleware is very common basic software, but with so many products on the market, each with its own strengths, how do you choose? That is the question!

For many inexperienced developers, an in-house IM chat system with barely a hundred users does not need Kafka and MongoDB in the name of high performance and scalability; that is shooting mosquitoes with a cannon. Meanwhile, for medium-to-large IM scenarios, some developers pick the bloated ActiveMQ for its ease of use and abundant documentation, which loses the plot in the other direction.

After all that, which MQ middleware actually suits which scenario? You may have the answer after reading this article. It compares the 5 most mainstream MQ middleware products, Kafka, RabbitMQ, ZeroMQ, RocketMQ, and ActiveMQ, across 17 dimensions, as a reference for your next architecture design and MQ selection.

Learning and exchange: IM/push technology development group 4: 101279154 [recommended]; introductory article for mobile IM development: 《新手入门一篇就够:从零开发移动端IM》. (This article is also published at: http://www.52im.net/thread-26...)

2. Related resources

Official sites:

Kafka: http://kafka.apache.org/

RabbitMQ: https://www.rabbitmq.com/

ZeroMQ: http://zeromq.org/

RocketMQ: http://rocketmq.apache.org/

ActiveMQ: http://activemq.apache.org/

Related articles:

《IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列》

《IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?》

3. Dimension 1: documentation

1) Kafka: a moderate amount. There is a book by Kafka's own authors, and some material online.
2) RabbitMQ: plenty. Some good books, and lots of material online.
3) ZeroMQ: scarce. Few dedicated books; online material is mostly code samples and brief introductions.
4) RocketMQ: scarce. There are now two dedicated books; online material is uneven, and the official docs are terse and light on technical detail.
5) ActiveMQ: plenty. No dedicated book, but lots of material online.

4. Dimension 2: implementation language

1) Kafka: Scala
2) RabbitMQ: Erlang
3) ZeroMQ: C
4) RocketMQ: Java
5) ActiveMQ: Java

5. Dimension 3: supported protocols

1) Kafka: its own protocol (over TCP)
2) RabbitMQ: AMQP
3) ZeroMQ: TCP, UDP
4) RocketMQ: its own protocol
5) ActiveMQ: OpenWire, STOMP, REST, XMPP, AMQP

6. Dimension 4: message storage

1) Kafka: memory, disk, database. Supports large backlogs.

Kafka's smallest storage unit is the partition; a topic contains multiple partitions, and when a topic is created its partitions are distributed over multiple servers, typically one broker per server.

Partition leaders are spread evenly across servers, as are partition replicas, ensuring load balance and high availability; when a new broker joins the cluster, some replicas are moved onto it.

According to the directory list in the config file, Kafka assigns a new partition to the directory holding the fewest partitions.

By default, the partitioner uses round-robin to spread messages evenly across a topic's partitions; when a key is specified at send time, the message is stored in the partition given by the key's hashcode mod the partition count.

2) RabbitMQ: memory, disk. Supports small backlogs.

RabbitMQ messages are either persistent or non-persistent; both kinds can be written to disk.

Persistent messages are written to disk on reaching the queue and, where possible, a copy is also kept in memory for performance, evicted when memory runs tight.

Non-persistent messages normally live only in memory and are swapped out to disk under memory pressure to save memory.

The mirror-queue mechanism can "replicate" important queues to other brokers in the cluster so their messages are not lost.

A mirrored queue has one master node and several slaves; if the master fails, the slave that joined earliest is promoted to the new master. All operations except publishing go to the master, which broadcasts the results of executed commands to the slaves. RabbitMQ spreads masters evenly across servers, and the slaves of a given queue likewise, ensuring load balance and high availability.

3) ZeroMQ: the sender's memory or disk. No persistence.

4) RocketMQ: disk. Supports large backlogs.

CommitLog files hold the actual message data; each commitLog caps at 1 GB, after which a new commitLog file is automatically created.

ConsumeQueue entries hold only offset, size, and tagcode, are very small, and are distributed over multiple brokers.

The ConsumeQueue is effectively an index into the CommitLog: at consumption time, the consumer looks up the message's commitLog offset in the consumeQueue, then reads the message data from the commitLog.

This ConsumeQueue layout keeps the write path as sequential disk writes (appending to the CommitLog file); high volumes of I/O append sequentially to the same commitLog until it reaches 1 GB, then a new one is written.

In addition, RocketMQ only forces a flush from PageCache after accumulating 4 KB, so high-concurrency write performance is outstanding.

5) ActiveMQ: memory, disk, database. Supports small backlogs.

7. Dimension 5: message transactions

1) Kafka: supported
2) RabbitMQ: supported. The client sets the channel to transaction mode; the transaction commits only once RabbitMQ has accepted the message, otherwise the client rolls back after catching the exception. Using transactions lowers performance.
3) ZeroMQ: not supported
4) RocketMQ: supported
5) ActiveMQ: supported

8. Dimension 6: load balancing

8.1 Kafka

Load balancing is supported.

1) A broker is usually one server node. For the different partitions of a topic, Kafka does its best to spread them across different broker servers; ZooKeeper stores the broker, topic, and partition metadata. Partition leaders handle produce requests from clients, and leaders are assigned to different brokers so that the brokers share the work. Every broker caches the metadata; a client can fetch and cache metadata from any broker and then knows where to send its requests.

2) Consumers in a consumer group subscribing to the same topic are each assigned as equal a number of partitions as possible, sharing the load. ...
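The Kafka partitioning rule described in dimension 4 can be sketched in a few lines. This is a simplification for illustration: the real default partitioner hashes the serialized key with murmur2, and newer clients use sticky partitioning rather than strict round-robin for keyless sends.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified Kafka-style partitioning: keyless messages are spread
// round-robin, keyed messages go to hash(key) % numPartitions.
public class PartitionerDemo {
    private final AtomicInteger counter = new AtomicInteger();

    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return Math.floorMod(counter.getAndIncrement(), numPartitions); // round-robin
        }
        return Math.floorMod(key.hashCode(), numPartitions); // stable per key
    }

    public static void main(String[] args) {
        PartitionerDemo p = new PartitionerDemo();
        if (p.partition(null, 3) != 0 || p.partition(null, 3) != 1 || p.partition(null, 3) != 2)
            throw new AssertionError();
        if (p.partition(10, 4) != 2) throw new AssertionError(); // Integer.hashCode() == value
        if (p.partition("k", 5) != p.partition("k", 5)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The stable keyed mapping is what gives Kafka per-key ordering within a partition, while the round-robin path spreads keyless traffic evenly, matching the behavior described above.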

June 21, 2019 · 3 min · jiezi

Open-Sourcing a Kafka Enhancement: okmq 1.0.0

The core idea of this tool is a bet: only if both underlying components die at the same time is there a serious impact. Well, power failure aside. MQ is a good thing and we all use it, which also means MQ ought to be highly, highly available. A certain company has had several production incidents because of this very component, heh.

Most business systems require at-least-once semantics: duplicate messages are possible, but none may be lost. Even so, plenty of problems remain:

1. MQ availability cannot be guaranteed. An unexpected MQ death makes producer sends fail; many messages then have to be replayed by scraping logs, which is costly and slow.

2. MQ can block normal business. An MQ hiccup or a network problem leaves business threads stuck in the MQ send call, blocking normal business from proceeding, with disastrous consequences.

3. Message latency. If the MQ is dead, nothing more needs saying: messages die before they are born. Latency otherwise mostly comes from weak client-side consumption capacity or a single consumption channel.

okmq uses a combination of stores to guarantee reliable delivery of messages. Note: okmq focuses on reliability; ordering, transactions, and other aspects are out of scope. Speed, of course, is a must.

Design idea

Even using two Redis setups to mimic some MQ operations would beat certain existing solutions, but that is certainly not what we want, because Redis's accumulation capacity is too limited, and watching memory usage climb straight up is not a pleasant feeling.

We can, however, use Redis as an additional send-confirmation mechanism. This idea was mentioned in the article "使用多线程增加kafka消费能力", and now its time has come.

First, the API:

```java
OkmqKafkaProducer producer = new ProducerBuilder()
    .defaultSerializer()
    .eanbleHa("redis")
    .any("okmq.redis.mode", "single")
    .any("okmq.redis.endpoint", "127.0.0.1:6379")
    .any("okmq.redis.poolConfig.maxTotal", 100)
    .servers("localhost:9092")
    .clientID("okMQProducerTest")
    .build();

Packet packet = new Packet();
packet.setTopic("okmq-test-topic");
packet.setContent("i will send you a msg");
producer.sendAsync(packet, null);
producer.shutdown();
```

Taking Redis as the example, follow the numbered steps:

1. Before a message is sent to Kafka, it is first written to Redis. Since the later callback needs a unique identifier, a uuid is added to the packet.

2. The underlying API is called to actually deliver the message.

3. Kafka's callback is used to delete the corresponding key in Redis; this also gives the exact ack time of each message. Keys that stay undeleted for a long time are treated as failed deliveries.

4. A background thread iterates over these failed messages and re-delivers them. We call this recovery, and it is the most complex part. For Redis, it first contends for a 5-minute lock, then walks the relevant hash keys.

So for the code above, Redis issues the following commands:

```
1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354"
1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" ""
1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{\"content\":\"i will send you a msg104736623015238\",\"topic\":\"okmq-test-topic\",\"identify\":\"2b9b33fd-95fd-4cd6-8815-4c572f13f76e\",\"timestamp\":1559206423318}"
1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e"
1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000"
1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash"
1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0"
1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354"
1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock"
1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"
```

Answers to the questions above

So, for the three problems raised earlier:

1. MQ availability cannot be guaranteed. ...

June 5, 2019 · 1 min · jiezi

Message Middleware RocketMQ (1): Complete Environment Setup

A bit of positivity per chapter: self-control is the instinct of the strongest. — George Bernard Shaw

Preface: I have been learning the message middleware RocketMQ recently and plan to record the learning process, starting with environment setup. This is a single-machine setup (resources are limited), covering Windows and Linux, plus the console monitoring platform, and ends with a demo to verify everything.

Prerequisites: before setting up RocketMQ, make sure the following environments are in place:

- Java (mine is JDK 1.8)
- Maven (mine is 3.6.1, the latest at the time)
- Git

If you haven't set these up, see:

- JDK setup: JAVA8环境搭建
- Maven setup: Windows环境下使用Nexus 3.X 搭建Maven私服及使用介绍
- Git setup: Git环境搭建及配置

1. Setup on Windows

1.1 Download
Official site: http://rocketmq.apache.org/ — the latest version at the moment is V4.5.0; click through, choose to download rocketmq-all-4.5.0-bin-release.zip, and on the page that opens select rocketmq-all-4.5.0-bin-release.zip. Once downloaded, put it in a directory and unzip it.

1.2 Adjust the JVM settings
After the above, enter the bin directory (mine is H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin). Find JAVA_OPT in runserver.cmd and runbroker.cmd. Original JAVA_OPT:

```
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

Reduce the Xms and Xmx values to 1g, for example:

```
set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

Set these according to your machine's memory; exceeding it may cause errors.

1.3 Configure the environment variable
After the steps above, add the RocketMQ installation's bin directory to the environment variables.

1.4 Start
With all of the above configured, the next step is starting everything. There are a few pitfalls, so please follow the steps exactly.

In the RocketMQ installation's bin directory, open a cmd window. My directory: H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin. You can open a cmd window via Shift + right-click, or via Win+R, typing cmd, and then cd-ing into the bin directory.

1.4.1 Start the NAMESERVER
Run `start mqnamesrv.cmd`. On success a window pops up; do not close it.

1.4.2 Start the BROKER
Run `start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true`.

Note: if a window pops up with the message "错误: 找不到或无法加载主类 xxxxxx" (Error: could not find or load main class), open runbroker.cmd and wrap %CLASSPATH% in double quotes. Original:

```
set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
```

After the change:

```
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
```

Run the command again: startup succeeds! At this point there are three windows in total. ...

May 10, 2019 · 3 min · jiezi

Installing and Debugging RocketMQ Locally

Installing and debugging RocketMQ locally. Tags: 【RocketMQ】

1. Startup

Enter the RocketMQ-ALL source project and build with Maven:

```
mvn -Prelease-all -DskipTests clean install -U
```

Enter the packaged output:

```
cd /Users/rtw/IdeaProjects/RocketMQ/distribution/target/apache-rocketmq/bin
```

Go to distribution/conf and copy broker.conf, logback_broker.xml, and logback_namesrv.xml into rocketmq_all/conf. Edit the broker.conf configuration:

```properties
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
# storage root path
storePathRootDir=/Users/rtw/IdeaProjects/RocketMQ/store
# commitLog storage path
storePathCommitLog=/Users/rtw/IdeaProjects/RocketMQ/store/commitlog
# consume queue storage path
storePathConsumeQueue=/Users/rtw/IdeaProjects/RocketMQ/store/consumequeue
# message index storage path
storePathIndex=/Users/rtw/IdeaProjects/RocketMQ/store/index
# checkpoint file storage path
storeCheckpoint=/Users/rtw/IdeaProjects/RocketMQ/store/checkpoint
# broker role:
# - ASYNC_MASTER asynchronous-replication master
# - SYNC_MASTER  synchronous double-write master
# - SLAVE
brokerRole=ASYNC_MASTER
# flush mode:
# - ASYNC_FLUSH asynchronous flush
# - SYNC_FLUSH  synchronous flush
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# abort file storage path
abortFile=/Users/rtw/IdeaProjects/RocketMQ/store/abort
```

Run org.apache.rocketmq.namesrv.NamesrvStartup, adding the environment variable ROCKETMQ_HOME=/Users/rtw/IdeaProjects/RocketMQ.

Run org.apache.rocketmq.broker.BrokerStartup, adding the same environment variable ROCKETMQ_HOME=/Users/rtw/IdeaProjects/RocketMQ, and configure the program arguments to point at the project's broker.conf: `-c /Users/rtw/IdeaProjects/RocketMQ/conf/broker.conf`. Log locations are configured in RocketMQ/conf/logback_broker.xml.

Run org.apache.rocketmq.example.quickstart.Producer to create messages; note that you need to add producer.setNamesrvAddr("127.0.0.1:9876"). Run org.apache.rocketmq.example.quickstart.Consumer to consume messages; note that you need to add consumer.setNamesrvAddr("127.0.0.1:9876").

2. RocketMQ

RocketMQ is a high-performance messaging middleware. Its core strengths: reliable message storage; high-performance, low-latency message sending; strong message accumulation and processing capacity; strict ordered message storage. It also knows its trade-offs: the ideal state for a messaging middleware is that a message can be consumed exactly once, but achieving that necessarily sacrifices performance. RocketMQ guarantees a message is consumed at least once, but does not promise it will not be consumed multiple times; message idempotence is left to the consumer.

2.1 Design philosophy

- Uses a NameServer, abandoning the industry-standard ZooKeeper as the information-management "registry," because topic routing does not need strong cross-cluster consistency; eventual consistency is pursued, and minute-level divergence is tolerable.
- Efficient I/O storage mechanism.
- Tolerated design flaws: RocketMQ guarantees at-least-once consumption but does not promise no duplicates; idempotence is implemented by the consumer.

2.2 Design goals

As a messaging middleware, RocketMQ needs to address the following:

- Architecture: like most messaging middleware, RocketMQ uses the publish-subscribe model; the basic components are message senders, the message server (message storage), message consumption, and route discovery.
- Ordered messages: consumers consume messages in the order they arrived at the message storage server.
- Message filtering: RocketMQ supports filtering on both the server side and the consumer side. With broker-side filtering, the broker can send only messages the consumer is interested in. With consumer-side filtering, the filtering logic is fully custom, but the drawback is that many useless messages are transferred from the broker to the consumer.
- Message storage: message accumulation capacity and storage performance. RocketMQ pursues high-performance storage by introducing memory mapping, with messages of all topics stored sequentially in the same file.
- Message high availability, and low delivery (consumption) latency.
- Ensuring each message is consumed at least once: RocketMQ uses a consumption acknowledgement mechanism (ACK) to ensure a message is consumed at least once, but since ACKs can be lost (among other reasons), RocketMQ cannot achieve exactly-once consumption, and duplicate consumption is possible.

3. Startup parameters

brokerRole = SYNC_MASTER | ASYNC_MASTER | SLAVE. SYNC and ASYNC describe the master-slave message synchronization setting: SYNC means the send is reported successful only after the slave has finished synchronizing the message with the master.

April 10, 2019 · 1 min · jiezi

A Deep Dive into RocketMQ's Topic Creation Mechanism

I still remember the first time I used RocketMQ: I had to pre-create topics in the console, and I wondered why it was designed that way, so I decided to dig through the source and take you to the root of RocketMQ's topic creation mechanism.

In RocketMQ's design, a topic organizes messages that belong to the same business logic; it is purely a logical concept. A topic contains several logical queues, i.e. message queues; message content is actually stored in the queues, and the queues are stored on brokers. The storage model of a topic is illustrated below.

RocketMQ actually has two different topic creation modes: one is the pre-creation just mentioned, the other is automatic creation. Let me take you through both mechanisms in detail from the source code.

Automatic creation

By default, a topic need not be created manually: when a producer sends a message, it pulls the topic's route information from the nameserver; if the route does not exist, it falls back to pulling the route of the topic named "TBW102" that brokers create by default at startup:

org.apache.rocketmq.common.MixAll:

```java
// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
```

The auto-creation switch is the autoCreateTopicEnable field in BrokerConfig:

org.apache.rocketmq.common.BrokerConfig:

```java
@ImportantField
private boolean autoCreateTopicEnable = true;
```

At broker startup, TopicConfigManager's constructor is called; with autoCreateTopicEnable on, "TBW102" is saved into topicConfigTable:

org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:

```java
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
    String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    TopicConfig topicConfig = new TopicConfig(topic);
    this.systemTopicList.add(topic);
    topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
    topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
    int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
    topicConfig.setPerm(perm);
    this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
```

The broker sends the topic information in topicConfigTable to the nameserver via heartbeat, and the nameserver registers it in RouteInfoManager. Now let's see how message sending fetches a topic's route from the nameserver:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:

```java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // first send for this topic: it does not yet exist on the nameserver
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // the second request sets isDefault=true and fetches the route via the default "TBW102"
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
```

As shown, when a topic sends its first message, the route cannot be obtained from the nameserver, so a second request is made with isDefault=true, fetching the route via the default "TBW102", which the brokers have already registered with the nameserver by default:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

```java
if (isDefault && defaultMQProducer != null) {
    // fetch route information using the default topic "TBW102"
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
        defaultMQProducer.getCreateTopicKey(), 1000 * 3);
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
}
```

If isDefault=true and defaultMQProducer is non-null, the default route is fetched from the nameserver. This returns the default "TBW102" routes of all brokers that have the auto-creation switch on, and the default topic queue count is preserved.

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

```java
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
    changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
    log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
```

The topic's route is taken from the local cache; since this is the topic's first message there is no local route yet, so comparing it with "TBW102"'s route yields changed=true, i.e. there is a change, and the following logic runs:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:

```java
// Update sub info
{
    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
        }
    }
}
```

The "TBW102" route information is used to build a TopicPublishInfo, and the local cache is updated with the topic as key and the TopicPublishInfo as value. So here is the picture: the brokers painstakingly create the "TBW102" topic and register its route with the nameserver, and the newly arrived topic grabs that route, builds a TopicPublishInfo from it, and keeps it as its own. Since the TopicPublishInfo's route is the default "TBW102" route, messages of the real topic are load-balanced onto the brokers hosting "TBW102"; call it a bait-and-switch.

When a broker receives such a message, it calls createTopicInSendMessageMethod from within msgCheck, stuffing the topic's information into the topicConfigTable cache, and the broker's periodic heartbeat then sends topicConfigTable to the nameserver for registration.

Sequence diagram of auto-creation and route lookup at send time:

Pre-creation

"Pre-creation" actually seems a more fitting name than "manual creation": the topic's information is created on the brokers in advance and registered with the nameserver, so the client can fetch the route directly at send time. But "manual creation" is more vivid: it tells you plainly that the topic has to be created by hand in the console. Pre-creation uses mqadmin's topic commands; run:

```
./mqadmin updateTopic
```

The official parameters:

```
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
       -t <arg> [-u <arg>] [-w <arg>]
 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false)
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W; 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false)
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false)
 -w,--writeQueueNums <arg>   set write queue nums
```

Let's go straight to the implementation class's execute method. Creating via broker mode:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

```java
// -b,--brokerAddr <arg> create topic to which broker
if (commandLine.hasOption('b')) {
    String addr = commandLine.getOptionValue('b').trim();
    defaultMQAdminExt.start();
    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
    return;
}
```

The broker address is read from the command line's -b argument. defaultMQAdminExt is the API behind the default RocketMQ console; its start method creates an mqClientInstance that encapsulates the Netty communication details. Then comes the most important step: createAndUpdateTopicConfig sends the topic configuration to the specified broker, completing the topic creation.

Creating via cluster mode:

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:

```java
// -c,--clusterName <arg> create topic to which cluster
else if (commandLine.hasOption('c')) {
    String clusterName = commandLine.getOptionValue('c').trim();
    defaultMQAdminExt.start();
    Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
    for (String addr : masterSet) {
        defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
        System.out.printf("create topic to %s success.%n", addr);
    }
    return;
}
```

Cluster-mode creation is roughly the same as broker-mode creation, with one extra step: fetching the master addresses of all brokers in the cluster from the nameserver, then sending the topic information to each broker in a loop, the same logic as targeting a single broker. This also explains why cluster-mode creation gives every broker in the cluster the same queue count, while single-broker creation allows each broker's queue count to differ.

Pre-creation sequence diagram:

When should topics be pre-created?

The suggestion is to enable auto-creation offline and disable it online; that is not my advice but the official one. Why did RocketMQ design it this way? After this deep dive through the source, I got the answer I wanted:

From the source analysis above, when RocketMQ sends a message it first fetches the topic's route. If the topic is sending its first message, the nameserver has no route for it, so the route is fetched again via the default topic "TBW102". Assuming the brokers all have the auto-creation switch on, the routes of all brokers are returned, and the send picks one broker by the load-balancing algorithm. When the message reaches that broker and the topic is not found locally, the broker creates the topic's information, puts it into its local cache, and registers the topic route with the nameserver. The consequence: from then on, all messages of that topic are sent to this one broker. If the topic's volume is very large, one broker becomes overloaded, and message storage no longer achieves the goal of load balancing. ...
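The fallback lookup at the heart of auto-creation can be modeled in a few lines. RouteLookupDemo and its map-backed "name server" are invented stand-ins for the real route tables; only the two-step lookup (topic first, then the default "TBW102") mirrors the client logic.

```java
import java.util.HashMap;
import java.util.Map;

// Toy version of the lookup flow: ask the name server for the topic's route;
// if there is none, look up the default topic "TBW102", whose route covers
// every broker started with autoCreateTopicEnable=true.
public class RouteLookupDemo {
    static final String DEFAULT_TOPIC = "TBW102";
    final Map<String, String> nameServer = new HashMap<>(); // topic -> route

    public String findRoute(String topic) {
        String route = nameServer.get(topic);
        if (route == null) {
            route = nameServer.get(DEFAULT_TOPIC); // second lookup, isDefault=true
        }
        return route;
    }

    public static void main(String[] args) {
        RouteLookupDemo demo = new RouteLookupDemo();
        demo.nameServer.put(DEFAULT_TOPIC, "broker-a;broker-b");
        demo.nameServer.put("existing-topic", "broker-a");
        if (!"broker-a".equals(demo.findRoute("existing-topic"))) throw new AssertionError();
        if (!"broker-a;broker-b".equals(demo.findRoute("brand-new-topic"))) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The model also makes the production risk visible: a brand-new topic inherits the default route, and once the first receiving broker registers the topic, that single broker's route sticks, which is exactly why the official advice is to disable auto-creation online.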

March 31, 2019 · 2 min · jiezi

RocketMQ 存储文件

磁盘存储文件

```
-rw-r--r-- 1 root root    0 Jan 18 11:54 abort
-rw-r--r-- 1 root root 4.0K Mar 14 17:39 checkpoint
drwxr-xr-x 2 root root   34 Feb 14 14:33 commitlog
drwxr-xr-x 2 root root  280 Mar 14 17:40 config
drwxr-xr-x 7 root root  138 Feb 20 10:28 consumequeue
drwxr-xr-x 2 root root   31 Feb 20 10:18 index
-rw-r--r-- 1 root root    4 Feb 20 10:18 lock
```

- abort:RocketMQ 启动时生成,正常关闭时删除。如果启动时存在该文件,代表 RocketMQ 上一次被异常关闭;
- checkpoint:文件检查点,存储 commitlog、consumequeue、indexfile 最后一次刷盘的时间点(时间戳);
- index:消息索引文件存储目录;
- consumequeue:消息消费队列存储目录;
- commitlog:消息存储目录;
- config:运行时的配置信息,包含主题消息过滤信息、集群消费模式消息消费进度、延迟消息队列拉取进度、消息消费组配置信息、topic 配置属性等。

CommitLog 文件

```
drwxr-xr-x 2 root root   34 Feb 14 14:33 .
drwxr-xr-x 6 root root  113 Feb 14 13:59 ..
-rw-r--r-- 1 root root 1.0G Mar 14 17:48 00000000000000000000
-rw-r--r-- 1 root root 1.0G Mar 19 21:33 00000000006000000000
```

文件名由 20 位数字组成,以该文件第一条消息的偏移量为文件名,长度不足 20 位的在前面补 0。文件默认大小为 1G,可根据 mappedFileSizeCommitLog 属性改变文件大小。CommitLog 存储所有消息内容,写满一个文件后生成新的 commitlog 文件,所有 topic 的数据存储在一起(逻辑视图原文为图示,略)。

ConsumeQueue 文件

RocketMQ 基于主题的订阅模式实现消息消费。由于同一主题的消息不连续地存储在 CommitLog 文件中,遍历 CommitLog 文件会导致效率非常低下,为了适应消息消费的检索需求,设计了消息消费队列文件:一个 ConsumeQueue 文件可以作为检索指定 topic 消息的索引。

```
[root@xxxx consumequeue]# tree -L 3
|-- smsCallbackReply
|   |-- 0
|   |   `-- 00000000000000000000
|   |-- 1
|   |   `-- 00000000000000000000
|   |-- 2
|   |   `-- 00000000000000000000
|   `-- 3
|       `-- 00000000000000000000
`-- smsCallbackStatus
    |-- 0
    |   |-- 00000000000000000000
    |   `-- 00000000000006000000
    |-- 1
    |   |-- 00000000000000000000
    |   `-- 00000000000006000000
    |-- 2
    |   |-- 00000000000000000000
    |   `-- 00000000000006000000
    `-- 3
        |-- 00000000000000000000
        `-- 00000000000006000000
```

ConsumeQueue 文件存储消息的逻辑偏移量,而不存储消息的全部内容(存储格式原文为图示,略)。

Index 索引文件

RocketMQ 引入了 Hash 索引机制为消息建立索引,对 CommitLog 进行数据索引。索引文件由 IndexHead、Hash 槽和 Index 条目列表组成。

IndexHead 数据:

- beginTimestamp:该索引文件包含消息的最小存储时间;
- endTimestamp:该索引文件包含消息的最大存储时间;
- beginPhyoffset:该索引文件中包含消息的最小物理偏移量(commitlog 文件偏移量);
- endPhyoffset:该索引文件中包含消息的最大物理偏移量(commitlog 文件偏移量);
- hashSlotCount:hash 槽个数,并不是 hash 槽已使用的个数,在这里意义不大;
- indexCount:已使用的 Index 条目个数。

Hash 槽:一个 IndexFile 默认包含 500W 个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 条目的索引。

Index 条目列表:

- hashcode:key 的 hashcode;
- phyoffset:消息对应的物理偏移量;
- timedif:该消息存储时间与第一条消息时间戳的差值,小于 0 表示该消息无效;
- preIndexNo:该条目的前一条记录的 Index 索引,hash 冲突时,根据该值构建链表结构。

Checkpoint 文件

记录 CommitLog、ConsumeQueue、IndexFile 的刷盘时间点,文件固定长度为 4k,其中只使用该文件的前 24 个字节(存储格式原文为图示,略)。
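上面提到 checkpoint 文件只使用前 24 个字节,按照本文的描述,可以把这 24 字节理解为三个连续的 8 字节时间戳(分别对应 commitlog、consumequeue、indexfile 的最后刷盘时间)。下面是一段独立的 Java 示意代码,演示如何按这种布局读写这 24 个字节;字段顺序、字节序与演示数值均为示例假设,并非 RocketMQ 源码:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckpointDemo {

    // 构造一个演示用的 checkpoint 文件:依次写入三个 8 字节时间戳(共 24 字节)
    static Path writeDemoFile() throws IOException {
        Path file = Files.createTempFile("checkpoint", null);
        ByteBuffer buf = ByteBuffer.allocate(24);
        buf.putLong(1_700_000_000_000L); // commitlog 最后刷盘时间(演示值)
        buf.putLong(1_700_000_000_100L); // consumequeue 最后刷盘时间(演示值)
        buf.putLong(1_700_000_000_200L); // indexfile 最后刷盘时间(演示值)
        Files.write(file, buf.array());
        return file;
    }

    // 解析 checkpoint 文件前 24 字节中的三个时间戳
    static long[] readCheckpoint(Path file) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(Files.readAllBytes(file));
        return new long[]{buf.getLong(), buf.getLong(), buf.getLong()};
    }

    public static void main(String[] args) throws IOException {
        Path file = writeDemoFile();
        long[] ts = readCheckpoint(file);
        System.out.println("physicMsgTimestamp=" + ts[0]
            + ", logicsMsgTimestamp=" + ts[1] + ", indexMsgTimestamp=" + ts[2]);
        Files.deleteIfExists(file);
    }
}
```

这里利用 ByteBuffer 默认的大端字节序保证写入与读取一致;真实文件的字段含义以 RocketMQ 源码为准。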

March 15, 2019 · 2 min · jiezi

RocketMQ 在平安银行的实践和应用

随着互联网金融业务和相关技术的不断发展,传统金融行业为满足业务快速发展需求,正在积极引入各类开源技术,以快速抢占市场。那么,以金融和科技作为双驱动的平安银行在开源技术的引入方面是如何评估的,运用到了哪些业务场景,以及面对复杂的网络环境,是如何去部署的呢?本文将以 Apache RocketMQ 为例,和您一起了解平安银行在开源技术选型方面的思考和实践:

- RocketMQ 在平安银行的应用场景;
- 复杂网络环境下的部署实践;
- 多隔离区场景下的部署情况;
- 多 IDC 场景下的部署情况;
- 改造实践和遇到的小插曲。

RocketMQ 在平安银行的应用场景

目前,平安银行通过 RocketMQ 解决了数据预加载、信息通知和状态变化方面的业务需求,接下来,我们通过 App 登录、资产总览和工资理财 3 个应用场景来展开讲下。

App 登录:当用户打开平安银行 App 的时候,系统会根据用户的登录 ID 去加载相应的用户数据,比如银行卡、信用卡和理财产品等,以及一些系统通知。这个场景下,我们用到了 RocketMQ 的异步处理能力,即预加载需要使用的数据,提升用户体验。

资产总览:进入平安银行 App 资产总览的页面,页面显示账户余额、各类理财产品(黄金、基金和股票等),以及贷款等方面的信息。平安银行对接了各类基金、黄金和股票等来自不同金融主体、不同系统的数据,具有种类多、异构系统多和变化快的特点。我们的目标就是让资产总览数据尽可能准确,不同种类的资产变动的时候发出通知,资产系统收到通知后,根据变化的数据来计算出用户当前的资产总览。

工资理财:工资理财是指每月工资下发到银行卡后,系统可以实现自动买入用户设置好的理财产品,例如买一些定投类的理财产品。这里信息的传递流程是:

1. 银行卡里的余额出现变动,系统把变动信息发送到消息引擎;
2. Consumer 端进行消费,通知用户账户已经出现变化;
3. 系统判断变动是否来源于代发工资;
4. 如果是,系统会再发送一条消息;
5. 理财的 Consumer 进行消费;
6. 判断现在是不是应该买入工资理财类的产品;
7. 如果是,自动买入用户设定好的理财产品;
8. 自动买入之后,余额再次变动,系统会再一次通知,这一次通知,判断的就是一些其他的逻辑了。

那么,在这些场景中,我们对消息引擎会有哪些要求呢?

A、高可用、高可靠和高性能,这是金融行业引入开源技术的基本要求;
B、堆积能力,代发工资的用户很多,一个公司的员工会在某个时间点集中发放;
C、顺序能力,即账户变动在先,发出通知在后;
D、事务性能力,如果没有事务性,有可能会出现账户变动了,但没有触发购买理财产品的情况;
E、重试和延迟消息功能,比如工资发放的时候,可能是在晚上,这时候系统会自动把购买理财的动作放在第二天白天去操作,再发出通知;
F、消息回溯能力,就是出现问题的时候,我们可以把这个消息进行回溯,重新进行消息的处理,提供完整的消息轨迹。

在技术选型的过程中,RocketMQ 符合我们在这些典型使用场景中对消息产品的需求,在引入的过程中,平安银行也对社区做了相应的贡献。

复杂网络环境下的部署实践

多测试子环境下的服务调用场景

平安银行有多套测试子环境,用来对不同的 feature 进行测试,即图中的 FAT、FAT001、FAT002、FAT003 等。传统银行系统由大型机时代向更面向互联网用户的零售时代转型的过程中,不可避免地微服务化,传统较集中式的系统会被划分为较多的微服务,正常的情况如下图:服务 A 调用服务 B,服务 B 调用服务 C,服务 C 调用服务 D。

随着业务的需求,出现了新的 feature,我们需要对服务 A 和 B 进行改动。相比在 FAT 环境里去部署测试,更合适的方式是另起一套 FAT 环境,这里我们命名为 FAT001,把服务 A 和 B 部署上去,A 调用 B,B 会调用原来 FAT 环境里的 C 和 D。此时,另一个新的需求,需要对服务 A 和 C 进行改动。如果直接发布到 FAT 或 FAT001 肯定是不对的,会影响正在进行的测试,此时,我们会再起一套测试环境,命名为 FAT002,发布服务 A 和 C。由于 FAT002 里没有服务 B,所以服务 A 要调用服务 B 就需要去 FAT 环境(FAT 定义为较稳定的测试子环境)。服务 B 调用服务 C 的时候,就不应该调用本环境的 C 了,而是调用 FAT002 的 C 才能实现测试功能。再假设,系统同时还会有另外一个 feature 在测试 C 和 D,此时的调用逻辑也是一样的,系统调用服务 A 和 B 的时候是在 FAT,调用服务 C 和 D 的时候会到 FAT003 的环境。

以上的服务调用场景是可以通过微服务框架解决的,进行全链路测试,但在生产环境中,用户的真实请求比测试环境中会更复杂一些。

真实的用户请求过程

我们看一下真实的用户请求。App 发起一个请求,进入网关,需要知道请求哪一个测试环境。通常的做法是:测试人员需要在 App 上选好子环境,例如选择 FAT001,我们可以直接请求 FAT001 的网关(每个环境网关单独部署),或者在 request header 上添加标识,让网关去区分它来源于哪一个环境(网关统一部署)。假如网关判断请求来源于 FAT002,那就会把请求分发给 FAT002 环境进行处理。

消息层面,如何处理这类用户请求

以上是服务调用的请求路径,比较好理解,但到了消息队列上,问题会变得复杂些。假如一个 feature 只是更改了消费者,那如何把这条消息传递到更改的消费者应用上去进行测试,而不被其它环境的消费者消费掉,这是我们需要去考虑的问题。来看下具体的情况:集群部署了 Broker A 和 Broker B,Topic A 分别部署在这两个 Broker 上。此时,Producer Group A 向 Topic A 中写数据,Consumer Group A 去消费,这种情况下是不会有任何问题的。但如果新增一套 FAT001 的环境,要求 FAT001 发布的消息,只能由 FAT001 来消费,FAT 不能消费,这种情况下我们该怎么解决?在消息上面加一些路由、或是加一些 Tag、Filter、消息的 Property?这些都不能解决我们的问题。

每个子环境部署一套 RocketMQ?一方面成本太高,另一方面如果这个 feature 测试完成了,需要把相关的应用再切回 FAT 进行处理,实现太过复杂。

我们想一下,多个 feature 同时进行测试,DB 会部署一套还是多套?首先一个 feature 不会更改所有的应用,一般来说 DB 是部署一套的,在数据库里面添加字段,来识别数据是来源于哪一个子环境;如果多套部署,不更改的应用取不到新部署的 DB 数据,无法进行全链路测试。所以同样的,我们也没有在每个子环境都部署一套 RocketMQ,而是统一部署,通过 RPC 路由把请求路由到正确的生产者,改造消息路由算法把消息路由到正确的消费者进行处理。

在上图中生产者变更的场景下,默认的场景 FAT 发送、FAT 消费,没有问题;假设 FAT001 的生产者发布了,需要 FAT001 发送到 MQ 集群,FAT 是可以直接消费的。

在上图生产者和消费者同时变更的场景下,如果消费者在 FAT001 也部署了应用,需要 FAT 消费者不能消费由 FAT001 产生的消息,而是由 FAT001 的消费者去消费。我们的处理方式是在逻辑上把 Topic A 下面的 Queue 进行分组,相当于加了逻辑分组:如果消费者在 FAT001 有部署,我们会把 Queue 的分组扩大,在其默认设置的情况下再翻一倍,新增的 Queue 就是给到 FAT001 进行消费的。

再来看看只有消费者变更的场景,如上图。假设有个 feature 只需要更改消费者,部署在 FAT001。也是可以通过逻辑分组的方式,实现生产者根据请求来源来发送消息到 FAT001 逻辑分组内的 Queue,从而只让部署在 FAT001 的消费者消费。

通过以上 3 个场景,我们了解到添加逻辑分组的必要性,实现过程并不复杂,主要做了以下调整:

这个逻辑分组什么时候建立?新建 Topic 的时候全部建好?还是 Consumer 上线/下线的时候动态地去进行调整?我们选择了动态创建的方式,这个过程中,我们添加了 Meta Server 进行元数据管理,进行动态创建:

- 添加 Meta Service,管理的元数据包括 Producer、Consumer、Topic、Queue 和 Subenv 等信息;
- 调整 Producer,取 Request Header 里面的请求来源(FAT、FAT001、FAT002……),如果 Topic 对应的分组存在,选择分组内的 Queue,否则发到默认分组的 Queue;
- 调整 Consumer,上线时判断应用部署的分组(FAT、FAT001、FAT002……),如果 Topic 不存在对应的分组,则新建;存在,则 rebalance(新 Consumer 节点上线)。下线时,判断该分组是否还存在其它 Consumer 实例,若不存在,删除分组,否则 rebalance(Consumer 某一节点下线)。

多隔离区场景下的部署实践

由于对安全上的重视,金融行业的网络环境相比其他行业会更复杂。整个隔离区分为以下几个部分:

- DMZ 区:外网可以直接访问,用于放置网关;
- Web 区:面向的是用户手机,或者网页上可以看到的功能应用;
- 核心区:包含核心的调用逻辑功能,和交易、订单相关的核心应用,例如 DB 和存储;
- 外联区:用于访问外网,有时候也会部署一些 Proxy 代理,因为内网不能直接访问外网,需要通过代理去访问外网;
- 专用区域:对接基金、三方存管等外部系统。在金融行业,如果某个系统是闭环的,那必须要去做隔离;
- 管理区:是指对整个区域的应用要进行集中管理,且数据流动是单向的,只能取数据,不能通过管理区把某区域的数据推送到另一区域。

此外,从安全的角度出发,所有的区域在生产环境下都通过防火墙进行隔离,这就给我们部署 RocketMQ 带来了很大的实施难度。如果只部署一套,面对这么多的防火墙,生产者、消费者到集群的流量跨墙,会给网络带来极大的不稳定,遇到瓶颈时,防火墙几乎无法在线平滑扩容;如果每个子环境都部署一套,又带来运维复杂度,而且还是存在数据同步和跨墙消费的问题。最终,我们采用的是折中的办法,即统一部署加分隔离区部署,这样做的益处是:

- 防火墙是开大策略,保证安全的前提下,符合监管要求;
- 针对跨隔离区消费的问题,我们采用复制的方式,模拟消费者重新写入目标集群。

多 IDC 场景下的部署实践

同城多 IDC,可以认为近似局域网,比较好处理;但异地多 IDC 多活场景,目前我们还没有特别好的解决方案,多活不可避免地面临数据分片、数据合并和数据冲突的解决等问题。如果 Topic 下数据有多活需求,我们暂时通过复制方式来处理。但这类手工模拟消费者消费数据写入新集群的复制方式,会存在一些问题,即复制到另一个集群之后 offset 会改变,处理逻辑就会有偏差。我们通过 pull 的方式,自定义地根据 offset 去进行消费。当故障转移到新的集群需要去消费的时候,需要获取到新集群里面正确的 offset 值。此时,这个值和之前集群里的已经不一样了,需要根据时间得到新集群里正确的 offset 值,再去进行消费。在没有更好的解决方案前,治理就显得很关键了。不过,我们注意到,在 RocketMQ 最新发布的版本里,提供了 DLedger 的特性,DLedger 定位的是一个工业级的 Java Library,可以友好地嵌入各类 Java 系统中,满足其高可用、高可靠、强一致的需求。我们会尽快对这一特性进行集成和测试。

改造实践和遇到的小插曲

我们在对 RocketMQ 的使用过程中,添加了以下功能或特性:

A. 为生产者提供消息的堆积能力。
B. 将所有配置管理部署到配置中心,并做云端化处理,以满足动态修改的需求。
C. 在 4.3 版本还未提供事务处理能力前,我们在本地数据库里去建一张消息表,数据库更改数据状态的时候,会同步将数据写入消息表;若发送失败,则进行补偿,并在客户端里进行封装。
D. 实现统一的消费者幂等处理。
E. 添加身份认证和消息认证(注:RocketMQ 4.3 版本中已经实现身份认证功能)。

当然,也遇到了一些小插曲,基本都是使用上的小问题,可能大家也会碰到:

A. 一个应用使用多个 RocketMQ 集群时,未加载到正确的配置。在 Client 端,如果没有对 instancename 进行配置,一个应用连多个集群会失败。
B. 在大数据的场景下,内存溢出。订阅的 Topic 越多,每个 Queue 在本地缓存的 message 也会越多,默认每个 Queue 1000 条,可能会把内存打爆,可根据实际情况调整。
C. 删除数据时 IO 抖动。默认每天凌晨 4 点删除数据,量上来后出现 IO 抖动,配置了消息删除策略(默认逗号分隔开),多配几个时间删除就可以了。
D. Broker 上日志报延迟消息找不到数据文件。在主备切换的演练过程中,出现了延迟消息在 Broker 上处理异常的情况:当主切换到备端时,延迟消息在 Broker 上保存的文件被自动删除,再切回主,由于延迟消息的元数据还在,会查询文件进行处理,此时已找不到文件。
E. 挂 NAS 的时候,IP 获取了 NAS 的私网地址,并被提交给了服务端。

以上就是我们在部署过程中遇到的一些小插曲,基本都可以很快定位原因并解决。总的来看,RocketMQ 对平安银行的消息系统建设的帮助是非常大的,尤其是满足了数据持久化、顺序消费和回溯的需求;此外,在消息的查询方面,也比我们之前使用的消息引擎好很多。最后再分享一点自己对中间件的感悟:中间件使用重在治理,规范不先行,开发两行泪。

本文作者:吴建峰,GitHub ID @devilfeng,来自平安银行平台架构部基础框架团队。

更多 RocketMQ 的实践案例:RocketMQ x 微众银行、RocketMQ x 同程艺龙、RocketMQ x 滴滴出行。

本文作者:中间件小哥。本文为云栖社区原创内容,未经允许不得转载。

March 11, 2019 · 2 min · jiezi

从RocketMQ看长轮询(Long Polling)

前言

消息队列一般在消费端都会提供 push 和 pull 两种模式,RocketMQ 同样实现了这两种模式,分别提供了两个实现类:DefaultMQPushConsumer 和 DefaultMQPullConsumer。两种方式各有优势:

- push 模式:推送模式,即服务端有数据之后立马推送消息给客户端,需要客户端和服务器建立长连接,实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;
- pull 模式:拉取模式,即客户端主动去服务端拉取数据,主动权在客户端,拉取数据、处理数据、再拉取数据,一直循环下去;缺点是拉取数据的时间间隔不好设定,太短可能会导致大量的连接拉取不到数据,太长则导致数据接收不及时。

RocketMQ 使用了长轮询的方式,兼顾了 push 和 pull 两种模式的优点。下面首先对长轮询做简单介绍,进而分析 RocketMQ 内置的长轮询模式。

长轮询

长轮询通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性。长轮询本质上也是轮询,只不过对普通的轮询做了优化处理:服务端在没有数据的时候并不是马上返回,而是 hold 住请求,等待服务端有数据,或者一直没有数据时做超时处理,然后一直循环下去。下面看一下如何简单实现一个长轮询。

1. 实现步骤

1.1 客户端轮询发送请求:客户端应该存在一个一直循环的程序,不停地向服务端发送获取消息的请求。

1.2 服务端处理数据:服务器接收到客户端请求之后,首先查看是否有数据,如果有数据则直接返回,如果没有则保持连接,等待获取数据;服务端获取数据之后,会通知之前的请求连接来获取数据,然后返回给客户端。

1.3 客户端接收数据:正常情况下,客户端会马上接收到服务端的数据,或者等待一段时间获取到数据;如果一直获取不到数据,会有超时处理。在获取数据或者超时处理之后会关闭连接,然后再次发起长轮询请求。

2. 实现实例

以下使用 netty 模拟一个 http 服务器,使用 HttpURLConnection 模拟客户端发送请求,使用 BlockingQueue 存放数据。

服务端代码:

```java
public class Server {
    public static void start(final int port) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup woker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.channel(NioServerSocketChannel.class).group(boss, woker)
                    .childOption(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-decoder", new HttpServerCodec());
                            ch.pipeline().addLast(new HttpServerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("server start ok port is " + port);
            DataCenter.start();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            woker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        start(8080);
    }
}
```

netty 默认支持 http 协议,直接使用即可,启动端口为 8080;同时启动数据中心服务,相关代码如下:

```java
public class DataCenter {
    private static Random random = new Random();
    private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static AtomicInteger num = new AtomicInteger();

    public static void start() {
        while (true) {
            try {
                Thread.sleep(random.nextInt(5) * 1000);
                String data = "hello world" + num.incrementAndGet();
                queue.put(data);
                System.out.println("store data:" + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static String getData() throws InterruptedException {
        return queue.take();
    }
}
```

为了模拟服务端没有数据、需要等待的情况,这里使用 BlockingQueue 来模拟,不定期地往队列里面插入数据,同时对外提供获取数据的方法,使用的是 take 方法,没有数据会阻塞直到有数据为止。getData 在类 HttpServerHandler 中使用,此类也很简单,如下:

```java
public class HttpServerHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequest) {
            FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            httpResponse.content().writeBytes(DataCenter.getData().getBytes());
            httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
            httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes());
            ctx.writeAndFlush(httpResponse);
        }
    }
}
```

获取到客户端的请求之后,从数据中心获取一条消息,如果没有数据,会进行等待,直到有数据为止;然后使用 FullHttpResponse 返回给客户端。

客户端使用 HttpURLConnection 来和服务端建立连接,不停地拉取数据,代码如下:

```java
public class Client {
    public static void main(String[] args) {
        while (true) {
            HttpURLConnection connection = null;
            try {
                URL url = new URL("http://localhost:8080");
                connection = (HttpURLConnection) url.openConnection();
                connection.setReadTimeout(10000);
                connection.setConnectTimeout(3000);
                connection.setRequestMethod("GET");
                connection.connect();
                if (200 == connection.getResponseCode()) {
                    BufferedReader reader = null;
                    try {
                        reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                        StringBuffer result = new StringBuffer();
                        String line = null;
                        while ((line = reader.readLine()) != null) {
                            result.append(line);
                        }
                        System.out.println("时间:" + new Date().toString() + " result = " + result);
                    } finally {
                        if (reader != null) {
                            reader.close();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }
    }
}
```

以上只是简单地模拟了长轮询的方式,下面重点来看看 RocketMQ 是如何实现长轮询的。

RocketMQ 长轮询

RocketMQ 的消费端提供了两种消费模式,分别是 DefaultMQPushConsumer 和 DefaultMQPullConsumer,其中 DefaultMQPushConsumer 使用的就是长轮询,所以下面重点分析此类。

1. PullMessage 服务

从名字可以看出来,这就是客户端从服务端拉取数据的服务,看里面的一个核心方法:

```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
```

服务启动之后,会一直不停地循环调用拉取数据,PullRequest 可以看作是拉取数据需要的参数,部分代码如下:

```java
public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;
    // ...省略...
}
```

每个 MessageQueue 都对应封装成了一个 PullRequest,因为拉取数据是以每个 Broker 下面的 Queue 为单位的;同时里面还有一个 ProcessQueue,每个 MessageQueue 也同样对应一个 ProcessQueue,保存了这个 MessageQueue 消息处理状态的快照;还有 nextOffset 用来标识读取的位置。继续看一段 pullMessage 中的内容,即给服务端发送请求的头内容:

```java
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);

String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
    brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}

PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
return pullResult;
```

其中有一个参数是 SuspendTimeoutMillis,作用是设置 Broker 的最长阻塞时间,默认为 15 秒,前提是没有消息的情况下,有消息会立刻返回。

2. PullMessageProcessor 服务

从名字可以看出,这是服务端用来处理 pullMessage 的服务,下面重点看一下 processRequest 方法,其中包括对获取到的不同结果做的处理:

```java
switch (response.getCode()) {
    case ResponseCode.SUCCESS:
        // ...省略...
        break;
    case ResponseCode.PULL_NOT_FOUND:
        if (brokerAllowSuspend && hasSuspendFlag) {
            long pollingTimeMills = suspendTimeoutMillisLong;
            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
            }
            String topic = requestHeader.getTopic();
            long offset = requestHeader.getQueueOffset();
            int queueId = requestHeader.getQueueId();
            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData);
            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
            response = null;
            break;
        }
    case ResponseCode.PULL_RETRY_IMMEDIATELY:
        break;
    case ResponseCode.PULL_OFFSET_MOVED:
        // ...省略...
        break;
    default:
        assert false;
}
```

一共处理了四个类型,我们关心的是在没有获取到数据的情况下是如何处理的,可以重点看一下 ResponseCode.PULL_NOT_FOUND,表示没有拉取到数据,此时会调用 PullRequestHoldService 服务,从名字可以看出此服务用来 hold 住请求,不会立马返回,response 被置为了 null,不给客户端响应。下面重点看一下 PullRequestHoldService:

```java
@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}
```

此方法主要就是通过不停地检查被 hold 住的请求,确认是否已经有数据了;具体检查哪些请求,就是在 ResponseCode.PULL_NOT_FOUND 中调用的 suspendPullRequest 方法里登记的:

```java
private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
    new ConcurrentHashMap<String, ManyPullRequest>(1024);

public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}
```

将需要 hold 处理的 PullRequest 放入到一个 ConcurrentHashMap 中,等待被检查;具体的检查代码在 checkHoldRequest 中:

```java
private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
            try {
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}
```

此方法用来获取指定 MessageQueue 下最大的 offset,然后和当前的 offset 比较,来确定是否有新的消息到来;往下看 notifyMessageArriving 方法:

```java
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
                }
                if (newestOffset > request.getPullFromThisOffset()) {
                    if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                                request.getClientChannel(), request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                            request.getClientChannel(), request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}
```

方法中两个重要的判定就是:一是比较当前的 offset 和 maxOffset,看是否有新的消息到来,有新的消息则返回客户端;二是比较当前的时间和阻塞的时间,看是否超过了最大的阻塞时间,超过也同样返回。此方法不光在 PullRequestHoldService 服务类中循环调用检查,同时在 DefaultMessageStore 中消息被存储的时候也会被调用;其实就是主动检查和被动通知两种方式。

3. PullCallback 回调

服务端处理完之后,给客户端响应,回调其中的 PullCallback;其中在处理完消息之后,重要的一步就是再次把 pullRequest 放到 PullMessageService 服务中,等待下一次的轮询。

总结

本文首先介绍了两种消费消息的模式以及各自的优缺点,然后引出了长轮询,并且在本地简单模拟了长轮询,最后重点介绍了 RocketMQ 中是如何实现长轮询的。

示例代码地址:Github、Gitee
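补充:上文第 3 节提到,PullCallback 在处理完一批消息后,会把 pullRequest 重新放回 PullMessageService 的队列,形成"拉取—处理—再拉取"的循环。下面用一段独立的 Java 示意代码模拟这个再入队的过程(类名与字段均为演示而简化的假设,并非 RocketMQ 源码):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PullLoopDemo {

    // 简化的 PullRequest:仅保留拉取偏移量
    static class PullRequest {
        long nextOffset;
        PullRequest(long offset) { nextOffset = offset; }
    }

    static final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    static int processed = 0;

    // 模拟 PullCallback.onSuccess:处理完一批消息后,更新 offset 并把请求重新入队
    static void onSuccess(PullRequest request, int msgCount) throws InterruptedException {
        processed += msgCount;
        request.nextOffset += msgCount;
        pullRequestQueue.put(request); // 关键:重新放回队列,等待下一次轮询
    }

    public static void main(String[] args) throws Exception {
        pullRequestQueue.put(new PullRequest(0));
        for (int i = 0; i < 3; i++) {          // 模拟三次拉取
            PullRequest req = pullRequestQueue.take();
            onSuccess(req, 10);                // 假设每次拉到 10 条消息
        }
        PullRequest last = pullRequestQueue.peek();
        if (last.nextOffset != 30 || processed != 30) throw new AssertionError();
        System.out.println("nextOffset=" + last.nextOffset);
    }
}
```

真实实现中,take 出请求后走的是上文的 pullMessage 流程,由 Broker 端 hold 住无数据的请求;这里仅演示客户端"处理完即再入队"这一循环结构。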

March 6, 2019 · 4 min · jiezi

RocketMQ生产者消息篇

系列文章RocketMQ入门篇RocketMQ生产者流程篇RocketMQ生产者消息篇前言上文RocketMQ生产者流程篇中详细介绍了生产者发送消息的流程,本文将重点介绍发送消息的通信模式以及各种不同的消息类型。通信模式RocketMQ提供了三种通讯模式,分别是:同步,异步和单向;可以查看内部类CommunicationMode:public enum CommunicationMode { SYNC, ASYNC, ONEWAY,}下面分别看一下三种通讯模式如何使用1.同步方式看一个简单的发送同步消息的实例:public class SyncProducer { public static void main(String[] args) throws Exception { System.setProperty(“rocketmq.namesrv.domain”, “localhost”); // 构造Producer DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName2”); // producer.setNamesrvAddr(“192.168.237.128:9876”); // 初始化Producer,整个应用生命周期内,只需要初始化1次 producer.start(); for (int i = 0; i < 1; i++) { Message msg = new Message(“TopicTest6”, “TagA”, (“Hello RocketMQ” + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } producer.shutdown(); }}最简单的直接指定一个message参数默认使用的就是同步方式发送消息,可以看到在发送完消息之后,会立马返回了发送结果SendResult:SendResult [sendStatus=SEND_OK, msgId=0A0D5307324873D16E9365360AC60000, offsetMsgId=0A0D530700002A9F0000000000001200, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=0]2.异步方式看一个简单的发送异步消息的实例:public class AsyncProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName2”); producer.setRetryTimesWhenSendAsyncFailed(3); producer.setNamesrvAddr(“192.168.237.128:9876”); producer.start(); for (int i = 0; i < 1; i++) { Message msg = new Message(“TopicTest6”, “TagA”, (“Hello RocketMQ” + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } }}可以看到在发送消息时指定了SendCallback回调类,send发送方法返回值为void,发送成功之后会回调SendCallback的onSuccess方法,异常调用onException方法;发送成功日志如下:SendResult [sendStatus=SEND_OK, 
msgId=0A0D5307261473D16E936536591E0000, offsetMsgId=0A0D530700002A9F00000000000012B2, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=1]3.单向方式看一个简单的发送单向消息的实例:public class OneWayProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName2”); producer.setNamesrvAddr(“192.168.237.128:9876”); producer.start(); for (int i = 0; i < 1; i++) { Message msg = new Message(“TopicTest6”, “TagA”, (“Hello RocketMQ” + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } producer.shutdown(); }}单向发送消息发送之后没有响应,但是在消费端可以收到消息,如下所示:ConsumeMessageThread_6Receive New Messages :[MessageExt [queueId=2, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1550648529526, bornHost=/10.13.83.7:54213, storeTimestamp=1550648529530, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000001416, commitLogOffset=5142, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1550648529533, UNIQ_KEY=0A0D530747DC73D16E93653766750000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId=‘null’}]]4.发送状态以上同步和异步实例显示的发送状态都是SEND_OK,除了此状态还有其他三个状态:FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE;具体可以查看内部类SendStatus:public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,}FLUSH_DISK_TIMEOUT:刷盘超时,Broker设置的刷盘策略为SYNC_FLUSH才可能出现此错误;FLUSH_SLAVE_TIMEOUT:主从同步超时,Broker设置了slave,并且指定同步策略为SYNC_Master;SLAVE_NOT_AVAILABLE:找不到salve,同样是Broke指定同步策略为SYNC_Master;SEND_OK:表示发送成功,以上情况都没有出现。注:必要时需要对各种异常场景进行处理,保证高质量的生产者。消息类型在RocketMQ的生产者端可以发送多种类型的消息包括:延迟消息,顺序消息以及事务消息,下面分别进行实例分析;1.延迟消息RocketMQ支持发送延迟消息,Broker收到消息后会延迟一段时间在处理,具体使用看如下代码:Message msg = new Message(“TopicTest6”, “TagA”, (“Hello RocketMQ” + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setDelayTimeLevel(3);可以直接设置延迟时间等级,具体有哪些等级,以及每个等级对应的时间可以查看类MessageStoreConfigprivate String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;类中的messageDelayLevel 变量包含了所有可以延迟的时间,使用空格分离,所以这里等级为3,其实对应的延迟时间就是10秒;分别观察生产者和消费者的日志如下:生产者发送消息日志如下:Time [Wed Feb 20 18:25:32 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307268073D16E9365CCFA9E0000, offsetMsgId=0A0D530700002A9F0000000000001F42, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=1], queueOffset=4]消费者接收日志如下:Time [Wed Feb 20 18:25:42 CST 2019],ConsumeMessageThread_3Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=2, sysFlag=0, bornTimestamp=1550658332318, bornHost=/10.13.83.7:55682, storeTimestamp=1550658342322, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002026, commitLogOffset=8230, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=3, CONSUME_START_TIME=1550658342325, UNIQ_KEY=0A0D5307268073D16E9365CCFA9E0000, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId=‘null’}]]可以发现发送消息的时间和接收消息的时间相差10秒;2.顺序消息顺序消息指生产者生产数据的顺序和消费者消费数据的顺序是一致的;顺序消息包括全局顺序消息和局部顺序消息,全局顺序消息指在某个Topic下所有消息都是顺序的,局部顺序消息指在Topic下的Message Queue中是顺序的;2.1全局顺序消息RocketMQ在默认情况下并不能保证有序,一个Topic下会指定多个读写队列,生产者会将消息写入任意的Message Queue中,同样消费者可能会启动多个线程同时处理数据,所以并不能保证顺序;如何保证全局顺序需要只有一个读队列一个写队列,然后需要保证生产者和消费者不能并发处理,以下做一个简单的实例验证;生产者顺序的发送5条数据:Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_4Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=3, sysFlag=0, bornTimestamp=1550713381399, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391403, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002575, commitLogOffset=9589, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, 
toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=4, CONSUME_START_TIME=1550713391405, UNIQ_KEY=0A0D5307418073D16E936914F6160000, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_5Receive New Messages :[MessageExt [queueId=2, storeSize=219, queueOffset=2, sysFlag=0, bornTimestamp=1550713381450, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391451, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002650, commitLogOffset=9808, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=3, CONSUME_START_TIME=1550713391453, UNIQ_KEY=0A0D5307418073D16E936914F64A0001, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_6Receive New Messages :[MessageExt [queueId=3, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381455, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391460, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000272B, commitLogOffset=10027, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391463, UNIQ_KEY=0A0D5307418073D16E936914F64F0002, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_7Receive New Messages :[MessageExt [queueId=0, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381462, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391464, 
storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002806, commitLogOffset=10246, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391490, UNIQ_KEY=0A0D5307418073D16E936914F6560003, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_8Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381466, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391469, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000028E1, commitLogOffset=10465, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391501, UNIQ_KEY=0A0D5307418073D16E936914F65A0004, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId=‘null’}]]可以发现这5条数据分别写入了4个Message Queue中;再看一下消费者日志:Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_4Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=3, sysFlag=0, bornTimestamp=1550713381399, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391403, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002575, commitLogOffset=9589, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=4, CONSUME_START_TIME=1550713391405, UNIQ_KEY=0A0D5307418073D16E936914F6160000, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 
2019],ConsumeMessageThread_5Receive New Messages :[MessageExt [queueId=2, storeSize=219, queueOffset=2, sysFlag=0, bornTimestamp=1550713381450, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391451, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002650, commitLogOffset=9808, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=3, CONSUME_START_TIME=1550713391453, UNIQ_KEY=0A0D5307418073D16E936914F64A0001, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_6Receive New Messages :[MessageExt [queueId=3, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381455, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391460, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000272B, commitLogOffset=10027, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391463, UNIQ_KEY=0A0D5307418073D16E936914F64F0002, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_7Receive New Messages :[MessageExt [queueId=0, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381462, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391464, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002806, commitLogOffset=10246, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391490, UNIQ_KEY=0A0D5307418073D16E936914F6560003, WAIT=true, 
DELAY=3, TAGS=TagA, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId=‘null’}]]Time [Thu Feb 21 09:43:11 CST 2019],ConsumeMessageThread_8Receive New Messages :[MessageExt [queueId=1, storeSize=219, queueOffset=4, sysFlag=0, bornTimestamp=1550713381466, bornHost=/10.13.83.7:55720, storeTimestamp=1550713391469, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000028E1, commitLogOffset=10465, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest6, MAX_OFFSET=5, CONSUME_START_TIME=1550713391501, UNIQ_KEY=0A0D5307418073D16E936914F65A0004, WAIT=true, DELAY=3, TAGS=TagA, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId=‘null’}]]消费者启动了5个线程同时从4个Message Queue中读取数据,所有不能保证数据的顺序性;分别做如下改造:设置Topic的读写队列分别为1,可以直接去RocketMQ-console去修改配置;然后设置消费者的处理线程数为1:consumer.setConsumeThreadMin(1);consumer.setConsumeThreadMax(1);再次测试,生产者同样发送5条数据:Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457700000, offsetMsgId=0A0D530700002A9F0000000000002EF5, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=11]Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457790001, offsetMsgId=0A0D530700002A9F0000000000002FA7, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=12]Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E936924577F0002, offsetMsgId=0A0D530700002A9F0000000000003059, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=13]Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457850003, offsetMsgId=0A0D530700002A9F000000000000310B, messageQueue=MessageQueue [topic=TopicTest6, 
brokerName=broker-a, queueId=0], queueOffset=14]Time [Thu Feb 21 09:59:49 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D53071A0873D16E93692457880004, offsetMsgId=0A0D530700002A9F00000000000031BD, messageQueue=MessageQueue [topic=TopicTest6, brokerName=broker-a, queueId=0], queueOffset=15]可以发送所有的消息都写入了相同的队列,然后看消费者日志:Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=11, sysFlag=0, bornTimestamp=1550714389360, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389364, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002EF5, commitLogOffset=12021, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=12, CONSUME_START_TIME=1550714389372, UNIQ_KEY=0A0D53071A0873D16E93692457700000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId=‘null’}]]Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=12, sysFlag=0, bornTimestamp=1550714389369, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389371, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000002FA7, commitLogOffset=12199, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=13, CONSUME_START_TIME=1550714389379, UNIQ_KEY=0A0D53071A0873D16E93692457790001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId=‘null’}]]Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=13, sysFlag=0, bornTimestamp=1550714389375, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389379, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000003059, 
commitLogOffset=12377, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, CONSUME_START_TIME=1550714389385, UNIQ_KEY=0A0D53071A0873D16E936924577F0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId=‘null’}]]
Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=14, sysFlag=0, bornTimestamp=1550714389381, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389382, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000310B, commitLogOffset=12555, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, CONSUME_START_TIME=1550714389385, UNIQ_KEY=0A0D53071A0873D16E93692457850003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId=‘null’}]]
Time [Thu Feb 21 09:59:49 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=15, sysFlag=0, bornTimestamp=1550714389384, bornHost=/10.13.83.7:58033, storeTimestamp=1550714389386, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000031BD, commitLogOffset=12733, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest6’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=16, CONSUME_START_TIME=1550714389409, UNIQ_KEY=0A0D53071A0873D16E93692457880004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId=‘null’}]]

As the logs show, all of the messages were handled by the same thread and read from the same Message Queue, so ordering is guaranteed.

2.2 Partially Ordered Messages

The producer must send all messages belonging to the same business flow to the same Message Queue, and the consumer must ensure that messages read from one Message Queue are never processed concurrently. Routing messages to a fixed Message Queue is done with a MessageQueueSelector:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(0); }}, i);

In the select method a fixed Message Queue is assigned to each msg; for convenience, every message here is routed to queue 0. On the consumer side, ensuring that messages read from the same Message Queue are not processed concurrently is done with MessageListenerOrderly:

```java
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        System.out.printf("Time [" + new Date().toString() + "]," + Thread.currentThread().getName()
                + "Receive New Messages :" + msgs + "%n");
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();
```

A quick test; producer and consumer logs, respectively:

Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D8B0000, offsetMsgId=0A0D530700002A9F00000000000035E9, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=5]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D940001, offsetMsgId=0A0D530700002A9F000000000000369B, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=6]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D980002, offsetMsgId=0A0D530700002A9F000000000000374D, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=7]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D9D0003, offsetMsgId=0A0D530700002A9F00000000000037FF, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=8]
Time [Thu Feb 21 10:45:43 CST 2019],SendResult [sendStatus=SEND_OK, msgId=0A0D5307463C73D16E93694E5D9F0004, offsetMsgId=0A0D530700002A9F00000000000038B1, messageQueue=MessageQueue [topic=TopicTest7, brokerName=broker-a, queueId=0], queueOffset=9]
Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=5, sysFlag=0, 
bornTimestamp=1550717143436, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143440, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000035E9, commitLogOffset=13801, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest7’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D8B0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId=‘null’}]]Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=6, sysFlag=0, bornTimestamp=1550717143444, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143445, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000369B, commitLogOffset=13979, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest7’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D940001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId=‘null’}]]Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=7, sysFlag=0, bornTimestamp=1550717143448, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143451, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F000000000000374D, commitLogOffset=14157, bodyCRC=1141369005, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest7’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D980002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 50], transactionId=‘null’}]]Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=8, sysFlag=0, bornTimestamp=1550717143453, bornHost=/10.13.83.7:63916, 
storeTimestamp=1550717143454, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000037FF, commitLogOffset=14335, bodyCRC=855693371, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest7’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D9D0003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 51], transactionId=‘null’}]]
Time [Thu Feb 21 10:45:56 CST 2019],ConsumeMessageThread_1Receive New Messages :[MessageExt [queueId=0, storeSize=178, queueOffset=9, sysFlag=0, bornTimestamp=1550717143455, bornHost=/10.13.83.7:63916, storeTimestamp=1550717143456, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F00000000000038B1, commitLogOffset=14513, bodyCRC=761548184, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest7’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, UNIQ_KEY=0A0D5307463C73D16E93694E5D9F0004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 52], transactionId=‘null’}]]

3. Transactional Messages

A transactional message means that a message sent through RocketMQ and some other local operation must succeed together or fail together; you can think of it as a distributed transaction. RocketMQ handles transactional messages roughly as follows:
1. The producer sends a "to-be-confirmed" (half) message;
2. RocketMQ receives and stores the message, then returns the result to the producer;
3. If the returned status is SEND_OK, the producer executes the local transaction;
4. Based on the local transaction's result, the producer issues a commit or a rollback;
5. If step 4 fails to reach the server, after a fixed period the server issues a check-back request for messages still in the "to-be-confirmed" state;
6. On receiving the check-back, the producer returns commit or rollback according to the local transaction's outcome;
7. The server acts on that result.

Next, the official example. TransactionProducerTest is the producer class; it is much like DefaultMQProducer, except that it also sets a TransactionListener, which both executes the local transaction and answers the server's check-back calls:

public class TransactionProducerTest { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer(“transactionProducerGroupName”); producer.setNamesrvAddr(“192.168.237.128:9876”); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override
public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName(“client-transaction-msg-check-thread”); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] { “TagA”, “TagB”, “TagC”, “TagD”, “TagE” }; for (int i = 0; i < 1; i++) { try { Message msg = new Message(“TopicTest1234”, tags[i % tags.length], “KEY” + i, (“Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); System.out.println(“start send message " + msg); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n”, sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); }}

There is also the listener class, TransactionListenerImpl:

```java
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("executeLocalTransaction");
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        System.out.println("checkLocalTransaction:status = " + status);
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
```

The listener implements two interface methods: one executes the local transaction, the other is called back by the server. Running the producer yields the following log:

start send message 
Message{topic=‘TopicTest1234’, flag=0, properties={KEYS=KEY0, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId=‘null’}
executeLocalTransaction
SendResult [sendStatus=SEND_OK, msgId=0A0D5307383473D16E936A31CF040000, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTest1234, brokerName=broker-a, queueId=2], queueOffset=47]
checkLocalTransaction:status = 0
checkLocalTransaction:status = 0
checkLocalTransaction:status = 0

The log shows that the "to-be-confirmed" message is sent first and returns SEND_OK; then the local transaction runs, and because the example returns LocalTransactionState.UNKNOW, the server keeps invoking the check-back method checkLocalTransaction while the consumer never receives anything. With one small change — making the local transaction return LocalTransactionState.COMMIT_MESSAGE — the producer and consumer logs become:

start send message Message{topic=‘TopicTest1234’, flag=0, properties={KEYS=KEY0, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId=‘null’}
executeLocalTransaction
SendResult [sendStatus=SEND_OK, msgId=0A0D530740F073D16E936A3A2DC10000, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTest1234, brokerName=broker-a, queueId=2], queueOffset=58]
Time [Thu Feb 21 15:03:17 CST 2019],ConsumeMessageThread_3Receive New Messages :[MessageExt [queueId=2, storeSize=278, queueOffset=0, sysFlag=8, bornTimestamp=1550732597697, bornHost=/10.13.83.7:55029, storeTimestamp=1550732597758, storeHost=/10.13.83.7:10911, msgId=0A0D530700002A9F0000000000008853, commitLogOffset=34899, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=34610, toString()=Message{topic=‘TopicTest1234’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TopicTest1234, MAX_OFFSET=1, KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=0A0D530740F073D16E936A3A2DC10000, WAIT=true, PGROUP=transactionProducerGroupName, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], 
transactionId=‘0A0D530740F073D16E936A3A2DC10000’}]]

The message's state was updated on the server side, so the consumer could consume it.

Summary

This article first introduced RocketMQ's messaging communication modes, then focused on delayed, ordered, and transactional messages, analyzing each with examples. Sample code: https://github.com/ksfzhaohui…https://gitee.com/OutOfMemory… ...
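The seven-step half-message protocol described above can be simulated without a broker. Below is a minimal, self-contained sketch in plain Java — no RocketMQ dependency, and all class and method names are illustrative, not the real client API: a "broker" holds half messages invisible to consumers and repeatedly asks the producer for the local transaction result until it gets something other than UNKNOW.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy simulation of RocketMQ's half-message + check-back protocol.
public class HalfMessageBroker {
    public enum State { UNKNOW, COMMIT_MESSAGE, ROLLBACK_MESSAGE }

    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // txId -> body
    private final List<String> deliverable = new ArrayList<>();             // visible to consumers

    // Steps 1-2: store the half message; it is NOT yet visible to consumers.
    public void sendHalfMessage(String txId, String body) {
        halfMessages.put(txId, body);
    }

    // Steps 5-7: check back with the producer until the state is resolved,
    // up to maxChecks attempts (the real broker retries on a schedule).
    public void checkBack(Function<String, State> checkLocalTransaction, int maxChecks) {
        Iterator<Map.Entry<String, String>> it = halfMessages.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            for (int i = 0; i < maxChecks; i++) {
                State s = checkLocalTransaction.apply(e.getKey());
                if (s == State.COMMIT_MESSAGE) { deliverable.add(e.getValue()); it.remove(); break; }
                if (s == State.ROLLBACK_MESSAGE) { it.remove(); break; }
                // UNKNOW: keep the half message and ask again later
            }
        }
    }

    public List<String> deliverable() { return deliverable; }
    public int pendingHalfMessages() { return halfMessages.size(); }
}
```

A producer whose check-back always returns UNKNOW leaves its message invisible forever — exactly the behavior seen in the first log above, where checkLocalTransaction was invoked repeatedly and the consumer received nothing.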

February 21, 2019 · 9 min · jiezi

The RocketMQ Producer Flow

Preface

Producers write messages into the message queue; different business scenarios call for different write strategies: synchronous send, asynchronous send, delayed send, transactional messages, and so on. This article first analyzes the producer's send flow, then introduces the various send strategies.

Producer Flow

1. Overview

The producer first configures the namesrv address (or specifies some other way to obtain it); it then fetches the topic's route info from namesrv — brokers, Message Queues, and so on — caching it in local memory for next time; finally it picks a suitable queue from the Message Queue list to send on, achieving load balancing.

2. Startup

Instantiating DefaultMQProducer takes two parameters: the producer group name and an RPCHook. RPCHook is an interface whose implementation is left to the business side; its two methods, doBeforeRequest and doAfterResponse, run before a request is executed and after a response is received, respectively. Next comes DefaultMQProducer's start method, where all the initialization happens; internally it calls DefaultMQProducerImpl's start method:

public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException(“The producer group[” + this.defaultMQProducer.getProducerGroup() + “] has been created before, specify another name please.” + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { mQClientFactory.start(); } log.info(“the producer [{}] start OK. 
sendMessageWithVIPChannel={}”, this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException(“The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); }

serviceState defaults to CREATE_JUST; it is set to START_FAILED on entry and to RUNNING once initialization completes, so initializing a second time throws immediately. The initialization steps are:

2.1 Check the configuration

This is simply a validity check on the producerGroup name.

2.2 Set instanceName

If producerGroup is not the default "CLIENT_INNER_PRODUCER", the DefaultMQProducer's instanceName is changed to the process pid.

2.3 Create the MQClientInstance

First check whether ConcurrentMap<String /* clientId */, MQClientInstance> factoryTable already holds an MQClientInstance keyed by clientId; reuse it if present, otherwise instantiate one. The clientId is composed of the IP address, instanceName, and unitName, e.g. 10.13.83.7@12500.

2.4 Register the producer

The DefaultMQProducerImpl is registered into the MQClientInstance's ConcurrentMap<String /* group */, MQProducerInner> producerTable keyed by producerGroup; if the group already exists, an exception is thrown.

2.5 Initialize TopicPublishInfo

A TopicPublishInfo instance is stored in ConcurrentMap<String /* topic */, TopicPublishInfo> topicPublishInfoTable under the topic name "TBW102"; TopicPublishInfo holds a topic's route info.

2.6 Start the MQClientInstance

Starting the MQClientInstance brings up many related services; see the code:

 public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info(“the client factory [{}] start OK”, 
this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException(“The Factory object[” + this.getClientId() + “] has been created before, and failed.”, null); default: break; } } }

Again serviceState starts as CREATE_JUST, is set to START_FAILED on entry and to RUNNING on completion, preventing repeated initialization.

2.6.1 Initialize NameServerAddr

First check whether DefaultMQProducer has a NameServerAddr configured; if not, it is fetched from a lookup address, by default http://jmenv.tbsite.net:8080/rocketmq/nsaddr. The logic lives in the MixAll class:

```java
public static String getWSAddr() {
    String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
    String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
    String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
    if (wsDomainName.indexOf(":") > 0) {
        wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
    }
    return wsAddr;
}
```

Normally you set your own lookup domain:

System.setProperty("rocketmq.namesrv.domain", "localhost");

in which case NameServerAddr no longer needs to be set manually.

2.6.2 Initialize the RemotingClient

RemotingClient is an interface; the underlying communication framework is Netty, via the NettyRemotingClient implementation. On initialization the RemotingClient instantiates the Bootstrap used later to create Channels.

2.6.3 Start the timer tasks

Five timer tasks are started: periodically refreshing NameServerAddr, refreshing topic route info, cleaning up offline brokers, persisting consumer offsets, and adjusting the thread pool.

2.6.4 Start the services

pullMessageService and rebalanceService are two consumer-side services: one pulls messages from brokers, the other balances message queues, assigning consumable queues to consumers.

3. Sending a Message

The send code lives in DefaultMQProducerImpl's sendDefaultImpl method; part of it is shown below:

private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; TopicPublishInfo topicPublishInfo = 
this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } …以下代码省略…

The method's four parameters are: msg, the message to send; communicationMode, the send mode (synchronous, asynchronous, or one-way); sendCallback, the callback used for asynchronous sends; and timeout, the send timeout.

3.1 Get the topic's route info

Route info is looked up using the topic set on the msg:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return 
topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }

First the ConcurrentMap<String /* topic */, TopicPublishInfo> topicPublishInfoTable is checked for the topic's route info; on a miss the topic is looked up at the nameServer, and if that also fails the default topic name "TBW102" is used — the case where the topic was never created and the broker is expected to auto-create it. The lookup uses MQClientInstance's updateTopicRouteInfoFromNameServer method, which fetches route info by topic; which nameServer it connects to is chosen from the list in order, balancing the load.

Note: the topic named "TBW102" is created automatically by the system.

3.2 Select a MessageQueue

Once route info is obtained, one MessageQueue must be chosen from the list; before that, the total number of send attempts is computed — in synchronous mode it is 1 + retryTimesWhenSendFailed, i.e. 3 by default (other modes get a single attempt). The code:

```java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```

This code lives in MQFaultStrategy, which encapsulates the MessageQueue selection strategy; its fields describe the knobs:

private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); 
private boolean sendLatencyFaultEnable = false; private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

- latencyFaultTolerance: the fault-tolerance object, maintaining per-broker latency information;
- sendLatencyFaultEnable: the latency fault-tolerance switch, off by default;
- latencyMax: the array of latency levels;
- notAvailableDuration: how long a broker is considered unavailable at each latency level.

With that, the code above is easy to follow. If the switch is on, first obtain index (a random value initially, incremented by 1 each call) so that taking it modulo the MessageQueue list length walks the queues in order; each candidate queue's broker must be judged available and must match the given brokerName. If nothing qualifies, pick a broker with relatively low latency — pickOneAtLeast sorts the candidates. If even that fails, simply take any MessageQueue, ignoring the other conditions.

3.3 Send the message

Before sending, a timeout check is performed; if the previous two steps already used up the timeout (3 seconds by default), a timeout is reported directly. Part of the code:

 private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext 
checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals(“true”)) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); 
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); msg.setBody(prevBody); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException(“sendKernelImpl call timeout”); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException(“sendKernelImpl call timeout”); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return 
sendResult;

The six parameters here are: msg, the message itself; mq, the queue to send to; communicationMode, the send mode; sendCallback, the asynchronous callback; topicPublishInfo, the route info; and timeout, the remaining send timeout (3 seconds minus the time consumed by the first two steps).

The method first resolves the target broker's address — needed to open a channel to the broker; then runs the registered hooks; then prepares the SendMessageRequestHeader; and finally dispatches the send according to the chosen strategy. We will not go any deeper here.

Summary

This article focused on the producer's startup and the overall flow of sending a message. Send strategies and message types — ordered messages, transactional messages, and so on — will be covered in later articles. ...
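The retry behavior in 3.2 — a shared counter walked modulo the queue list, skipping on retry the broker that just failed — can be sketched in isolation. The following plain-Java sketch mirrors the default (sendLatencyFaultEnable = false) path; all names are illustrative stand-ins for TopicPublishInfo/MQFaultStrategy, not the real API:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of round-robin MessageQueue selection with broker avoidance on retry.
public class QueueSelectSketch {
    public static class Queue {
        public final String brokerName;
        public final int queueId;
        public Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    public Queue select(List<Queue> queues, String lastBrokerName) {
        if (lastBrokerName == null) {
            // First attempt: plain round-robin over the queue list.
            return queues.get(Math.abs(sendWhichQueue.getAndIncrement()) % queues.size());
        }
        // Retry: lastBrokerName is the broker that just failed; keep walking
        // the list and prefer a queue that lives on a different broker.
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.abs(sendWhichQueue.getAndIncrement()) % queues.size());
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue belongs to the failed broker: fall back to round-robin.
        return queues.get(Math.abs(sendWhichQueue.getAndIncrement()) % queues.size());
    }
}
```

This is why, in the sync-mode loop shown earlier, lastBrokerName is taken from the previously selected mq: a failed attempt steers the next attempt toward a different broker when one exists.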

February 15, 2019 · 5 min · jiezi

Setting Up RocketMQ

RocketMQ has joined the Apache open source projects; today let's set it up, including on Windows.

1. Download the package
Option 1 (build it yourself): https://www.apache.org/dyn/cl…
Option 2 (prebuilt binary): https://www.apache.org/dyn/cl…

2. Build with Maven (needed only for option 1)

```
unzip rocketmq-all-4.4.0-source-release.zip
cd rocketmq-all-4.4.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq
```

3. Start the Name Server first

```
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success…
```

On Windows: set the environment variable ROCKETMQ_HOME to the unpacked MQ directory (the level containing the bin directory), then run:

start mqnamesrv.cmd

4. Start the Broker

```
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success…
```

On Windows:

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

5. Shut down the services

```
sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
```

6. Web management console (rocketmq-console-ng)
Download the source: https://github.com/apache/inc…
Enter the rocketmq-console directory and package it with mvn package. Run it (JDK 1.8 required) from the target directory:

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=127.0.0.1:9876

If you would rather not pass so many parameters, edit the application.properties file inside rocketmq-console-ng before packaging; then you only need java -jar rocketmq-console-ng-1.0.0.jar. Once started, open http://localhost:12581 in a browser to reach the console, as shown below: ...
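After mqnamesrv and mqbroker come up, you can verify the listening ports (9876 for the name server, 10911 for the broker by default) without any client dependency. A stdlib-only checker — the host and port values below are just the defaults used in this post:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Stdlib-only TCP reachability check for the NameServer/Broker listen ports.
public class PortCheck {
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;  // something is listening
        } catch (IOException e) {
            return false; // connection refused or timed out
        }
    }

    public static void main(String[] args) {
        System.out.println("namesrv 9876: " + isOpen("127.0.0.1", 9876, 500));
        System.out.println("broker 10911: " + isOpen("127.0.0.1", 10911, 500));
    }
}
```

If either check prints false after startup, look at the corresponding log under ~/logs/rocketmqlogs/ as shown above.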

January 29, 2019 · 1 min · jiezi

MQ: A Must-Know Technology for Senior Developers

Maybe MQ has never been used at your company and you don't know what it is for, but once you join a large company you will find it everywhere. Today let's talk about MQ; my public account has an ActiveMQ demo you can study on your own.

What is MQ

Message Queue, MQ for short. A "message" is a unit of data transferred between two computers. A message can be very simple — for example just a text string — or more complex, perhaps containing embedded objects. Messages are sent to a queue; a "message queue" is a container that holds messages while they are in transit. The queue manager acts as the middleman relaying a message from its source to its destination. The queue's main purpose is to provide routing and guarantee delivery: if the receiver is unavailable when a message is sent, the queue retains the message until it can be delivered successfully. You can think of it as middleware that handles message passing between multiple systems or applications for you.

Why use MQ

It has three core benefits — decoupling, asynchrony, and peak shaving — which suggest the following usage scenarios:

Decoupling: your system interacts with several others that want data from you. Today system A asks for this data, tomorrow system B says your data has a problem, the day after system C wants an extra field — you alone maintain and firefight all of it, run off your feet. With MQ and its Pub/Sub publish-subscribe model, your system is completely decoupled from the rest: you just put messages on the queue, and each system processes the data it needs on its own; you no longer think about who to send data to. For example: after an order is placed, you no longer notify inventory synchronously; you put the item's info on the queue and the inventory service chooses when to process its own stock calculation.

Asynchrony: in the same example, the old synchronous flow was: user browser request 3 ms, order system database write 300 ms, then inventory system database write 300 ms — 603 ms in total. Even with a loading indicator, a wait above 300 ms is perceptible to the human eye and the experience suffers; speed is what retains users. With MQ and asynchronous processing, if enqueueing a message takes 3 ms, the final response time is 3 + 3 = 6 ms; the user doesn't care about order creation or inventory math, so the response time improves dramatically.

Peak shaving: any large site or application faces traffic spikes — a marketing campaign, a holiday. During Double Eleven, shoppers and concurrency surge, database pressure suddenly exceeds its per-second capacity, and the site collapses. With MQ, the database need not handle all requests immediately; it processes what it can bear at its own pace. All messages pile up in the queue instead of piling onto the database. If 100 million messages back up in the queue and my database can only process 1 million per second, I just take 1 million per second off the queue — the threshold is never exceeded, the database is not crushed, and the peak drains away gradually.

Now think about why you haven't used MQ yet — or whether you should.

The costs of adopting MQ

Everything has two sides; along with the advantages come disadvantages:

Reduced system availability: what if MQ goes down, the queued data is gone, and the other systems haven't finished processing — then what?

Increased system complexity: you enjoy MQ, but every other system must change to consume messages from the queue. Bolting an MQ in, how do you guarantee messages are not consumed twice? How do you handle lost messages? How do you preserve message ordering? Headache upon headache, a pile of problems.

Consistency: your order system succeeded, but the inventory system failed — the data now disagrees.

So a message queue is actually a very complex piece of architecture: introducing it brings many benefits, but you also need extra technical measures and architecture to mitigate its downsides. Afterwards you may find the system's complexity has jumped an order of magnitude — perhaps 10x. But when it matters, you still use it.

Mainstream MQ products

- Kafka
- ActiveMQ
- RabbitMQ
- RocketMQ

| Feature | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
| --- | --- | --- | --- | --- |
| Single-machine throughput | 10k-level, an order of magnitude below RocketMQ and Kafka | Same as ActiveMQ | 100k-level, supports high throughput | 100k-level, high throughput; typically paired with big-data systems for real-time computation and log collection |
| Impact of topic count on throughput | — | — | Hundreds to thousands of topics with only a small throughput drop — a major RocketMQ advantage: the same machines can carry a large number of topics | Throughput drops sharply as topics grow from dozens to hundreds; with Kafka keep the topic count modest, and add more machines to support large topic counts |
| Latency | ms level | microsecond level — a RabbitMQ hallmark, the lowest latency | ms level | within ms |
| Availability | High, master-slave architecture | Same as ActiveMQ | Very high, distributed architecture | Very high: distributed, multiple replicas per dataset; a few machines down means no data loss and no unavailability |
| Message reliability | Small probability of data loss | Essentially no loss | Zero loss achievable with tuned configuration | Same as RocketMQ |
| Feature set | Extremely complete MQ feature set | Built on Erlang: strong concurrency, excellent performance, very low latency | Fairly complete, distributed, good extensibility | Relatively simple, basic MQ features; massively adopted for real-time computation and log collection in the big-data domain |

The table above is from: https://github.com/doocs/adva…

Recommendations

Early on everyone used ActiveMQ; it suits small projects, so if you are just trying out MQ you can choose it. RabbitMQ's community is very active and open source, so you can seek help there when problems arise; but its core is written in Erlang, which small companies rarely have the ability to master. RocketMQ is produced by Alibaba, widely used by Chinese companies, battle-tested in real scenarios including Alibaba's own products — recommended — though its community is less active. For big-data scenarios such as real-time computation and log collection, Kafka is the industry standard and absolutely fine; its community is very active, it will certainly not die, and it is the de facto norm in this domain worldwide. In short: small and mid-size companies with ordinary technical strength and modest technical challenges do well with RabbitMQ; large companies with strong infrastructure teams will find RocketMQ an excellent choice.
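The peak-shaving idea above — a burst of writes buffered in a queue, drained at the consumer's own pace — is easy to demonstrate with a plain in-memory BlockingQueue. This is a toy stand-in for a real MQ, purely illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy peak-shaving demo: a burst of "orders" floods a bounded queue,
// while the "database" consumer drains them at its own pace.
public class PeakShavingDemo {
    public static int process(int burstSize, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger handled = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < burstSize; i++) {
                    queue.take();            // drain one item at a time
                    handled.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < burstSize; i++) {
                queue.put(i);                // producer bursts; put() blocks when full
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();                // everything eventually processed
    }
}
```

The bounded capacity plays the role of the database's per-second limit: the backlog never exceeds it, yet no message is lost — the burst is simply absorbed over time.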

January 25, 2019 · 1 min · jiezi

paascloud Open Source Project Study (2) -- Installing the SpringCloud+Vue Environment on CentOS 7

Preface

GitHub open-source project paascloud-master: https://github.com/paascloud/…
Official environment setup guide: http://blog.paascloud.net/201…

Basic tools

```shell
# rz/sz file transfer
yum install lrzsz
# zip and unzip
yum install -y unzip zip
# vim
yum -y install vim*
```

Java environment

JDK 8. Installation from the tar.gz package: https://www.cnblogs.com/chy12… ; installation from the rpm package: https://www.cnblogs.com/zengh…

MySQL 5.7

Download the MySQL yum repository package for version 5.7 from https://dev.mysql.com/downloads/file/?id=470281, install it locally, check which version the repository enables, and install the server:

```shell
yum localinstall mysql57-community-release-el7-11.noarch.rpm
vim /etc/yum.repos.d/mysql-community.repo   # make sure the version you want has enabled=1
yum install -y mysql-community-server
```

Start MySQL:

```shell
systemctl status mysqld.service
systemctl start mysqld.service
```

Look up the generated root password (the log location comes from /etc/my.cnf):

```shell
cat /etc/my.cnf
# log-error=/var/log/mysqld.log
# pid-file=/var/run/mysqld/mysqld.pid
cat /var/log/mysqld.log | grep password
```

Log in with `mysql -u root -p`, relax the password-complexity rules, change the password, and grant remote access:

```sql
mysql> set global validate_password_policy=0;
mysql> set global validate_password_length=6;
mysql> alter user 'root'@'localhost' identified by '123456';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
mysql> flush privileges;
```

MySQL backup: see https://blog.csdn.net/SWPU_Li… and the crontab command reference https://www.cnblogs.com/kensh…

Docker

Installing Docker online is very slow and not recommended; if you do, run the following in order:

```shell
yum remove docker docker-common docker-selinux docker-engine
yum install -y yum-utils device-mapper-persistent-data lvm2
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --enable docker-ce-edge
yum-config-manager --enable docker-ce-test
yum-config-manager --disable docker-ce-edge
yum makecache fast
yum -y install docker-ce
systemctl start docker
docker run hello-world
docker images
```

```
REPOSITORY    TAG     IMAGE ID      CREATED        SIZE
hello-world   latest  1815c82652c0  2 months ago   1.84kB
```

Local installation: download docker-ce-17.06.0.ce-1.el7.centos.x86_64.rpm from the official packages, then:

```shell
# install
yum install /usr/local/src/tool/docker-ce-17.06.0.ce-1.el7.centos.x86_64.rpm -y
# start
systemctl start docker
# check the version
docker -v
# start on boot
systemctl enable docker
# Created symlink from /etc/systemd/system/multi-user.target.wants/docker.service to /usr/lib/systemd/system/docker.service.
```

Uninstalling Docker:

```shell
# list the installed docker packages
yum list installed | grep docker
# remove each package shown above
yum -y remove docker.x86_64.XXX
# delete the docker images and data
rm -rf /var/lib/docker
```

Redis 4.0.2

Single node from the tar package - download, extract, and compile:

```shell
wget http://download.redis.io/releases/redis-4.0.2.tar.gz
tar xzf redis-4.0.2.tar.gz
cd redis-4.0.2
make
```

The binaries end up in the src directory after compilation; start the server and try the built-in client:

```shell
src/redis-server
src/redis-cli
# redis> set foo bar
# OK
# redis> get foo
# "bar"
```

Stop the service:

```shell
# option 1: find the PID, then kill -9 PID
ps aux | grep redis
# option 2:
src/redis-cli shutdown
```

Yum installation:

```shell
yum install epel-release
yum install redis
systemctl start redis.service
redis-server /etc/redis.conf
systemctl enable redis
```

Common configuration (vi /usr/local/redis-4.0.2/redis.conf):

```
requirepass password   # set an access password
# bind 127.0.0.1       # comment this out to allow remote access
daemonize yes          # start in the background
```

Start with a custom config file and connect with the password:

```shell
src/redis-server ../redis.conf
src/redis-cli -a password
```

Redis cluster: see http://blog.paascloud.net/201…

Running Redis under Docker:

```shell
docker run -d -p 6379:6379 redis:4.0.8 --requirepass "123456"
```

nginx 1.14.x

```shell
# download the nginx repo package matching the current system
wget http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm
# set up the nginx yum repository
rpm -ivh nginx-release-centos-7-0.el7.ngx.noarch.rpm
# install nginx
yum -y install nginx
# start nginx
systemctl start nginx
# version
nginx -v
# nginx version: nginx/1.14.1
```

The default configuration file is /etc/nginx/nginx.conf

RocketMQ 4.2.x

The goal is a cluster setup. Synchronous replication (2m-2s-sync): see http://blog.paascloud.net/201… Asynchronous replication (2m-2s-async): see https://blog.csdn.net/weixin_…

Note: if the broker fails to start, the default heap sizes configured in runbroker.sh and runserver.sh may be too large for the machine.

RocketMQ web console: RocketMQ can be managed from the command line or a web UI. The Apache extensions repository https://github.com/apache/roc… contains the sub-project rocketmq-console - configure it, build a war, and it is ready to use (a prebuilt rocketmq-console.war can also be found online). Installation details: https://www.jianshu.com/p/e5b…

RabbitMQ 3.7.3: see http://blog.paascloud.net/201…

ZooKeeper 3.4.x: standalone, cluster, and pseudo-cluster setup: https://www.cnblogs.com/sundd… ; the paascloud cluster environment: http://blog.paascloud.net/201…

Commands

Start RabbitMQ and enable its web management plugin (default account guest/guest):

```shell
/etc/init.d/rabbitmq-server start   # or: service rabbitmq-server start
rabbitmq-plugins enable rabbitmq_management
```

Then visit (replace with your own IP): http://192.168.241.101:15672/

Start ZooKeeper. The pseudo-cluster built from the first reference above keeps all configuration files in one zookeeper directory on a single machine, so each instance is started against its own config:

```shell
cd /root/software/zookeeper-3.4.9/conf
# zkServer.sh is available through the environment variables set earlier
zkServer.sh start zoo1.cfg
zkServer.sh start zoo2.cfg
zkServer.sh start zoo3.cfg
# check status
zkServer.sh status zoo1.cfg
zkServer.sh status zoo2.cfg
zkServer.sh status zoo3.cfg
```

ZooKeeper GUI: the ZooInspector client; usage is described at https://blog.csdn.net/qq_2685… After the ZooKeeper cluster is up, run the ZooInspector jar; once the paascloud project has started, its registered nodes become visible there.

Start the RocketMQ cluster (2m-2s-sync, following the first RocketMQ reference above):

```shell
# NameServer A 192.168.241.101
nohup sh /usr/local/rocketmq/bin/mqnamesrv &
# NameServer B 192.168.241.102
nohup sh /usr/local/rocketmq/bin/mqnamesrv &
# BrokerServer A-master 192.168.241.101
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
# BrokerServer A-slave 192.168.241.101
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
# BrokerServer B-master 192.168.241.102
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
# BrokerServer B-slave 192.168.241.102
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
```

Check ports, processes, and logs:

```shell
netstat -ntlp
jps
tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
```

Stop the services:

```shell
sh /usr/local/rocketmq/bin/mqshutdown namesrv
sh /usr/local/rocketmq/bin/mqshutdown broker
```

Clean the data directories:

```shell
rm -rf /usr/local/rocketmq/data/master
rm -rf /usr/local/rocketmq/data/slave
mkdir -p /usr/local/rocketmq/data/master/store/commitlog
mkdir -p /usr/local/rocketmq/data/slave/store/commitlog
mkdir -p /usr/local/rocketmq/data/master/store/consumequeue
mkdir -p /usr/local/rocketmq/data/slave/store/consumequeue
mkdir -p /usr/local/rocketmq/data/master/store/index
mkdir -p /usr/local/rocketmq/data/slave/store/index
```

RocketMQ cluster console: unpack it into the tomcat directory and start it with ./tomcat/bin/startup.sh, then visit http://192.168.0.110:8080/roc…
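The MySQL backup references above boil down to a mysqldump job driven by cron. The following is a minimal sketch under stated assumptions: the backup directory, credentials, and seven-day retention are all hypothetical values to adjust for your environment, and `mysqldump` is the binary shipped with mysql-community-server.

```shell
#!/bin/sh
# Sketch of a daily MySQL backup job. All paths and credentials below are
# illustrative assumptions, not values from the guide above.
# Usage: backup_mysql DIR USER PASSWORD KEEP_DAYS
backup_mysql() {
    dir="$1"; user="$2"; password="$3"; keep_days="$4"
    mkdir -p "$dir" || return 1
    file="$dir/all-$(date +%Y%m%d-%H%M%S).sql"
    # dump every database into a timestamped file
    mysqldump -u"$user" -p"$password" --all-databases > "$file" || return 1
    # prune backups older than KEEP_DAYS days
    find "$dir" -name '*.sql' -mtime +"$keep_days" -delete
}

# Example invocation, e.g. wrapped in a script run daily at 02:00 via crontab -e:
#   0 2 * * * /usr/local/bin/mysql-backup.sh
# backup_mysql /data/mysql-backup root 123456 7
```

Pair it with a `crontab -e` entry as in the crontab reference above; keeping retention in `find -mtime` means the job never needs manual cleanup.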

January 23, 2019 · 2 min · jiezi

Getting Started with RocketMQ

Overall architecture

As shown in the figure above, RocketMQ can be divided into four roles: Producer, Consumer, Broker, and NameServer.

1. NameServer

The NameServer is the coordinator of the message queue: Brokers register their routing information with it, and clients fetch routing information from it. If you have used Zookeeper this is easy to understand, although the NameServer offers far less functionality. NameServers are stateless and do not communicate with each other, so they can be scaled out horizontally; every Broker keeps a long-lived connection to every NameServer.

2. Broker

The Broker is the core of RocketMQ, providing message receiving, storage, and pulling. Brokers generally need to be highly available, so each Master is configured with Slaves; when a Master goes down, Consumers can still consume from its Slaves. The Master/Slave relationship is defined by giving both the same brokerName and different brokerIds: brokerId 0 means Master, non-zero means Slave.

3. Producer

The message producer connects to the NameServer, fetches Topic routing information from it, and connects to the Broker Masters that serve the Topic. Producers are stateless and can be deployed as a cluster.

4. Consumer

The message consumer likewise connects to the NameServer, fetches Topic routing information, and connects to the Masters and Slaves that serve the Topic.

5. Topic and Message Queue

Beyond the four roles above, two concepts deserve attention: Topic and Message Queue. A Topic is, literally, a subject used to distinguish different kinds of messages; it must be created before messages are sent or received against it. To improve performance and throughput, RocketMQ introduces the Message Queue: a Topic can be given one or more Message Queues, somewhat like Kafka partitions, so producers can send to the queues in parallel and consumers can read from multiple queues in parallel.

Single-machine setup and deployment

The following uses CentOS 7, JDK 1.8, and RocketMQ 4.3.2. Start the NameServer first, then the Broker.

1. Start the NameServer

```shell
[root@localhost bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
```

The log above indicates a successful start; the default port is 9876.

2. Start the Broker

```shell
[root@localhost bin]# ./mqbroker
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/rocketmq-all-4.3.2-bin-release/bin/hs_err_pid3977.log
```

The start fails with an out-of-memory error, mainly because the default JVM parameters configured by RocketMQ are quite large. Edit runbroker.sh:

```shell
[root@localhost bin]# vi runbroker.sh
```

The default is:

```shell
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
```

The default 8g exceeds the VM's memory, so change it to:

```shell
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
```

Start again; the log below indicates success, with the default port 10911:

```shell
[root@localhost bin]# ./mqbroker
The broker[localhost.localdomain, 192.168.237.128:10911] boot success. serializeType=JSON
```

3. A simple test

3.1 Producer

```java
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // construct the Producer
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.237.128:9876");
        // initialize the Producer; do this only once per application lifecycle
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest", "TagA",
                    ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}
```

A DefaultMQProducer is created with a GroupName and the NameServer address; a Message is then created and sent through it, returning a SendResult.

3.2 Consumer

```java
public class PushConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please rename to unique group name");
        consumer.setNamesrvAddr("192.168.237.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName()
                        + "Receive New Messages :" + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
```

It likewise sets a GroupName and the NameServer address, and subscribes to the Topic.

3.3 Running the test

Running the producer directly fails:

```
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest
See http://rocketmq.apache.org/docs/faq/ for further details.
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:634)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1253)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1203)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)
    at com.rocketmq.SyncProducer.main(SyncProducer.java:26)
```

"No route info of this topic" means the producer obtained no routing information when sending and could not find a Broker for the Topic. Possible causes:

1. The Broker is not correctly connected to the NameServer.
2. The Producer is not connected to the NameServer.
3. The Topic was not correctly created.

SyncProducer already specifies the NameServer address, and RocketMQ auto-creates Topics by default, so the cause is that the Broker did not register with the NameServer. Restart it with the NameServer address specified:

```shell
[root@localhost bin]# ./mqbroker -n localhost:9876
The broker[localhost.localdomain, 192.168.237.128:10911] boot success. serializeType=JSON and name server is localhost:9876
```

Run SyncProducer again; the log is as follows:

```
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4C60000, offsetMsgId=C0A8ED8000002A9F000000000000229C, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4CD0001, offsetMsgId=C0A8ED8000002A9F000000000000234D, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4D90002, offsetMsgId=C0A8ED8000002A9F00000000000023FE, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=9]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4E80003, offsetMsgId=C0A8ED8000002A9F00000000000024AF, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4F40004, offsetMsgId=C0A8ED8000002A9F0000000000002560, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C4F70005, offsetMsgId=C0A8ED8000002A9F0000000000002611, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C5030006, offsetMsgId=C0A8ED8000002A9F00000000000026C2, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C5070007, offsetMsgId=C0A8ED8000002A9F0000000000002773, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C50A0008, offsetMsgId=C0A8ED8000002A9F0000000000002824, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=13]
SendResult [sendStatus=SEND_OK, msgId=0A0D53073B6073D16E933086C50D0009, offsetMsgId=C0A8ED8000002A9F00000000000028D5, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=11]
```

The consumer uses push mode and receives the messages in real time:

```
ConsumeMessageThread_13Receive New Messages :[MessageExt [queueId=1, storeSize=177, queueOffset=11, sysFlag=0, bornTimestamp=1547086138566, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430770, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F000000000000229C, commitLogOffset=8860, bodyCRC=705268097, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=12, CONSUME_START_TIME=1547086138573, UNIQ_KEY=0A0D53073B6073D16E933086C4C60000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 48], transactionId='null'}]]
ConsumeMessageThread_3Receive New Messages :[MessageExt [queueId=2, storeSize=177, queueOffset=9, sysFlag=0, bornTimestamp=1547086138573, bornHost=/192.168.237.1:53524, storeTimestamp=1547139430783, storeHost=/192.168.237.128:10911, msgId=C0A8ED8000002A9F000000000000234D, commitLogOffset=9037, bodyCRC=1561245975, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=10, CONSUME_START_TIME=1547086138598, UNIQ_KEY=0A0D53073B6073D16E933086C4CD0001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 49], transactionId='null'}]]
```

The remaining eight messages arrive in the same format, one per ConsumeMessageThread, covering queueIds 0 through 3.

Multi-machine cluster configuration and deployment

Deploy two NameServers and two Brokers, each Broker with its own Slave, across two machines: the local Windows host and a CentOS VM.

1. Start the NameServers

Start a NameServer on each machine, both on the default port 9876:

```
192.168.237.128:9876
10.13.83.7:9876
```

2. Start the Brokers

Each machine runs one Master and one Slave, acting as mutual backups. The conf folder under the installation root provides several broker templates - 2m-2s-async, 2m-2s-sync, 2m-noslave - which can be used as a starting point.

2.1 Configure the Master and Slave on 10.13.83.7

Master:

```properties
namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=E:/rocketmq-all-4.3.2-bin-release/store-a-m
```

Slave:

```properties
namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10811
storePathRootDir=E:/rocketmq-all-4.3.2-bin-release/store-a-s
```

Start them; the Master log:

```
E:\rocketmq-all-4.3.2-bin-release\bin>mqbroker -c E:\rocketmq-all-4.3.2-bin-release\conf\broker-m.conf
The broker[broker-a, 10.13.83.7:10911] boot success. serializeType=JSON and name server is 192.168.237.128:9876;10.13.83.7:9876
```

and the Slave log:

```
E:\rocketmq-all-4.3.2-bin-release\bin>mqbroker -c E:\rocketmq-all-4.3.2-bin-release\conf\broker-s.conf
The broker[broker-a, 10.13.83.7:10811] boot success. serializeType=JSON and name server is 192.168.237.128:9876;10.13.83.7:9876
```

2.2 Configure the Master and Slave on 192.168.237.128

Master:

```properties
namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/root/rocketmq-all-4.3.2-bin-release/store-b-m
```

Slave:

```properties
namesrvAddr=192.168.237.128:9876;10.13.83.7:9876
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10811
storePathRootDir=/root/rocketmq-all-4.3.2-bin-release/store-b-s
```

Start logs:

```
[root@localhost bin]# ./mqbroker -c ../conf/broker-m.conf
The broker[broker-b, 192.168.237.128:10911] boot success. serializeType=JSON and name server is 192.168.237.128:9876;10.13.83.7:9876
[root@localhost bin]# ./mqbroker -c ../conf/broker-s.conf
The broker[broker-b, 192.168.237.128:10811] boot success. serializeType=JSON and name server is 192.168.237.128:9876;10.13.83.7:9876
```

3. Configuration reference

1. namesrvAddr: NameServer addresses; multiple entries are separated by semicolons.
2. brokerClusterName: the cluster this broker belongs to; with many nodes, multiple clusters can be configured.
3. brokerName: the broker name; a Master and its Slaves use the same name, which expresses their master/slave relationship.
4. brokerId: 0 means Master; values greater than 0 identify different Slaves.
5. deleteWhen: the hour of day at which expired messages are deleted; default is 4 a.m.
6. fileReservedTime: how long messages are kept on disk, in hours.
7. brokerRole: one of SYNC_MASTER, ASYNC_MASTER, SLAVE; sync/async describes the mechanism by which a Master replicates data to its Slaves.
8. flushDiskType: flush strategy, SYNC_FLUSH or ASYNC_FLUSH; with SYNC_FLUSH a send returns success only after the message is written to disk, while ASYNC_FLUSH does not wait.
9. listenPort: the listening port.
10. storePathRootDir: the root directory for message storage.

Management tools

1. Command-line tool

mqadmin is the command-line management tool bundled with RocketMQ. It can create and modify Topics, query messages, update configuration, and more; the full list comes from running it without arguments:

```
E:\rocketmq-all-4.3.2-bin-release\bin>mqadmin
The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker's config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer's socket connection and client version
   consumerConnection   Query consumer's socket connection, client version and subscription
   consumerProgress     Query consumers's progress, speed
   consumerStatus       Query consumer's internal data structure
   cloneGroupOffset     clone offset from other group.
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config.
   deleteKvConfig       Delete KV config.
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client restart).
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker.
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   allocateMQ           Allocate MQ
   checkMsgSendRT       check message send response time
   clusterRT            List All clusters Message Send RT
   getNamesrvConfig     Get configs of name server.
   updateNamesrvConfig  Update configs of name server.
   getBrokerConfig      Get broker config by cluster or special broker!
   queryCq              Query cq command.
   sendMessage          Send a message
   consumeMessage       Consume message
See 'mqadmin help <command>' for more information on a specific command.
```

For the details of a specific command:

```
E:\rocketmq-all-4.3.2-bin-release\bin>mqadmin help statsAll
usage: mqadmin statsAll [-a] [-h] [-n <arg>] [-t <arg>]
 -a,--activeTopic        print active topic only
 -h,--help               Print help
 -n,--namesrvAddr <arg>  Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -t,--topic <arg>        print select topic only
```

2. Web management tool

Besides the commands, a graphical management tool is provided in the RocketMQ extensions repository:
https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0/rocketmq-console
The current stable version is 1.0.0. Download it and set the NameServer address in application.properties:

```properties
rocketmq.config.namesrvAddr=10.13.83.7:9876
```

Then build and run it; it starts on port 8080 and can be reached at http://localhost:8080

Summary

Starting from the simplest installation and deployment, this article briefly introduced the common configuration parameters, then looked at the overall deployment structure of RocketMQ and the roles within it, and finally presented two management tools that make monitoring and managing RocketMQ easier.
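The heap-size fix described above (hand-editing runbroker.sh) can also be scripted. The sketch below is an assumption-laden helper, not part of the RocketMQ distribution: it targets the stock `-Xms8g -Xmx8g -Xmn4g` flags seen above in runbroker.sh, plus an assumed `-Xms4g -Xmx4g -Xmn2g` default in runserver.sh, and takes the installation directory as an argument.

```shell
#!/bin/sh
# shrink_heap DIR: rewrite the default JVM heap flags in runbroker.sh/runserver.sh
# so the broker and name server can start inside a small VM.
# The flag patterns below are assumptions based on the 4.3.2 defaults; check
# your own scripts before running this. A .bak copy of each file is kept.
shrink_heap() {
    dir="$1"
    for f in "$dir/bin/runbroker.sh" "$dir/bin/runserver.sh"; do
        [ -f "$f" ] || continue   # skip scripts that are not present
        sed -i.bak \
            -e 's/-Xms8g -Xmx8g -Xmn4g/-Xms256m -Xmx256m -Xmn128m/' \
            -e 's/-Xms4g -Xmx4g -Xmn2g/-Xms256m -Xmx256m -Xmn128m/' \
            "$f"
    done
}

# Example: shrink_heap /usr/local/rocketmq
```

Since `sed` only rewrites lines that match the exact stock flags, running it twice, or against an already-tuned script, is harmless.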

January 10, 2019 · 6 min · jiezi