乐趣区

关于golang:分布式任务-消息队列框架-goqueue

  1. 为什么写这个库
  2. 利用场景有哪些
  3. 如何应用
  4. 总结

为什么要写这个库?

在开始自研 go-queue 之前,针对以下咱们调研目前的开源队列计划:

beanstalkd

beanstalkd 有一些非凡好用性能:反对工作 priority、延时 (delay)、超时重发(time-to-run) 和预留(buried),可能很好的反对分布式的后台任务和定时工作解决。如下是 beanstalkd 根本局部:

  • job:工作单元;
  • tube:工作队列,存储对立类型 job。producer 和 consumer 操作对象;
  • producerjob 生产者,通过 put 将 job 退出一个 tube;
  • consumerjob 消费者,通过 reserve/release/bury/delete 来获取 job 或扭转 job 的状态;

很侥幸的是官网提供了 go client:https://github.com/beanstalkd…。

然而这对不相熟 beanstalkd 操作的 go 开发者而言,须要学习老本。

kafka

相似基于 kafka 音讯队列作为存储的计划,存储单元是音讯,如果要实现延时执行,能够想到的计划是以延时执行的工夫作为 topic,这样在大型的音讯零碎中,充斥大量一次性的 topicdq_1616324404788, dq_1616324417622),当工夫扩散,会容易造成磁盘随机写的状况。

而且在 go 生态中,

同时思考以下因素:

  • 反对延时工作
  • 高可用,保证数据不失落
  • 可扩大资源和性能

所以咱们本人基于以上两个根底组件开发了 go-queue

  1. 基于 beanstalkd 开发了 dq,反对定时和延时操作。同时退出 redis 保障生产唯一性。
  2. 基于 kafka 开发了 kq,简化生产者和消费者的开发 API,同时在写入 kafka 应用批量写,节俭 IO。

整体设计如下:

利用场景

首先在生产场景来说,一个是针对工作队列,一个是音讯队列。而两者最大的区别:

  • 工作是没有程序束缚;音讯须要;
  • 工作在退出中,或者是期待中,可能存在状态更新(或是勾销);音讯则是繁多的存储即可;

所以在背地的基础设施选型上,也是基于这种生产场景。

  • dq:依赖于 beanstalkd,适宜延时、定时工作执行;
  • kq:依赖于 kafka,实用于异步、批量工作执行;

而从其中 dq 的 API 中也能够看出:

// 提早工作执行
- dq.Delay(msg, delayTime);

// 定时工作执行
- dq.At(msg, atTime);

而在咱们外部:

  • 如果是 异步音讯生产 / 推送,则会抉择应用 kqkq.Push(msg)
  • 如果是 15 分钟揭示 / 今天中午发送短信 等,则应用 dq

如何应用

别离介绍 dqkq 的应用形式:

dq

// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
    {
        Endpoint: "localhost:11300",
        Tube:     "tube",
    },
    {
        Endpoint: "localhost:11301",
        Tube:     "tube",
    },
})    

for i := 1000; i < 1005; i++ {_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
    if err != nil {fmt.Println(err)
    }
}
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{Beanstalks: []dq.Beanstalk{
    {
      Endpoint: "localhost:11300",
      Tube:     "tube",
    },
    {
      Endpoint: "localhost:11301",
      Tube:     "tube",
    },
  },
  Redis: redis.RedisConf{
    Host: "localhost:6379",
    Type: redis.NodeType,
  },
})
consumer.Consume(func(body []byte) {
  // your consume logic
  fmt.Println(string(body))
})

和一般的 生产者 - 消费者 模型相似,开发者也只须要关注以下:

  1. 开发者只须要关注本人的工作类型「延时 / 定时」
  2. 生产端的生产逻辑

kq

producer.go

// message structure
type message struct {
    Key     string `json:"key"`
    Value   string `json:"value"`
    Payload string `json:"message"`
}

pusher := kq.NewPusher([]string{
    "127.0.0.1:19092",
    "127.0.0.1:19093",
    "127.0.0.1:19094",
}, "kq")

ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
    select {
    case <-ticker.C:
        count := rand.Intn(100)
    // 筹备音讯
        m := message{Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
            Value:   fmt.Sprintf("%d,%d", round, count),
            Payload: fmt.Sprintf("%d,%d", round, count),
        }
        body, err := json.Marshal(m)
        if err != nil {log.Fatal(err)
        }

        fmt.Println(string(body))
    // push to kafka broker
        if err := pusher.Push(string(body)); err != nil {log.Fatal(err)
        }
    }
}

config.yaml

Name: kq
Brokers:
  - 127.0.0.1:19092
  - 127.0.0.1:19092
  - 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1

consumer.go

var c kq.KqConf
conf.MustLoad("config.yaml", &c)

// WithHandle: 具体的解决 msg 的 logic
// 这也是开发者须要依据本人的业务定制化
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {fmt.Printf("=> %s\n", v)
  return nil
}))
defer q.Stop()
q.Start()

dq 不同的是:开发者不须要关注工作类型(在这里也没有工作的概念,传递的都是 message data)。

其余操作和 dq 相似,只是将 业务处理函数 当成配置间接传入消费者中。

总结

在咱们目前的场景中,kq 大量应用在咱们的异步音讯服务;而延时工作,咱们除了 dq,还能够应用内存版的 TimingWheel「go-zero 生态组件」。

对于 go-queue 更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。

https://github.com/tal-tech/go-queue

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!

go-zero 系列文章见『微服务实际』公众号

退出移动版