共计 8279 个字符,预计需要花费 21 分钟才能阅读完成。
在 docker 下安装 jaeger
docker run -d -p 9411:9411 openzipkin/zipkin
docker run -d -p 5775:5775/udp -p 6831:6831/udp -p 16686:16686 -p 14250:14250 -p 14268:14268 jaegertracing/all-in-one:latest
在同一个进程中进行追踪
package main
import (
"context"
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"io"
"time"
)
func main() {tracer, closer := initJaeger("jaeger-demo") // 初始化
defer closer.Close()
opentracing.SetGlobalTracer(tracer) //StartspanFromContext 创立新 span 时会用到
span := tracer.StartSpan("span_root") // 开始监控
//ContextWithSpan 来创立一个新的 ctx,将 span 的信息与 context 关联,传到 foo3 中时,须要创立一个子 span,父 span 是 ctx 中的 span。// 用 StartSpanFromContext 来模仿从 context 中启动一个子 span,然而 StartSpanFromContext 或者 SpanFromContext 只能在同一个服务内应用,//grpc 中 client 的 context 和 server 的 context 并不是同一个 context,无奈应用这两个函数。ctx := opentracing.ContextWithSpan(context.Background(), span)
r1 := foo3("Hello foo3", ctx)
r2 := foo4("Hello foo4", ctx)
fmt.Println(r1, r2)
span.Finish() // 完结监控}
// foo3 prints req, simulates half a second of work and returns a fixed reply,
// recording the call in a span named "span_foo3" started from ctx.
func foo3(req string, ctx context.Context) (reply string) {
	// Start the span for this call from the context handed in by the caller.
	child, _ := opentracing.StartSpanFromContext(ctx, "span_foo3")
	// Deferred so that the tags are written after the named result has
	// received its final value, right before the span is finished.
	defer func() {
		child.SetTag("request", req).SetTag("reply", reply).Finish()
	}()

	println(req)
	// Simulated processing time.
	time.Sleep(500 * time.Millisecond)
	return "foo3Reply"
}
// foo4 follows the same logic as foo3: it prints req, simulates half a second
// of work and returns a fixed reply, recorded in a span named "span_foo4".
func foo4(req string, ctx context.Context) (reply string) {
	child, _ := opentracing.StartSpanFromContext(ctx, "span_foo4")
	// Deferred so that the tags see the final value of the named result.
	defer func() {
		child.SetTag("request", req).SetTag("reply", reply).Finish()
	}()

	println(req)
	time.Sleep(500 * time.Millisecond)
	return "foo4Reply"
}
// 初始化 jaeger tracer 的 initJaeger 办法
func initJaeger(serviceName string) (opentracing.Tracer, io.Closer) {
cfg := &config.Configuration{
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1, // 设置采样率 1
},
// reporter 中配置 jaeger Agent 的 ip 与端口,以便将 tracer 的信息公布到 agent 中。// LocalAgentHostPort 参数为 127.0.0.1:6381,6381 接口是承受压缩格局的 thrift 协定数据。Reporter: &config.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "127.0.0.1:6831",
},
ServiceName: serviceName,
}
tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
if err != nil {panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
return tracer, closer
}
跨进程追踪:
Client:
package main
import (
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"io"
pb "micro/proto/hello"
"time"
)
// Address is the address of the gRPC service that the client dials.
const Address = "127.0.0.1:50052"
//grpc 提供了拦截器,咱们能够在 dial 函数里设置拦截器,这样每次申请都会通过拦截器,咱们不须要在每个接口中去编写反复的代码。func main() {
//init jaeger
jaegerAgentHost := "127.0.0.1:6831"
tracer, closer, err := initJaeger("client", jaegerAgentHost)
if err != nil {fmt.Println(err)
}
defer closer.Close()
//dial
conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()), clientDialOption(tracer))
if err != nil {fmt.Printf("dial fail, %+v\n\n", err)
}
//// 连贯
////conn, err := grpc.Dial(Address, grpc.WithInsecure())
//conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {// grpclog.Fatalln(err)
//}
//defer conn.Close()
// 初始化客户端
client := pb.NewHelloClient(conn)
ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(10)*time.Second)
defer cancel()
// 调用办法
res, err := client.SayHello(ctx, &pb.HelloRequest{Name: "gRPC1212"})
if err != nil {grpclog.Fatalln(err)
}
fmt.Println(res.Message)
}
// clientDialOption returns the dial option that installs
// jaegerGrpcClientInterceptor as the unary client interceptor.
// The tracer argument is not used by this function.
func clientDialOption(tracer opentracing.Tracer) grpc.DialOption {
	var interceptor grpc.UnaryClientInterceptor = jaegerGrpcClientInterceptor
	return grpc.WithUnaryInterceptor(interceptor)
}
type TextMapWriter struct {metadata.MD}
// Set 重写 TextMapWriter 的 Set 办法,咱们须要将 carrier 中的数据写入到 metadata 中,这样 grpc 才会携带。func (t TextMapWriter) Set(key, val string) {//key = strings.ToLower(key)
t.MD[key] = append(t.MD[key], val)
}
//span finish 之前利用 SetTag 增加一些额定的信息,例如 request 和 reply,// 以及 error 信息,然而这些信息是不会在 client 和 server 中传递的,咱们能够在 UI 中每个 span 中显示出他们的 tag。func jaegerGrpcClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
var parentContext opentracing.SpanContext
// 先从 context 中获取原始的 span
parentSpan := opentracing.SpanFromContext(ctx)
if parentSpan != nil {parentContext = parentSpan.Context()
}
tracer := opentracing.GlobalTracer()
span := tracer.StartSpan(method, opentracing.ChildOf(parentContext))
defer span.Finish()
// 从 context 中获取 metadata。md.(type) == map[string][]string
md, ok := metadata.FromIncomingContext(ctx)
if !ok {//md = metadata.New(nil)
m := map[string]string{
"x-request-id": "X-Request-Id",
"x-b3-traceid": "X-B3-Traceid",
"x-b3-spanid": "X-B3-Spanid",
"x-b3-parentspanid": "X-B3-Parentspanid",
"x-b3-sampled": "X-B3-Sampled",
"x-b3-flags": "X-B3-Flags",
"x-ot-span-context": "X-Ot-Span-Context",
}
md = metadata.New(m)
} else {
// 如果对 metadata 进行批改,那么须要用拷贝的正本进行批改。(FromIncomingContext 的正文)md = md.Copy()}
// 定义一个 carrier,上面的 Inject 注入数据须要用到。carrier.(type) == map[string]string
//carrier := opentracing.TextMapCarrier{}
carrier := TextMapWriter{md}
// 将 span 的 context 信息注入到 carrier 中
e := tracer.Inject(span.Context(), opentracing.TextMap, carrier)
if e != nil {fmt.Println("tracer Inject err,", e)
}
// 创立一个新的 context,把 metadata 附带上
ctx = metadata.NewOutgoingContext(ctx, md)
return invoker(ctx, method, req, reply, cc, opts...)
}
// initJaeger creates a Jaeger tracer for serviceName that reports to the agent
// at jaegerAgentHost and installs it as the opentracing global tracer.
//
// On success it returns the tracer and the closer that must be closed when
// tracing is no longer needed. On failure it returns a nil tracer, a nil closer
// and the wrapped error, and leaves the global tracer untouched.
func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
	cfg := &config.Configuration{
		ServiceName: serviceName,
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
		Reporter: &config.ReporterConfig{
			LogSpans:           true,
			LocalAgentHostPort: jaegerAgentHost,
		},
	}
	tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
	if err != nil {
		// Never register a tracer that failed to initialise.
		return nil, nil, fmt.Errorf("init jaeger tracer for service %q: %w", serviceName, err)
	}
	opentracing.SetGlobalTracer(tracer)
	return tracer, closer, nil
}
Server 端
package main
import (
"context"
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"io"
pb "micro/proto/hello" // 引入 proto 包
"net"
)
// Address is the address the gRPC server listens on.
const Address = "127.0.0.1:50052"
// helloService implements the agreed Hello service interface.
type helloService struct{}

