聊聊rocketmqclientgo的strategy

序本文次要钻研一下rocketmq-client-go的strategy AllocateStrategyrocketmq-client-go-v2.0.0/consumer/strategy.go type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueueAllocateStrategy定义了一个funcAllocateByAveragelyrocketmq-client-go-v2.0.0/consumer/strategy.go func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue { if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 { return nil } var ( find bool index int ) for idx := range cidAll { if cidAll[idx] == currentCID { find = true index = idx break } } if !find { rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{ rlog.LogKeyConsumerGroup: consumerGroup, "consumerId": currentCID, "cidAll": cidAll, }) return nil } mqSize := len(mqAll) cidSize := len(cidAll) mod := mqSize % cidSize var averageSize int if mqSize <= cidSize { averageSize = 1 } else { if mod > 0 && index < mod { averageSize = mqSize/cidSize + 1 } else { averageSize = mqSize / cidSize } } var startIndex int if mod > 0 && index < mod { startIndex = index * averageSize } else { startIndex = index*averageSize + mod } num := utils.MinInt(averageSize, mqSize-startIndex) result := make([]*primitive.MessageQueue, 0) for i := 0; i < num; i++ { result = append(result, mqAll[(startIndex+i)%mqSize]) } return result}AllocateByAveragely办法会计算averageSize,而后再依据averageSize计算startIndex,最初取mqAll[(startIndex+i)%mqSize]AllocateByAveragelyCirclerocketmq-client-go-v2.0.0/consumer/strategy.go ...

July 13, 2020 · 3 min · jiezi

聊聊rocketmqclientgo的remoteBrokerOffsetStore

