承接上篇:上篇文章讲到革新 go-zero
生成的 app module
中的 gateway & RPC
。本篇讲讲如何接入 异步工作 以及 log 的应用。
Delay Job
日常工作凋谢中,咱们会有很多异步、批量、定时、提早工作要解决,go-zero 中有 go-queue
,举荐应用 go-queue
去解决,go-queue
自身也是基于 go-zero
开发的,其自身是有两种模式:
dq
: 依赖于beanstalkd
,分布式,可存储,提早、定时设置,关机重启能够从新执行,音讯会失落,应用非常简单,go-queue 中应用了 redis setnx 保障了每个音讯只被生产一次,应用场景次要是用来做日常工作应用kq
:依赖于kafka
,这个就不多介绍啦,赫赫有名的kafka
,应用场景次要是做日志用
咱们次要说一下 dq,kq 应用也一样的,只是依赖底层不同,如果没应用过 beanstalkd,没接触过 beanstalkd 的能够先 google 一下,应用起来还是挺容易的。
我在 jobs 下应用 goctl 新建了一个 message-job.api 服务
info(
title: // 音讯工作
desc: // 音讯工作
author: "Mikael"
email: "13247629622@163.com"
)
type BatchSendMessageReq {}
type BatchSendMessageResp {}
service message-job-api {
@handler batchSendMessageHandler // 批量发送短信
post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)
}
因为不须要应用路由,所以 handler 下的 routes.go 被我删除了,在 handler 下新建了一个 jobRun.go,内容如下:
package handler
import (
"fishtwo/lib/xgo"
"fishtwo/app/jobs/message/internal/svc"
)
/**
* @Description 启动 job
* @Author Mikael
* @Date 2021/1/18 12:05
* @Version 1.0
**/
func JobRun(serverCtx *svc.ServiceContext) {xgo.Go(func() {batchSendMessageHandler(serverCtx)
//...many job
})
}
其实 xgo.Go 就是 go batchSendMessageHandler(serverCtx)
,封装了一下 go 携程,避免家养 goroutine panic
而后批改一下启动文件 message-job.go
package main
import (
"flag"
"fmt"
"fishtwo/app/jobs/message/internal/config"
"fishtwo/app/jobs/message/internal/handler"
"fishtwo/app/jobs/message/internal/svc"
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/rest"
)
var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")
func main() {flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
handler.JobRun(ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()}
次要是 handler.RegisterHandlers(server, ctx) 批改为 handler.JobRun(ctx)
接下来,咱们就能够引入 dq 了,首先在 etc/xxx.yaml 下增加 dqConf
.....
DqConf:
Beanstalks:
- Endpoint: 127.0.0.1:7771
Tube: tube1
- Endpoint: 127.0.0.1:7772
Tube: tube2
Redis:
Host: 127.0.0.1:6379
Type: node
我这里本地用不同端口,模仿开了 2 个节点,7771、7772
在 internal/config/config.go 增加配置解析对象
type Config struct {
....
DqConf dq.DqConf
}
批改 handler/batchsendmessagehandler.go
package handler
import (
"context"
"fishtwo/app/jobs/message/internal/logic"
"fishtwo/app/jobs/message/internal/svc"
"github.com/tal-tech/go-zero/core/logx"
)
func batchSendMessageHandler(ctx *svc.ServiceContext){rootCxt:= context.Background()
l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
err := l.BatchSendMessage()
if err != nil{logx.WithContext(rootCxt).Error("【JOB-ERR】: %+v",err)
}
}
批改 logic 下 batchsendmessagelogic.go,写咱们的 consumer 生产逻辑
package logic
import (
"context"
"fishtwo/app/jobs/message/internal/svc"
"fmt"
"github.com/tal-tech/go-zero/core/logx"
)
type BatchSendMessageLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
return BatchSendMessageLogic{Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *BatchSendMessageLogic) BatchSendMessage() error {fmt.Println("job BatchSendMessage start")
l.svcCtx.Consumer.Consume(func(body []byte) {fmt.Printf("job BatchSendMessage %s \n" + string(body))
})
fmt.Printf("job BatchSendMessage finish \n")
return nil
}
这样就功败垂成了,启动 message-job.go 就 ok 课
go run message-job.go
之后咱们就能够在业务代码中向 dq 增加工作,它就能够主动生产了
producer.Delay 向 dq 中投递 5 个提早工作:
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:7771",
Tube: "tube1",
},
{
Endpoint: "localhost:7772",
Tube: "tube2",
},
})
for i := 1000; i < 1005; i++ {_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
if err != nil {fmt.Println(err)
}
}
producer.At
能够指定某个工夫执行,十分好用,感兴趣的敌人本人能够钻研下。
谬误日志
在后面说到 gateway 革新时候,如果眼神好的童鞋,在下面的 httpresult.go 中曾经看到了 log 的身影:
咱们在来看下 rpc 中怎么解决的
是的,我在每个 rpc 启动的 main 中退出了 grpc 拦截器 https://www.yuque.com/tal-tec…,那让咱们看看 grpc 拦截器外面做了什么
而后我代码外面应用 github/pkg/errors 这个包去处理错误的, 这个包还是很好用的
所以呢:
咱们在 grpc
中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】%+v",err)
;
在api
中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】: %+v",err)
go-zero
中打印日志,应用 logx.WithContext
会把 trace-id 带入,这样一个申请下来,比方
user-api --> user-srv --> message-srv
那如果 messsage-srv
出错,他们三个是同一个 trace-id
,是不是就能够在 elk 通过输出这个 trace-id 一次性搜寻进去这条申请报错堆栈信息呢?当然你也能够接入 jaeger、zipkin、skywalking
等,这个我临时还没接入。
框架地址
https://github.com/tal-tech/go-zero
欢送应用 go-zero 并 star 反对咱们!????
go-zero 系列文章见『微服务实际』公众号