// HelloService is the Hello service value registered with the gRPC server.
var HelloService helloService
// SayHello implements the Hello service method: it replies with a greeting
// built from the Name field of the request.
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	greeting := fmt.Sprintf("Hello %s.", in.Name)
	return &pb.HelloResponse{Message: greeting}, nil
}
// 咱们应用 Extract 函数将 carrier 从 metadata 中提取进去,// 这样 client 端与 server 端就能建设 span 信息的关联。咱们在 server 端同样只是批改拦截器,// 在 grpc.NewServer 时将咱们的拦截器传进去。func main() {
jaegerAgentHost := "127.0.0.1:6831"
tracer, closer, err := initJaeger("client", jaegerAgentHost)
if err != nil {fmt.Println(err)
}
defer closer.Close()
var serviceOpts []grpc.ServerOption
listen, err := net.Listen("tcp", Address)
if err != nil {grpclog.Fatalf("Failed to listen: %v", err)
}
if tracer != nil {serviceOpts = append(serviceOpts, serverOption(tracer))
}
// 实例化 grpc Server
s := grpc.NewServer(serviceOpts...)
// 注册 HelloService
pb.RegisterHelloServer(s, HelloService)
grpclog.Println("Listen on" + Address)
s.Serve(listen)
}
// serverOption returns the server option that installs
// jaegerGrpcServerInterceptor as the unary server interceptor.
// The tracer argument is not used by this function.
func serverOption(tracer opentracing.Tracer) grpc.ServerOption {
	var interceptor grpc.UnaryServerInterceptor = jaegerGrpcServerInterceptor
	return grpc.UnaryInterceptor(interceptor)
}
// TextMapReader wraps gRPC metadata so that the span information it carries
// can be read as an opentracing TextMap carrier.
type TextMapReader struct {
	metadata.MD
}