序本文次要钻研一下rocketmq-client-go的remoteBrokerOffsetStore remoteBrokerOffsetStorerocketmq-client-go-v2.0.0/consumer/offset_store.go type remoteBrokerOffsetStore struct { group string OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"` client internal.RMQClient namesrv internal.Namesrvs mutex sync.RWMutex}remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性NewRemoteOffsetStorerocketmq-client-go-v2.0.0/consumer/offset_store.go func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore { return &remoteBrokerOffsetStore{ group: group, client: client, namesrv: namesrv, OffsetTable: make(map[primitive.MessageQueue]int64), }}NewRemoteOffsetStore办法实例化remoteBrokerOffsetStorepersistrocketmq-client-go-v2.0.0/consumer/offset_store.go func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) { r.mutex.Lock() defer r.mutex.Unlock() if len(mqs) == 0 { return } used := make(map[primitive.MessageQueue]struct{}, 0) for _, mq := range mqs { used[*mq] = struct{}{} } for mq, off := range r.OffsetTable { if _, ok := used[mq]; !ok { delete(r.OffsetTable, mq) continue } err := r.updateConsumeOffsetToBroker(r.group, mq, off) if err != nil { rlog.Warning("update offset to broker error", map[string]interface{}{ rlog.LogKeyConsumerGroup: r.group, rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyUnderlayError: err.Error(), "offset": off, }) } else { rlog.Info("update offset to broker success", map[string]interface{}{ rlog.LogKeyConsumerGroup: r.group, rlog.LogKeyMessageQueue: mq.String(), "offset": off, }) } }}persist办法遍历OffsetTable,执行r.updateConsumeOffsetToBrokerremoverocketmq-client-go-v2.0.0/consumer/offset_store.go ...

July 12, 2020 · 3 min · jiezi

聊聊rocketmqclientgo的localFileOffsetStore

序本文次要钻研一下rocketmq-client-go的localFileOffsetStore OffsetStorerocketmq-client-go-v2.0.0/consumer/offset_store.go type OffsetStore interface { persist(mqs []*primitive.MessageQueue) remove(mq *primitive.MessageQueue) read(mq *primitive.MessageQueue, t readType) int64 update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)}OffsetStore定义了persist、remove、read、update办法localFileOffsetStorerocketmq-client-go-v2.0.0/consumer/offset_store.go type localFileOffsetStore struct { group string path string OffsetTable map[MessageQueueKey]int64 // mutex for offset file mutex sync.Mutex}localFileOffsetStore定义了group、path、OffsetTable、mutex属性NewLocalFileOffsetStorerocketmq-client-go-v2.0.0/consumer/offset_store.go func NewLocalFileOffsetStore(clientID, group string) OffsetStore { store := &localFileOffsetStore{ group: group, path: filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"), OffsetTable: make(map[MessageQueueKey]int64), } store.load() return store}NewLocalFileOffsetStore创立localFileOffsetStore,而后执行store.load()loadrocketmq-client-go-v2.0.0/consumer/offset_store.go func (local *localFileOffsetStore) load() { local.mutex.Lock() defer local.mutex.Unlock() data, err := utils.FileReadAll(local.path) if os.IsNotExist(err) { return } if err != nil { rlog.Info("read from local store error, try to use bak file", map[string]interface{}{ rlog.LogKeyUnderlayError: err, }) data, err = utils.FileReadAll(filepath.Join(local.path, ".bak")) } if err != nil { rlog.Info("read from local store bak file error", map[string]interface{}{ rlog.LogKeyUnderlayError: err, }) return } datas := make(map[MessageQueueKey]int64) wrapper := OffsetSerializeWrapper{ OffsetTable: datas, } err = jsoniter.Unmarshal(data, &wrapper) if err != nil { rlog.Warning("unmarshal local offset error", map[string]interface{}{ "local_path": local.path, rlog.LogKeyUnderlayError: err.Error(), }) return } if datas != nil { local.OffsetTable = datas }}load办法通过utils.FileReadAll(local.path)读取data,而后通过jsoniter.Unmarshal(data, &wrapper)将数据组装到local.OffsetTablereadrocketmq-client-go-v2.0.0/consumer/offset_store.go ...

July 11, 2020 · 2 min · jiezi

聊聊rocketmqclientgo的pushConsumer

序本文次要钻研一下rocketmq-client-go的pushConsumer pushConsumerrocketmq-client-go-v2.0.0/consumer/push_consumer.go type pushConsumer struct { *defaultConsumer queueFlowControlTimes int queueMaxSpanFlowControlTimes int consumeFunc utils.Set submitToConsume func(*processQueue, *primitive.MessageQueue) subscribedTopic map[string]string interceptor primitive.Interceptor queueLock *QueueLock done chan struct{} closeOnce sync.Once}pushConsumer定义了queueFlowControlTimes、queueMaxSpanFlowControlTimes、consumeFunc、submitToConsume、subscribedTopic、interceptor、queueLock、done、closeOnce属性NewPushConsumerrocketmq-client-go-v2.0.0/consumer/push_consumer.go func NewPushConsumer(opts ...Option) (*pushConsumer, error) { defaultOpts := defaultPushConsumerOptions() for _, apply := range opts { apply(&defaultOpts) } srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs) if err != nil { return nil, errors.Wrap(err, "new Namesrv failed.") } if !defaultOpts.Credentials.IsEmpty() { srvs.SetCredentials(defaultOpts.Credentials) } defaultOpts.Namesrv = srvs if defaultOpts.Namespace != "" { defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName } dc := &defaultConsumer{ client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: defaultOpts.GroupName, cType: _PushConsume, state: int32(internal.StateCreateJust), prCh: make(chan PullRequest, 4), model: defaultOpts.ConsumerModel, consumeOrderly: defaultOpts.ConsumeOrderly, fromWhere: defaultOpts.FromWhere, allocate: defaultOpts.Strategy, option: defaultOpts, namesrv: srvs, } p := &pushConsumer{ defaultConsumer: dc, subscribedTopic: make(map[string]string, 0), queueLock: newQueueLock(), done: make(chan struct{}, 1), consumeFunc: utils.NewSet(), } dc.mqChanged = p.messageQueueChanged if p.consumeOrderly { p.submitToConsume = p.consumeMessageOrderly } else { p.submitToConsume = p.consumeMessageCurrently } p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...) return p, nil}NewPushConsumer办法实例化defaultConsumer及pushConsumerStartrocketmq-client-go-v2.0.0/consumer/push_consumer.go ...

