0、索引
go-zero docker-compose 搭建课件服务(九):http 对立返回和集成日志服务
0.1 源码地址
https://github.com/liuyuede123/go-zero-courseware
1、http 对立返回
个别返回中会有 code
,message
,data
。当申请胜利的时候code
返回 0 或者 200,message
返回 success,data
为要获取的数据;当申请失败的时候 code
返回自定义的错误码,message
返回展现给前端的错误信息,data
为空。
咱们将封装一个谬误返回的函数,利用到 api handler 的返回
在 user 服务中创立了 common 文件夹,外面存一些专用的办法,创立 response/response.go
package response
import (
"go-zero-courseware/user/common/xerr"
"net/http"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/rest/httpx"
"google.golang.org/grpc/status"
)
type Response struct {
Code uint32 `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data"`}
//http 返回
func HttpResult(r *http.Request, w http.ResponseWriter, resp interface{}, err error) {
if err == nil {
// 胜利返回
r := &Response{
Code: 0,
Message: "success",
Data: resp,
}
httpx.WriteJson(w, http.StatusOK, r)
} else {
// 谬误返回
errcode := uint32(500)
errmsg := "服务器谬误"
causeErr := errors.Cause(err) // err 类型
if e, ok := causeErr.(*xerr.CodeError); ok { // 自定义谬误类型
// 自定义 CodeError
errcode = e.GetErrCode()
errmsg = e.GetErrMsg()} else {if gstatus, ok := status.FromError(causeErr); ok { // grpc err 谬误
grpcCode := uint32(gstatus.Code())
errcode = grpcCode
errmsg = gstatus.Message()}
}
logx.WithContext(r.Context()).Errorf("【API-ERR】: %+v", err)
httpx.WriteJson(w, http.StatusBadRequest, &Response{
Code: errcode,
Message: errmsg,
Data: nil,
})
}
}
创立 xerr/errors.go 文件,定义 CodeError 构造体
package xerr
import ("fmt")
/**
罕用通用固定谬误
*/
type CodeError struct {
errCode uint32
errMsg string
}
// 返回给前端的错误码
func (e *CodeError) GetErrCode() uint32 {return e.errCode}
// 返回给前端显示端错误信息
func (e *CodeError) GetErrMsg() string {return e.errMsg}
func (e *CodeError) Error() string {return fmt.Sprintf("ErrCode:%d,ErrMsg:%s", e.errCode, e.errMsg)
}
func NewErrCodeMsg(errCode uint32, errMsg string) *CodeError {return &CodeError{errCode: errCode, errMsg: errMsg}
}
因为 api 个别调用的 rpc 的申请,获取到的谬误无奈展现给前端应用,咱们会应用自定义的谬误类型。当让 rpc 中的谬误也可能是前端间接能够展现的谬误,或者是数据库的某个异样抛出的谬误,如果想辨别这些谬误,能够本人定义业务端 code 和 message 做下辨别就行。这里咱们对立 api 服务中解决。
当 api 或者 rpc 中有一些未知谬误抛出的时候咱们须要写入到日志中,包含具体的错误信息和堆栈信息。这些后续放到日志服务 ELK 中能够不便查看。
批改 userinfohandler.go、userloginhandler.go、userregisterhandler.go 中的返回
...
response.HttpResult(r, w, resp, err)
批改 userinfologic.go
...
func (l *UserInfoLogic) UserInfo(req *types.UserInfoRequest) (resp *types.UserInfoResponse, err error) {
info, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &userclient.UserInfoRequest{Id: req.Id,})
if err != nil {
// 自定义的谬误返回
return nil, xerr.NewErrCodeMsg(500, "用户查问失败")
}
return &types.UserInfoResponse{
Id: info.Id,
Username: info.Username,
LoginName: info.LoginName,
Sex: info.Sex,
}, nil
}
批改 userloginlogic.go
...
func (l *UserLoginLogic) UserLogin(req *types.LoginRequest) (resp *types.LoginResponse, err error) {
login, err := l.svcCtx.UserRpc.Login(l.ctx, &userclient.LoginRequest{
LoginName: req.LoginName,
Password: req.Password,
})
if err != nil {return nil, xerr.NewErrCodeMsg(500, "用户登录失败")
}
now := time.Now().Unix()
login.Token, err = l.getJwtToken(l.svcCtx.Config.Auth.AccessSecret, now, l.svcCtx.Config.Auth.AccessExpire, int64(login.Id))
if err != nil {
// 返回错误信息,并打印堆栈信息到日志
return nil, errors.Wrapf(xerr.NewErrCodeMsg(5000, "token 生成失败"), "loginName: %s,err:%v", req, err)
}
return &types.LoginResponse{
Id: login.Id,
Token: login.Token,
}, nil
}
...
批改 userregisterlogic.go
...
func (l *UserRegisterLogic) UserRegister(req *types.RegisterRequest) (resp *types.RegisterResponse, err error) {
_, err = l.svcCtx.UserRpc.Register(l.ctx, &userclient.RegisterRequest{
LoginName: req.LoginName,
Username: req.Username,
Password: req.Password,
Sex: req.Sex,
})
if err != nil {
// 自定义的谬误返回
return nil, xerr.NewErrCodeMsg(5000, "注册用户失败")
}
return &types.RegisterResponse{}, nil}
对于 errors.Wrapf
第一个参数是错误信息,第二个是格式化之后的错误信息字符串,args 是 fromat 中的动静参数。最终还是返回咱们传入的 error,然而会把堆栈信息也打印进去。这个为前面的日志服务做铺垫
func Wrapf(err error, format string, args ...interface{}) error {
if err == nil {return nil}
err = &withMessage{
cause: err,
msg: fmt.Sprintf(format, args...),
}
return &withStack{
err,
callers(),}
}
对于鉴权
对于鉴权,如果鉴权失败,之前是间接返回 401 状态码,然而咱们想同样的返回错误信息和 message
此时就须要自定义一个鉴权失败的回调函数
咱们在 response.go 中减少一个鉴权失败的回调函数
...
func JwtUnauthorizedResult(w http.ResponseWriter, r *http.Request, err error) {httpx.WriteJson(w, http.StatusUnauthorized, &Response{401, "鉴权失败", nil})
}
而后在 api 入口程序 user.go 中批改代码如下
...
func main() {flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
// 此处退出鉴权失败的回调
server := rest.MustNewServer(c.RestConf, rest.WithUnauthorizedCallback(response.JwtUnauthorizedResult))
defer server.Stop()
ctx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()}
而后咱们再看下 user 的 rpc 服务
这里咱们会引入一个拦截器。什么是拦截器?
定义:UnaryServerInterceptor 提供了一个钩子来拦挡服务器上一元 RPC 的执行。信息蕴含拦截器能够操作的这个 RPC 的所有信息。处理程序是包装器服务办法实现。拦截器负责调用处理程序实现 RPC。
其实就是拦挡 handler 做一些返回前和返回后的解决
咱们须要在 common 中新增一个拦截器办法,新建文件 rpcserver/rpcserver.go
package rpcserver
import (
"context"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"go-zero-courseware/user/common/xerr"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func LoggerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {resp, err = handler(ctx, req)
if err != nil {causeErr := errors.Cause(err) // err 类型
if e, ok := causeErr.(*xerr.CodeError); ok { // 自定义谬误类型
logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】%+v", err)
// 转成 grpc err
err = status.Error(codes.Code(e.GetErrCode()), e.GetErrMsg())
} else {logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】%+v", err)
}
}
return resp, err
}
而后在入口文件 user.go 中增加一个拦截器
...
s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
...
课件服务和下面相似,这里就不一一增加批改了
2、集成日志服务
咱们须要搭建一个 ELK 体系的服务,流程图如下:
将会用到以下服务:
服务名 | 端口号 | |
---|---|---|
elasticsearch | 9200 | |
kibana | 5601 | |
go-stash | ||
filebeat | ||
zookeeper | 2181 | |
kafka | 9092 |
docker-compose 如下:
user 服务中咱们引入了日志地址,到咱们的宿主机上。之所以这样做,是因为在 mac 零碎上 docker 的日志文件门路和 linux 上的不统一。找了半天也没在 mac 上找到容器的日志。所以用户服务中的日志会写到文件中而后同步到宿主机的 data/log 目录下。
还有就是 filebeat 日志中,咱们会从宿主机上的日志同步到 filebeat 指定目录。而后 filebeat 会同步到 kafka
version: '3.5'
# 网络配置
networks:
backend:
driver: bridge
# 服务容器配置
services:
etcd: # 自定义容器名称
build:
context: etcd # 指定构建应用的 Dockerfile 文件
environment:
- TZ=Asia/Shanghai
- ALLOW_NONE_AUTHENTICATION=yes
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
ports: # 设置端口映射
- "2379:2379"
networks:
- backend
restart: always
etcd-manage:
build:
context: etcd-manage
environment:
- TZ=Asia/Shanghai
ports:
- "7000:8080" # 设置容器 8080 端口映射指定宿主机端口,用于宿主机拜访可视化 web
depends_on: # 依赖容器
- etcd # 在 etcd 服务容器启动后启动
networks:
- backend
restart: always
courseware-rpc: # 自定义容器名称
build:
context: courseware # 指定构建应用的 Dockerfile 文件
dockerfile: rpc/Dockerfile
environment: # 设置环境变量
- TZ=Asia/Shanghai
privileged: true
ports: # 设置端口映射
- "9400:9400" # 课件服务 rpc 端口
stdin_open: true # 关上规范输出,能够承受内部输出
tty: true
networks:
- backend
restart: always # 指定容器退出后的重启策略为始终重启
courseware-api: # 自定义容器名称
build:
context: courseware # 指定构建应用的 Dockerfile 文件
dockerfile: api/Dockerfile
environment: # 设置环境变量
- TZ=Asia/Shanghai
privileged: true
ports: # 设置端口映射
- "8400:8400" # 课件服务 api 端口
stdin_open: true # 关上规范输出,能够承受内部输出
tty: true
networks:
- backend
restart: always # 指定容器退出后的重启策略为始终重启
user-rpc: # 自定义容器名称
build:
context: user # 指定构建应用的 Dockerfile 文件
dockerfile: rpc/Dockerfile
environment: # 设置环境变量
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./data/log/user-rpc:/var/log/go-zero/user-rpc # 日志的映射地址
ports: # 设置端口映射
- "9300:9300" # 课件服务 rpc 端口
stdin_open: true # 关上规范输出,能够承受内部输出
tty: true
networks:
- backend
restart: always # 指定容器退出后的重启策略为始终重启
user-api: # 自定义容器名称
build:
context: user # 指定构建应用的 Dockerfile 文件
dockerfile: api/Dockerfile
environment: # 设置环境变量
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./data/log/user-api:/var/log/go-zero/user-api
ports: # 设置端口映射
- "8300:8300" # 课件服务 api 端口
stdin_open: true # 关上规范输出,能够承受内部输出
tty: true
networks:
- backend
restart: always # 指定容器退出后的重启策略为始终重启
elasticsearch:
build:
context: ./elasticsearch
environment:
- TZ=Asia/Shanghai
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
privileged: true
ports:
- "9200:9200"
networks:
- backend
restart: always
prometheus:
build:
context: ./prometheus
environment:
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./prometheus/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml # 将 prometheus 配置文件挂载到容器里
- ./prometheus/target.json:/opt/bitnami/prometheus/conf/targets.json # 将 prometheus 配置文件挂载到容器里
ports:
- "9090:9090" # 设置容器 9090 端口映射指定宿主机端口,用于宿主机拜访可视化 web
networks:
- backend
restart: always
grafana:
build:
context: ./grafana
environment:
- TZ=Asia/Shanghai
privileged: true
ports:
- "3000:3000"
networks:
- backend
restart: always
jaeger:
build:
context: ./jaeger
environment:
- TZ=Asia/Shanghai
- SPAN_STORAGE_TYPE=elasticsearch
- ES_SERVER_URLS=http://elasticsearch:9200
- LOG_LEVEL=debug
privileged: true
ports:
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "4317:4317"
- "4318:4318"
- "14250:14250"
- "14268:14268"
- "14269:14269"
- "9411:9411"
networks:
- backend
restart: always
kibana:
build:
context: ./kibana
environment:
- elasticsearch.hosts=http://elasticsearch:9200
- TZ=Asia/Shanghai
privileged: true
ports:
- "5601:5601"
networks:
- backend
restart: always
depends_on:
- elasticsearch
go-stash:
build:
context: ./go-stash
environment:
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./go-stash/go-stash.yml:/app/etc/config.yaml
networks:
- backend
restart: always
depends_on:
- elasticsearch
- kafka
filebeat:
build:
context: ./filebeat
environment:
- TZ=Asia/Shanghai
entrypoint: "filebeat -e -strict.perms=false"
privileged: true
volumes:
- ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
- ./data/log:/var/lib/docker/containers # 宿主机上的日志同步到 filebeat 指定目录
networks:
- backend
restart: always
depends_on:
- kafka
zookeeper:
build:
context: ./zookeeper
environment:
- TZ=Asia/Shanghai
privileged: true
networks:
- backend
ports:
- "2181:2181"
restart: always
kafka:
build:
context: ./kafka
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
- TZ=Asia/Shanghai
- ALLOW_PLAINTEXT_LISTENER=yes
restart: always
privileged: true
networks:
- backend
depends_on:
- zookeeper
(我的项目根目录下自行创立对应的 Dokcerfile)
filebeat 须要引入配置文件 filebeat.yml 如下:
其中 filebeat 须要从宿主机同步数据,就是下面用户服务中生成的日志文件,会同步到 filebeat 的对应文件中
拉取过去的文件会输入到 kafka 指定的 topic 中,咱们这里定义的是courseware-log
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/lib/docker/containers/*/*.log # 此为宿主机同步过去的日志文件
filebeat.config:
modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
processors:
- add_cloud_metadata: ~
- add_docker_metadata: ~
output.kafka:
enabled: true
hosts: ["kafka:9092"]
#要提前创立 topic
topic: "courseware-log"
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
用户服务中也须要批改 etc 下的 user.yaml 配置,减少日志的配置,输入到 data/log 目录下
Log:
Mode: file
Path: /var/log/go-zero/user-api
Level: error
Log:
Mode: file
Path: /var/log/go-zero/user-rpc
Level: error
咱们启动下相干服务,申请下 user-api 的接口
而后回到我的项目中查看 data/log 中是否生成相干日志
日志失常输入,再到 filebeat 服务中,查看文件是否同步下来:
# 进入容器
docker exec -it 231bf79f3d5e21cea153bd94bf29693e67360113256e0e3c67a693e727d0b660 /bin/sh
# 查看目录
cd /var/lib/docker/containers
ls
user-api user-rpc
而后咱们再到 kafka 的容器中
# 进入到容器
docker exec -it cb764aeb86e8296a805e47c85f65ac5334c3ed15630fe36e7a39a81ca1bad67f /bin/sh
# 到 bin 目录下
cd /opt/bitnami/kafka/bin
# 能够看到这些调试脚本
$ ls
connect-distributed.sh kafka-cluster.sh kafka-consumer-perf-test.sh kafka-get-offsets.sh kafka-producer-perf-test.sh kafka-server-stop.sh kafka-verifiable-consumer.sh zookeeper-server-start.sh
connect-mirror-maker.sh kafka-configs.sh kafka-delegation-tokens.sh kafka-leader-election.sh kafka-reassign-partitions.sh kafka-storage.sh kafka-verifiable-producer.sh zookeeper-server-stop.sh
connect-standalone.sh kafka-console-consumer.sh kafka-delete-records.sh kafka-log-dirs.sh kafka-replica-verification.sh kafka-streams-application-reset.sh trogdor.sh zookeeper-shell.sh
kafka-acls.sh kafka-console-producer.sh kafka-dump-log.sh kafka-metadata-shell.sh kafka-run-class.sh kafka-topics.sh windows
kafka-broker-api-versions.sh kafka-consumer-groups.sh kafka-features.sh kafka-mirror-maker.sh kafka-server-start.sh kafka-transactions.sh zookeeper-security-migration.sh
$
先看下有没有创立 courseware-log
的 topic,如果没有就创立一个
$ ./kafka-topics.sh --bootstrap-server kafka:9092 --list
__consumer_offsets
courseware-log
# 没有就创立,创立的命令。最新版的 kafka 不须要指定 zookeeper
./kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic courseware-log
# 建错了删除用这个
./kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic courseware-log
# 公布音讯用这个
./kafka-console-producer.sh --broker-list kafka:9092 --topic courseware-log
# 生产用这个
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic courseware-log --from-beginning
咱们执行生产脚本看下日志会不会过去。
当初还没有日志进来,咱们申请一下接口让接口报错,能够看到日志开始生产了
到这里日志曾经流转到 kafka 中了。
上面是 go-stash 从 kafka 拉取日志解决并保留到 elasticsearch 的流程:
go-stash 须要引入配置文件 go-stash.yml,内容如下:
参数可参考 github go-stash
Clusters:
- Input:
Kafka:
Name: go-stash
Brokers:
- "kafka:9092"
Topics:
- courseware-log
Group: pro
Consumers: 16
Filters:
- Action: drop
Conditions:
- Key: k8s_container_name
Value: "-rpc"
Type: contains
- Key: level
Value: info
Type: match
Op: and
- Action: remove_field
Fields:
# - message
- _source
- _type
- _score
- _id
- "@version"
- topic
- index
- beat
- docker_container
- offset
- prospector
- source
- stream
- "@metadata"
- Action: transfer
Field: message
Target: data
Output:
ElasticSearch:
Hosts:
- "http://elasticsearch:9200"
Index: "courseware-{{yyyy-MM-dd}}"
问题:
然而这里 mac 上又遇到一个问题就是对接 go-stash 时 mac 上的 docker 中会报错
2022/09/08 21:51:10 {"@timestamp":"2022-09-08T21:51:10.346+08:00","level":"error","content":"cpu_linux.go:29 open cpuacct.usage_percpu: no such file or directory"}
具体能够看这里 https://github.com/zeromicro/… 还没有找到好的解决办法。
后续:
之后又重启了下 docker 发现问题解决了,同步到 es 失效了。
接下来咱们申请下用户服务的接口,到 es 查看,索引曾经创立,错误信息曾经写进去了
而后咱们拜访 http://127.0.0.1:5601/ 进到 kibana 后盾,点击 Discover,并创立索引
搜寻到课件服务的索引后点击下一步
抉择 @timestamp,点击创立
从新点击 Discover 之后能够看到课件的日志服务创立实现