// ForeachKey calls handler once for every key/value pair in the metadata and
// stops at the first error, which it returns. The receiver is a value, not a
// pointer.
func (t TextMapReader) ForeachKey(handler func(key, val string) error) error {
	for name, values := range t.MD {
		for i := range values {
			if err := handler(name, values[i]); err != nil {
				return err
			}
		}
	}
	return nil
}
func jaegerGrpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {// 从 context 中获取 metadata。md.(type) == map[string][]string
md, ok := metadata.FromIncomingContext(ctx)
if !ok {md = metadata.New(nil)
} else {
// 如果对 metadata 进行批改,那么须要用拷贝的正本进行批改。(FromIncomingContext 的正文)md = md.Copy()}
fmt.Println(md)
carrier := TextMapReader{md}
tracer := opentracing.GlobalTracer()
spanContext, e := tracer.Extract(opentracing.TextMap, carrier)
if e != nil {fmt.Println("Extract err:", e)
}
span := tracer.StartSpan(info.FullMethod, opentracing.ChildOf(spanContext))
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return handler(ctx, req)
}
// initJaeger creates a Jaeger tracer for serviceName that reports to the agent
// at jaegerAgentHost and installs it as the opentracing global tracer.
//
// On success it returns the tracer and the closer that must be closed when
// tracing is no longer needed. On failure it returns a nil tracer, a nil closer
// and the wrapped error, and leaves the global tracer untouched.
func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
	cfg := &config.Configuration{
		ServiceName: serviceName,
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
		Reporter: &config.ReporterConfig{
			LogSpans:           true,
			LocalAgentHostPort: jaegerAgentHost,
		},
	}
	tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
	if err != nil {
		// Never register a tracer that failed to initialise.
		return nil, nil, fmt.Errorf("init jaeger tracer for service %q: %w", serviceName, err)
	}
	opentracing.SetGlobalTracer(tracer)
	return tracer, closer, nil
}
因为 grpc 的客户端和服务端都声明了 UnaryInterceptor 和 StreamInterceptor 两个回调函数,因此只需要重写这两个函数,在函数中调用 opentracing 的接口进行链路追踪,并在初始化客户端或者服务端时注册进去就可以。
相应的函数已经有现成的包 grpc-opentracing。
参考文章:
基于 jaeger 和 grpc 实现的 rpc 调用链跟踪模块
golang 链路追踪之 jaeger(http)
https://github.com/grpc-ecosy…
正文完