July 10, 2020 · 7 min · jiezi

聊聊rocketmqclientgo的PullConsumer

序本文次要钻研一下rocketmq-client-go的PullConsumer PullConsumerrocketmq-client-go-v2.0.0/consumer/pull_consumer.go type PullConsumer interface { // Start Start() // Shutdown refuse all new pull operation, finish all submitted. Shutdown() // Pull pull message of topic, selector indicate which queue to pull. Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) // PullFrom pull messages of queue from the offset to offset + numbers PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) // updateOffset update offset of queue in mem UpdateOffset(queue *primitive.MessageQueue, offset int64) error // PersistOffset persist all offset in mem. PersistOffset(ctx context.Context) error // CurrentOffset return the current offset of queue in mem. CurrentOffset(queue *primitive.MessageQueue) (int64, error)}PullConsumer定义了Start、Shutdown、Pull、UpdateOffset、PersistOffset、CurrentOffset办法defaultPullConsumerrocketmq-client-go-v2.0.0/consumer/pull_consumer.go ...

July 9, 2020 · 3 min · jiezi

聊聊rocketmqclientgo的TraceInterceptor

序本文主要研究一下rocketmq-client-go的TraceInterceptor TraceInterceptorrocketmq-client-go-v2.0.0/producer/interceptor.go // WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.func WithTrace(traceCfg *primitive.TraceConfig) Option { return func(options *producerOptions) { ori := options.Interceptors options.Interceptors = make([]primitive.Interceptor, 0) options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg)) options.Interceptors = append(options.Interceptors, ori...) }}WithTrace方法在options.Interceptors后追加TraceInterceptornewTraceInterceptorrocketmq-client-go-v2.0.0/producer/interceptor.go func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor { dispatcher := internal.NewTraceDispatcher(traceCfg) dispatcher.Start() return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error { beginT := time.Now() err := next(ctx, req, reply) producerCtx := primitive.GetProducerCtx(ctx) if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() { return next(ctx, req, reply) } // SendOneway && SendAsync has no reply. if reply == nil { return err } result := reply.(*primitive.SendResult) if result.RegionID == "" || !result.TraceOn { return err } sendSuccess := result.Status == primitive.SendOK costT := time.Since(beginT).Nanoseconds() / int64(time.Millisecond) storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2 traceBean := internal.TraceBean{ Topic: producerCtx.Message.Topic, Tags: producerCtx.Message.GetTags(), Keys: producerCtx.Message.GetKeys(), StoreHost: producerCtx.BrokerAddr, ClientHost: utils.LocalIP, BodyLength: len(producerCtx.Message.Body), MsgType: producerCtx.MsgType, MsgId: result.MsgID, OffsetMsgId: result.OffsetMsgID, StoreTime: storeT, } traceCtx := internal.TraceContext{ RequestId: primitive.CreateUniqID(), // set id TimeStamp: time.Now().UnixNano() / int64(time.Millisecond), TraceType: internal.Pub, GroupName: producerCtx.ProducerGroup, RegionId: result.RegionID, TraceBeans: []internal.TraceBean{traceBean}, CostTime: costT, IsSuccess: sendSuccess, } dispatcher.Append(traceCtx) return err }}newTraceInterceptor方法首先通过internal.NewTraceDispatcher(traceCfg)创建dispatcher,然后执行dispatcher.Start方法,之后返回一个func,该func会构造traceCtx,然后执行dispatcher.Append(traceCtx)小结WithTrace方法在options.Interceptors后追加TraceInterceptor;而newTraceInterceptor方法则创建TraceInterceptor ...

July 7, 2020 · 1 min · jiezi

聊聊rocketmqclientgo的transactionProducer

序本文主要研究一下rocketmq-client-go的transactionProducer transactionProducerrocketmq-client-go-v2.0.0/producer/producer.go type transactionProducer struct { producer *defaultProducer listener primitive.TransactionListener}transactionProducer定义了producer及listener属性NewTransactionProducerrocketmq-client-go-v2.0.0/producer/producer.go func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) { producer, err := NewDefaultProducer(opts...) if err != nil { return nil, errors.Wrap(err, "NewDefaultProducer failed.") } return &transactionProducer{ producer: producer, listener: listener, }, nil}NewTransactionProducer方法实例化transactionProducerStartrocketmq-client-go-v2.0.0/producer/producer.go func (tp *transactionProducer) Start() error { go primitive.WithRecover(func() { tp.checkTransactionState() }) return tp.producer.Start()}Start方法先异步执行tp.checkTransactionState(),然后执行tp.producer.Start()checkTransactionStaterocketmq-client-go-v2.0.0/producer/producer.go func (tp *transactionProducer) checkTransactionState() { for ch := range tp.producer.callbackCh { switch callback := ch.(type) { case *internal.CheckTransactionStateCallback: localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg) uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) if uniqueKey == "" { uniqueKey = callback.Msg.MsgId } header := &internal.EndTransactionRequestHeader{ CommitLogOffset: callback.Header.CommitLogOffset, ProducerGroup: tp.producer.group, TranStateTableOffset: callback.Header.TranStateTableOffset, FromTransactionCheck: true, MsgID: uniqueKey, TransactionId: callback.Header.TransactionId, CommitOrRollback: tp.transactionState(localTransactionState), } req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil) req.Remark = tp.errRemark(nil) err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req, tp.producer.options.SendMsgTimeout) if err != nil { rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{ "callback": callback.Addr.String(), "request": req.String(), rlog.LogKeyUnderlayError: err, }) } default: rlog.Error(fmt.Sprintf("unknown type %v", ch), nil) } }}checkTransactionState方法遍历tp.producer.callbackCh,根据type来不同处理,目前支持CheckTransactionStateCallback,它会构造EndTransactionRequestHeader执行tp.producer.client.InvokeOneWayShutdownrocketmq-client-go-v2.0.0/producer/producer.go ...

