关于golang:微服务从代码到k8s部署应有尽有系列八各种队列

39次阅读

共计 9273 个字符,预计需要花费 24 分钟才能阅读完成。

咱们用一个系列来解说从需要到上线、从代码到 k8s 部署、从日志到监控等各个方面的微服务残缺实际。

整个我的项目应用了 go-zero 开发的微服务,根本蕴含了 go-zero 以及相干 go-zero 作者开发的一些中间件,所用到的技术栈根本是 go-zero 项目组的自研组件,根本是 go-zero 全家桶了。

实战我的项目地址:https://github.com/Mikaelemmm…

1、概述

音讯队列有很多种,有 rabbitmq、rocketmq、kafka 等罕用的,其中 go-queue(https://github.com/zeromicro/…)是 go-zero 官网开发的音讯队列组件,其中分为 2 类,一种是 kq、一种是 dq,kq 是基于 kafka 的音讯队列,dq 是基于 beanstalkd 的提早队列,然而 go-queue 不反对定时工作。具体想更多理解 go-queue 的我之前也写过一篇教程能够去看一下这里不细说了。

本我的项目采纳的是 go-queue 做音讯队列,asynq 做提早队列、定时队列

为什么应用 asynq 的几个起因

  • 间接基于 redis,个别我的项目都有 redis,而 asynq 自身就是基于 redis 所以能够少保护一个中间件
  • 反对音讯队列、提早队列、定时任务调度,因为心愿我的项目反对定时工作而 asynq 间接就反对
  • 有 webui 界面,每个工作都能够暂停、归档、通过 ui 界面查看成功失败、监控

为什么 asynq 反对音讯队列还在应用 go-queue?

  • kafka 的吞吐是业绩闻名的,如果后期量不大能够间接用 asynq
  • 没啥目标,就是想给你们演示一下 go-queue

在咱们应用 go-zero 的时候,goctl 给咱们带了很大的便当,然而目前 go-zero 只有生成 api、rpc,很多同学在群里问定时工作、提早队列、音讯队列如何生成,目录构造该怎么做,其实 go-zero 是为咱们设计好了的,就是 serviceGroup,应用 serviceGroup 治理你的服务。

2、如何应用

在后面订单、音讯等场景咱们其实曾经演示过了,这里再额定独自补充一次

咱们还是拿 order-mq 来举例子,显然应用 goctl 生成 api、rpc 不是咱们想要的,那咱们就本人应用 serviceGroup 革新,目录构造还是连续 api 的根本差不多,只是将 handler 改成了 listen,将 logic 换成了 mqs。

2.1 在 main 中代码如下

var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")

func main() {flag.Parse()
    var c config.Config

    conf.MustLoad(*configFile, &c)
    // log, prometheus, trace, metricsUrl
    if err := c.SetUp(); err != nil {panic(err)
    }

    serviceGroup := service.NewServiceGroup()
    defer serviceGroup.Stop()

    for _, mq := range listen.Mqs(c) {serviceGroup.Add(mq)
    }

    serviceGroup.Start()}
  • 首先咱们要定义配置以及解析配置。
  • 其次为什么咱们要在这里加 SetUp 而 api、rpc 不须要呢?因为 api、rpc 都是在 MustNewServer 中曾经框架写的,然而咱们用 serviceGroup 治理没有, 能够手动点进去 SetUp 看看,这个办法中蕴含了 log、prometheus、trace、metricsUrl 的定义,一个办法能够省很多事件,这样咱们间接批改配置文件就能够实现日志、监控、链路追踪了。
  • 接下来就是 go-zero 的 serivceGroup 治理服务了,serviceGroup 是用来治理一组 service 的,那 service 其实就是一个接口,代码如下

    Service (代码在 go-zero/core/service/servicegroup.go)

    // Service is the interface that groups Start and Stop methods.
    Service interface {
        Starter // Start
        Stopper // Stop
    }

    所以,只有你的服务实现了这两个接口,就能够退出到 serviceGroup 对立治理

    那能够看到咱们把所有的 mq 都实现这个接口,而后对立放到都 list.Mqs 中,在启动服务即可

2.2 mq 分类管理

go-zero-looklook/app/order/cmd/mq/internal/listen 目录下代码

该目录下代码是对立治理不同类型 mq,因为咱们要治理 kq、asynq 可能后续还有 rabbitmq、rocketmq 等等,所以在这里做了分类不便保护

对立治理在 go-zero-looklook/app/order/cmd/mq/internal/listen/listen.go, 而后在 main 中调用 listen.Mqs 能够获取所有 mq 一起 start

// 返回所有消费者
func Mqs(c config.Config) []service.Service {svcContext := svc.NewServiceContext(c)
    ctx := context.Background()

    var services []service.Service

    // kq:音讯队列.
    services = append(services, KqMqs(c, ctx, svcContext)...)
    // asynq:提早队列、定时工作
    services = append(services, AsynqMqs(c, ctx, svcContext)...)
    // other mq ....

    return services
}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go 就是定义的 asynq

// asynq
// 定时工作、提早工作
func AsynqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {return []service.Service{
      // 监听提早队列
      deferMq.NewAsynqTask(ctx, svcContext),

      // 监听定时工作
   }
}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go 就是定义的 kq (go-queue 的 kafka)

// kq
// 音讯队列
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {return []service.Service{
        // 监听生产流水状态变更
        kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
        // .....
    }
}

2.3 理论业务

编写理论业务,咱们就在 go-zero-looklook/app/order/cmd/mq/internal/listen/mqs 下,这里为了不便保护,也是做了分类

  • deferMq : 提早队列
  • kq:音讯队列

2.3.1 提早队列

// 监听敞开订单
type AsynqTask struct {
   ctx    context.Context
   svcCtx *svc.ServiceContext
}

func NewAsynqTask(ctx context.Context, svcCtx *svc.ServiceContext) *AsynqTask {
   return &AsynqTask{
      ctx:    ctx,
      svcCtx: svcCtx,
   }
}

func (l *AsynqTask) Start() {fmt.Println("AsynqTask start")

   srv := asynq.NewServer(asynq.RedisClientOpt{Addr: l.svcCtx.Config.Redis.Host, Password: l.svcCtx.Config.Redis.Pass},
      asynq.Config{
         Concurrency: 10,
         Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
         },
      },
   )

   mux := asynq.NewServeMux()

   // 敞开民宿订单工作
   mux.HandleFunc(asynqmq.TypeHomestayOrderCloseDelivery, l.closeHomestayOrderStateMqHandler)

   if err := srv.Run(mux); err != nil {log.Fatalf("could not run server: %v", err)
   }
}

