承接上篇:上篇文章讲到革新 go-zero 生成的 app module 中的 gateway & RPC 。本篇讲讲如何接入 异步工作 以及 log的应用

Delay Job

日常工作凋谢中,咱们会有很多异步、批量、定时、提早工作要解决,go-zero中有 go-queue,举荐应用 go-queue 去解决,go-queue 自身也是基于 go-zero 开发的,其自身是有两种模式:

  • dq : 依赖于 beanstalkd ,分布式,可存储,提早、定时设置,关机重启能够从新执行,音讯会失落,应用非常简单,go-queue中应用了redis setnx保障了每个音讯只被生产一次,应用场景次要是用来做日常工作应用
  • kq:依赖于 kafka ,这个就不多介绍啦,赫赫有名的 kafka ,应用场景次要是做日志用

咱们次要说一下dq,kq应用也一样的,只是依赖底层不同,如果没应用过beanstalkd,没接触过beanstalkd的能够先google一下,应用起来还是挺容易的。

我在jobs下应用goctl新建了一个message-job.api服务

info(    title: //音讯工作    desc: // 音讯工作    author: "Mikael"    email: "13247629622@163.com")type BatchSendMessageReq {}type BatchSendMessageResp {}service message-job-api {    @handler batchSendMessageHandler // 批量发送短信    post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)}

因为不须要应用路由,所以handler下的routes.go被我删除了,在handler下新建了一个jobRun.go,内容如下:

package handlerimport (    "fishtwo/lib/xgo"    "fishtwo/app/jobs/message/internal/svc")/*** @Description 启动job* @Author Mikael* @Date 2021/1/18 12:05* @Version 1.0**/func JobRun(serverCtx *svc.ServiceContext)  {    xgo.Go(func() {        batchSendMessageHandler(serverCtx)    //...many job    })}

其实xgo.Go就是 go batchSendMessageHandler(serverCtx) ,封装了一下go携程,避免家养goroutine panic

而后批改一下启动文件message-job.go

package mainimport (   "flag"   "fmt"   "fishtwo/app/jobs/message/internal/config"   "fishtwo/app/jobs/message/internal/handler"   "fishtwo/app/jobs/message/internal/svc"   "github.com/tal-tech/go-zero/core/conf"   "github.com/tal-tech/go-zero/rest")var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")func main() {   flag.Parse()   var c config.Config   conf.MustLoad(*configFile, &c)   ctx := svc.NewServiceContext(c)   server := rest.MustNewServer(c.RestConf)   defer server.Stop()   handler.JobRun(ctx)   fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)   server.Start()}

次要是handler.RegisterHandlers(server, ctx) 批改为handler.JobRun(ctx)

接下来,咱们就能够引入dq了,首先在etc/xxx.yaml下增加dqConf

.....DqConf:  Beanstalks:    - Endpoint: 127.0.0.1:7771      Tube: tube1    - Endpoint: 127.0.0.1:7772      Tube: tube2  Redis:    Host: 127.0.0.1:6379    Type: node

我这里本地用不同端口,模仿开了2个节点,7771、7772

在internal/config/config.go增加配置解析对象

type Config struct {    ....    DqConf dq.DqConf}

批改handler/batchsendmessagehandler.go

package handlerimport (    "context"    "fishtwo/app/jobs/message/internal/logic"    "fishtwo/app/jobs/message/internal/svc"    "github.com/tal-tech/go-zero/core/logx")func batchSendMessageHandler(ctx *svc.ServiceContext){    rootCxt:= context.Background()    l := logic.NewBatchSendMessageLogic(context.Background(), ctx)    err := l.BatchSendMessage()    if err != nil{        logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)    }}

批改logic下batchsendmessagelogic.go,写咱们的consumer生产逻辑

package logicimport (   "context"   "fishtwo/app/jobs/message/internal/svc"   "fmt"   "github.com/tal-tech/go-zero/core/logx")type BatchSendMessageLogic struct {   logx.Logger   ctx    context.Context   svcCtx *svc.ServiceContext}func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {   return BatchSendMessageLogic{       Logger: logx.WithContext(ctx),       ctx:    ctx,       svcCtx: svcCtx,   }}func (l *BatchSendMessageLogic) BatchSendMessage() error {   fmt.Println("job BatchSendMessage start")   l.svcCtx.Consumer.Consume(func(body []byte) {       fmt.Printf("job BatchSendMessage %s \n" + string(body))   })   fmt.Printf("job BatchSendMessage finish \n")   return nil}

这样就功败垂成了,启动message-job.go就ok课

go run message-job.go

之后咱们就能够在业务代码中向dq增加工作,它就能够主动生产了

producer.Delay 向dq中投递5个提早工作:

    producer := dq.NewProducer([]dq.Beanstalk{        {            Endpoint: "localhost:7771",            Tube:     "tube1",        },        {            Endpoint: "localhost:7772",            Tube:     "tube2",        },    })    for i := 1000; i < 1005; i++ {        _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)        if err != nil {            fmt.Println(err)        }    }

producer.At 能够指定某个工夫执行,十分好用,感兴趣的敌人本人能够钻研下。

谬误日志

在后面说到gateway革新时候,如果眼神好的童鞋,在下面的httpresult.go中曾经看到了log的身影:

咱们在来看下rpc中怎么解决的

是的,我在每个rpc启动的main中退出了grpc拦截器 https://www.yuque.com/tal-tec...,那让咱们看看grpc拦截器外面做了什么

而后我代码外面应用github/pkg/errors这个包去处理错误的,这个包还是很好用的

所以呢:

咱们在 grpc 中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err)

api 中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)

go-zero 中打印日志,应用 logx.WithContext 会把trace-id带入,这样一个申请下来,比方

user-api --> user-srv --> message-srv

那如果 messsage-srv 出错,他们三个是同一个 trace-id ,是不是就能够在elk通过输出这个trace-id一次性搜寻进去这条申请报错堆栈信息呢?当然你也能够接入 jaeger、zipkin、skywalking 等,这个我临时还没接入。

框架地址

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!????

go-zero 系列文章见『微服务实际』公众号