July 7, 2020 · 2 min · jiezi

惊艳阿里面试官掌握RocketMQ与Kafka中如何实现事务

文末有惊喜!RocketMQ的事务是如何实现的?首先我们来看 RocketMQ 的事务。我在之前的课程中,已经给大家讲解过 RocketMQ 事务的大致流程,这里我们再一起通过代码,重温一下这个流程。 //+V:BGM7756,免费领取资料public class CreateOrderService { @Inject private OrderDao orderDao; //注入订单表的DAO @Inject private ExecutorService executorService; //注入一个ExecutorService private TransactionMQProducer producer; //初始化transactionListener和producer @Init public void init() throws MQClientException { TransactionListener transactionListener = createTransactionListener(); producer = new TransactionMQProducer("myGroup"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); } //创建订单服务的请求入口 @PUT @RequestMapping(...) public Boolean createOrder(@RequestBody CreateOrderRequest request) { //根据创建订单请求创建一条消息 Message msg = createMessage(request); //发送事务消息 SendResult sendResult = producer.sendMessageInTransaction(msg, request); //返回:事务是否成功 return sendResult.getSendStatus() == SendStatus.SEND_OK; } private TransactionListener createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { CreateOrderRequest request = (CreateOrderRequest ) arg; try { //执行本地事务创建订单 orderDao.createOrderInDB(request); //如果没抛异常说明执行成功,提交事务消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Throwable t) { //失败则直接回滚事务消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } //反查本地事务 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { //从消息中获得订单ID String orderId = msg.getUserProperty("orderId"); //去数据库中查询订单号是否存在,如果存在则提交事务; //如果不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW//(PS:这里RocketMQ有个拼写错误:UNKNOW) return orderDao.isOrderIdExistsInDB(orderId)? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } } ; } //....}//+V:BGM7756,免费领取资料在这个流程中,我们提供一个创建订单的服务,功能就是在数据库中插入一条订单记录,并发送一条创建订单的消息,要求写数据库和发消息这两个操作在一个事务内执行,要么都成功,要么都失败。在这段代码中,我们首先在 init() 方法中初始化了 transactionListener和发生 RocketMQ 事务消息的变量 producer。真正提供创建订单服务的方法是createOrder(),在这个方法里面,我们根据请求的参数创建一条消息,然后调用RocketMQ producer 发送事务消息,并返回事务执行结果。 ...