func (l *AsynqTask) Stop() {fmt.Println("AsynqTask stop")
}

因为 asynq 要先启动,而后定义路由工作,所以咱们在 asynqTask.go 中做了对立的路由治理,之后咱们每个业务都独自的在 deferMq 的文件夹上面定义一个文件(如“提早敞开订单:closeHomestayOrderState.go”), 这样每个业务一个文件,跟 go-zero 的 api、rpc 的 logic 一样,保护很不便

closeHomestayOrderState.go 敞开订单逻辑

package deferMq

import (
    "context"
    "encoding/json"
    "looklook/app/order/cmd/rpc/order"
    "looklook/app/order/model"
    "looklook/common/asynqmq"
    "looklook/common/xerr"

    "github.com/hibiken/asynq"
    "github.com/pkg/errors"
)

func (l *AsynqTask) closeHomestayOrderStateMqHandler(ctx context.Context, t *asynq.Task) error {
    var p asynqmq.HomestayOrderCloseTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {return errors.Wrapf(xerr.NewErrMsg("解析 asynq task payload err"), "closeHomestayOrderStateMqHandler payload err:%v, payLoad:%+v", err, t.Payload())
    }

    resp, err := l.svcCtx.OrderRpc.HomestayOrderDetail(ctx, &order.HomestayOrderDetailReq{Sn: p.Sn,})
    if err != nil || resp.HomestayOrder == nil {return errors.Wrapf(xerr.NewErrMsg("获取订单失败"), "closeHomestayOrderStateMqHandler 获取订单失败 or 订单不存在 err:%v, sn:%s ,HomestayOrder : %+v", err, p.Sn, resp.HomestayOrder)
    }

    if resp.HomestayOrder.TradeState == model.HomestayOrderTradeStateWaitPay {
        _, err := l.svcCtx.OrderRpc.UpdateHomestayOrderTradeState(ctx, &order.UpdateHomestayOrderTradeStateReq{
            Sn:         p.Sn,
            TradeState: model.HomestayOrderTradeStateCancel,
        })
        if err != nil {return errors.Wrapf(xerr.NewErrMsg("敞开订单失败"), "closeHomestayOrderStateMqHandler 敞开订单失败  err:%v, sn:%s", err, p.Sn)
        }
    }

    return nil
}

2.3.2 kq 音讯队列

看 go-zero-looklook/app/order/cmd/mq/internal/mqs/kq 文件夹下,因为 kq 跟 asynq 不太一样,它自身就是应用 go-zero 的 Service 治理的,曾经实现了 starter、stopper 接口了,所以咱们在 /Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go 中间接定义好一个 go-queue 业务扔给 serviceGroup,去交给 main 启动就好了 , 咱们的业务代码只须要实现 go-queue 的 Consumer 间接写咱们本人业务即可。

1)/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go

func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {return []service.Service{
        // 监听生产流水状态变更
        kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
        // .....
    }
}

能够看到 kq.MustNewQueue 自身返回就是 queue.MessageQueue,queue.MessageQueue 又实现了 Start、Stop

2)业务中

/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/mqs/kq/paymentUpdateStatus.go