July 6, 2020 · 3 min · jiezi

聊聊rocketmqclientgo的defaultProducer

序本文主要研究一下rocketmq-client-go的defaultProducer defaultProducerrocketmq-client-go-v2.0.0/producer/producer.go type defaultProducer struct { group string client internal.RMQClient state int32 options producerOptions publishInfo sync.Map callbackCh chan interface{} interceptor primitive.Interceptor}defaultProducer定义了group、client、state、options、publishInfo、callbackCh、interceptorNewDefaultProducerrocketmq-client-go-v2.0.0/producer/producer.go func NewDefaultProducer(opts ...Option) (*defaultProducer, error) { defaultOpts := defaultProducerOptions() for _, apply := range opts { apply(&defaultOpts) } srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs) if err != nil { return nil, errors.Wrap(err, "new Namesrv failed.") } if !defaultOpts.Credentials.IsEmpty() { srvs.SetCredentials(defaultOpts.Credentials) } defaultOpts.Namesrv = srvs producer := &defaultProducer{ group: defaultOpts.GroupName, callbackCh: make(chan interface{}), options: defaultOpts, } producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh) producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...) return producer, nil}NewDefaultProducer方法通过internal.NewNamesrv创建NameServerAddrs,之后实例化defaultProducer,然后实例化internal.GetOrNewRocketMQClient及primitive.ChainInterceptorsStartrocketmq-client-go-v2.0.0/producer/producer.go ...

July 5, 2020 · 3 min · jiezi

聊聊rocketmqclientgo的apigo