func (l *PaymentUpdateStatusMq) Consume(_, val string) error {fmt.Printf("PaymentUpdateStatusMq Consume val : %s \n", val)
    // 解析数据
    var message kqueue.ThirdPaymentUpdatePayStatusNotifyMessage
    if err := json.Unmarshal([]byte(val), &message); err != nil {logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->Consume Unmarshal err : %v , val : %s", err, val)
        return err
    }

    // 执行业务..
    if err := l.execService(message); err != nil {logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->execService  err : %v , val : %s , message:%+v", err, val, message)
        return err
    }

    return nil
}

咱们在 paymentUpdateStatus.go 中只须要实现接口 Consume 就能够承受来自 kq 传过来的 kafka 的音讯了,咱们只管在咱们 Consumer 中解决咱们业务即可

3、定时工作

对于定时工作,目前 go-zero-looklook 没有应用,这里我也阐明一下

  • 如果你想简略一点间接应用 cron(裸机、k8s 都有),
  • 如果略微简单一点能够应用 https://github.com/robfig/cron 包,在代码中定义工夫
  • 应用 xxl-job、gocron 分布式定时工作零碎接入
  • asynq 的 shedule

这里因为我的项目用的 asynq,我就演示一下 asynq 的 shedule 吧

分为 client 与 server,client 用来定义调度工夫,server 是到了工夫承受 client 的音讯触发来执行咱们写的业务的,理论业务咱们应该写在 server,client 用来定义业务调度工夫的

asynqtest/docker-compose.yml

version: '3'

services:
  #asynqmon asynq 提早队列、定时队列的 webui
  asynqmon:
    image: hibiken/asynqmon:latest
    container_name: asynqmon_asynq
    ports:
      - 8980:8080
    command:
      - '--redis-addr=redis:6379'
      - '--redis-password=G62m50oigInC30sf'
    restart: always
    networks:
      - asynqtest_net
    depends_on:
      - redis
  
  #redis 容器
  redis:
    image: redis:6.2.5
    container_name: redis_asynq
    ports:
      - 63779:6379
    environment:
      # 时区上海
      TZ: Asia/Shanghai
    volumes:
      # 数据文件
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"
    privileged: true
    restart: always
    networks:
      - asynqtest_net

networks:
  asynqtest_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.22.0.0/16

asynqtest/shedule/client/client.go

package main

import (
    "asynqtest/tpl"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

const redisAddr = "127.0.0.1:63779"
const redisPwd = "G62m50oigInC30sf"

func main() {
    // 周期性工作
    scheduler := asynq.NewScheduler(
        asynq.RedisClientOpt{
            Addr:     redisAddr,
            Password: redisPwd,
        }, nil)

    payload, err := json.Marshal(tpl.EmailPayload{Email: "546630576@qq.com", Content: "发邮件呀"})
    if err != nil {log.Fatal(err)
    }

    task := asynq.NewTask(tpl.EMAIL_TPL, payload)
    // 每隔 1 分钟同步一次
    entryID, err := scheduler.Register("*/1 * * * *", task)

    if err != nil {log.Fatal(err)
    }
    log.Printf("registered an entry: %q\n", entryID)

    if err := scheduler.Run(); err != nil {log.Fatal(err)
    }
}

asynqtest/shedule/server/server.go

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "asynqtest/tpl"

    "github.com/hibiken/asynq"
)

func main() {
    srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "127.0.0.1:63779", Password: "G62m50oigInC30sf"},
        asynq.Config{
            Concurrency: 10,
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
        },
    )

    mux := asynq.NewServeMux()

    // 敞开民宿订单工作
    mux.HandleFunc(tpl.EMAIL_TPL, emailMqHandler)

    if err := srv.Run(mux); err != nil {log.Fatalf("could not run server: %v", err)
    }
}

func emailMqHandler(ctx context.Context, t *asynq.Task) error {
    var p tpl.EmailPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {return fmt.Errorf("emailMqHandler err:%+v", err)
    }

    fmt.Printf("p : %+v \n", p)

    return nil
}

asynqtest/tpl/tpl.go

package tpl

const EMAIL_TPL = "schedule:email"

type EmailPayload struct {
    Email   string
    Content string
}

启动 server.goclient.go

浏览器输出 http://127.0.0.1:8980/schedulers 这里 能够看到所有 client 定义的工作

浏览器输出 http://127.0.0.1:8990/ 这里能够看到咱们的 server 生产请

控制台生产状况

说一下 asynq 的 shedule 在集成到我的项目中的思路,能够独自启动一个服务作为调度 client 定义零碎的定时任务调度治理,将 server 定义在每个业务本人的 mq 的 asynq 一起即可。

4、结尾

在这一节中,咱们学会应用了音讯队列、提早队列,kafka 能够通过管理工具去查看,至于 asynq 查看 webui 在 go-zero-looklook/docker-compose-env.yml 中咱们曾经启动好了 asynqmon,间接应用 http://127.0.0.1:8980 即可查看

我的项目地址

https://github.com/zeromicro/go-zero

欢送应用 go-zerostar 反对咱们!

微信交换群

关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。

正文完
 0