序本文主要研究一下rocketmq-client-go的api.go Producerrocketmq-client-go-v2.0.0/api.go type Producer interface { Start() error Shutdown() error SendSync(ctx context.Context, mq ...*primitive.Message) (*primitive.SendResult, error) SendAsync(ctx context.Context, mq func(ctx context.Context, result *primitive.SendResult, err error), msg ...*primitive.Message) error SendOneWay(ctx context.Context, mq ...*primitive.Message) error}func NewProducer(opts ...producer.Option) (Producer, error) { return producer.NewDefaultProducer(opts...)}Producer定义了Start、Shutdown、SendSync、SendAsync、SendOneWay方法;NewProducer方法通过producer.NewDefaultProducer创建ProducerTransactionProducerrocketmq-client-go-v2.0.0/api.go type TransactionProducer interface { Start() error Shutdown() error SendMessageInTransaction(ctx context.Context, mq *primitive.Message) (*primitive.TransactionSendResult, error)}func NewTransactionProducer(listener primitive.TransactionListener, opts ...producer.Option) (TransactionProducer, error) { return producer.NewTransactionProducer(listener, opts...)}TransactionProducer方法定义了Start、Shutdown、SendMessageInTransaction方法;NewTransactionProducer方法通过producer.NewTransactionProducer创建TransactionProducerPushConsumerrocketmq-client-go-v2.0.0/api.go type PushConsumer interface { // Start the PullConsumer for consuming message Start() error // Shutdown the PullConsumer, all offset of MessageQueue will be sync to broker before process exit Shutdown() error // Subscribe a topic for consuming Subscribe(topic string, selector consumer.MessageSelector, f func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error // Unsubscribe a topic Unsubscribe(topic string) error}func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) { return consumer.NewPushConsumer(opts...)}PushConsumer定义了Start、Shutdown、Subscribe、Unsubscribe方法;NewPushConsumer通过consumer.NewPushConsumer创建PushConsumerPullConsumerrocketmq-client-go-v2.0.0/api.go ...

July 4, 2020 · 3 min · jiezi

MySQL数据库增量日志解析工具canal-实战

简介canal,阿里开源工具,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费应用场景数据库实时备份业务cache刷新索引构建和实时维护,例:将商品数据推送到es中构建倒排索引带业务逻辑的增量数据处理,例:增量数据推送到第三方平台官网https://github.com/alibaba/canal 原理 MySQL master将数据写入binlogcanal 向master发送dump协议master 收到dump请求,推送binlog给canalcanal 解析binlog,可讲数据投递到MQ系统中,目前支持kafka、RocketMQ安装配置mysql建议的mysql版本是5.7.xmysql8.0.x见官方说明https://github.com/alibaba/ca... 修改mysql配置文件my.cnf [mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1新增用户并授权,测试的话可以直接使用root用户 CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '123456'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;配置canal查看是否安装java ➜ canal-admin java -versionjava version "1.8.0_251"Java(TM) SE Runtime Environment (build 1.8.0_251-b08)Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)若没有安装,去oracle官网下载1.8版本(切勿选择高版本),选择适合自己系统的版本安装即可,mac系统下载的dmg,直接点击安装,不再累述image.png 访问release页面,下载最新稳定版1.14wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz 解压到指定目录 mkdir /tmp/canaltar zxvf canal.deployer-1.1.4.tar.gz -C /tmp/canal进入到canal目录,查看文件 ...

June 3, 2020 · 4 min · jiezi

搭建rocketmq阅读源码环境

记录下搭建rocketmq阅读源码的环境 1.clone代码 git clone git@github.com:apache/rocketmq.git2.导入idea 3.找一个目录作为rocketmq的工作目录比如D:rocketmq-home新建三个文件夹conf: 配置文件logs: 日志文件store: broker持久化目录在idea中找到distribution目录把conf目录下面的logback_namesrv.xmllogback_broker.xmlbroker.conf文件拷贝到D:rocketmq-homeconf目录 broker.cnf增加 namesrvAddr=127.0.0.1:9876storePathRootDir=D:\\rocketmq-home\\storestorePathCommitLog=D:\\rocketmq-home\\store\\commitLogstorePathConsumeQueue=D:\\rocketmq-home\\store\\consumequeuestorePathIndex=D:\\rocketmq-home\\store\\indexstoreCheckpoint=D:\\rocketmq-home\\store\\checkpointabortFile=D:\\rocketmq-home\\store\\abort4.找到namesrv的启动类NameSrvStartup设置环境变量ROCKETMQ_HOME=D:rocketmq-home 5.找到broker的启动类BrokerStartup设置环境变量ROCKETMQ_HOME=D:rocketmq-home

June 2, 2020 · 1 min · jiezi

rocketMQ461-系列教程-提炼篇

1、namrsrv 与 broker1.1、namesrv 与 broker 架构 从架构图来看, namesrv 充当的角色是 注册中心。只不过有点特殊的是, namesrv 之间互不通信。 1.2、namesrv 与 broker 的通信broker 每隔 30s 会向 namesrv 注册自身的信息。namesrv 每隔 10s 检查 120s 内 无响应的 broker, 并进行剔除。 那么 namesrv 与 broker 的通信模型,会出现一个问题。那就是, namesrv 至少需要 120s 左右才会感知到 broker 死亡。 1.3、rocketmq 的消息模型在 rocketMQ 消息模型中,有以下几个主要角色:生产者、消费者、队列。之间的沟通方式为如下。 图主要表达几个信息:1、topic 实际上是逻辑结构,queue 才是 物理结构,也就是 rocketmq 是基于 队列 进行消费的2、消费者是以组为单位进行消费的。3、在图中, 我特意让 groupA 只订阅 topicA,而没有订阅 topicB。这个原因是, rocketMQ 在被设计时,就不希望一个消费者同时处理多个类型的消息。因此同一个 consumerGroup 下的 consumer 职责应该是一样的,不要干不同的事情(即消费多个topic)。 2、消息发送者2.1、消息发送的三种方式同步发送同步发送时,需要等待 broker 将消息存入 commitlog 文件后,才会返回,生产者线程阻塞。异步发送异步发送时,是使用线程池提交任务的。核心线程、最大线程数量=cpu 核数。见 DefaultMQProducerImpl。 发送消息,不需要等到服务器返回结果。单向发送只管发,不管成功与否2.2、消息发送流程大体流程:验证消息 => 找到 broker => 选择队列 => 消息发送。 ...

May 27, 2020 · 2 min · jiezi