关于golang:go-链路追踪之-jaeger

41次阅读

共计 8279 个字符,预计需要花费 21 分钟才能阅读完成。

docker 下安装 jaeger

docker run -d -p 9411:9411 openzipkin/zipkin

docker run -d -p 5775:5775/udp -p 6831:6831/udp -p 16686:16686 -p 14250:14250 -p 14268:14268 jaegertracing/all-in-one:latest

同一个进程中进行追踪

package main

import (
    "context"
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "io"
    "time"
)

func main() {tracer, closer := initJaeger("jaeger-demo") // 初始化
    defer closer.Close()
    opentracing.SetGlobalTracer(tracer) //StartspanFromContext 创立新 span 时会用到

    span := tracer.StartSpan("span_root") // 开始监控

    //ContextWithSpan 来创立一个新的 ctx,将 span 的信息与 context 关联,传到 foo3 中时,须要创立一个子 span,父 span 是 ctx 中的 span。// 用 StartSpanFromContext 来模仿从 context 中启动一个子 span,然而 StartSpanFromContext 或者 SpanFromContext 只能在同一个服务内应用,//grpc 中 client 的 context 和 server 的 context 并不是同一个 context,无奈应用这两个函数。ctx := opentracing.ContextWithSpan(context.Background(), span)
    r1 := foo3("Hello foo3", ctx)
    r2 := foo4("Hello foo4", ctx)
    fmt.Println(r1, r2)
    span.Finish() // 完结监控}

// foo3 starts a child span named "span_foo3" from ctx, prints req, sleeps for
// half a second to simulate work, and returns "foo3Reply". When it returns,
// the request and the reply are recorded as tags and the span is finished.
func foo3(req string, ctx context.Context) (reply string) {
    child, _ := opentracing.StartSpanFromContext(ctx, "span_foo3")
    defer func() {
        // Runs after reply has been assigned, so the tag holds the final value.
        child.SetTag("request", req).SetTag("reply", reply).Finish()
    }()

    println(req)
    time.Sleep(500 * time.Millisecond) // simulated processing time
    return "foo3Reply"
}

// foo4 follows the same logic as foo3: it starts a child span named
// "span_foo4" from ctx, prints req, sleeps for half a second, and returns
// "foo4Reply". The request and reply are tagged on the span before it finishes.
func foo4(req string, ctx context.Context) (reply string) {
    child, _ := opentracing.StartSpanFromContext(ctx, "span_foo4")
    defer func() {
        // Runs after reply has been assigned, so the tag holds the final value.
        child.SetTag("request", req).SetTag("reply", reply).Finish()
    }()

    println(req)
    time.Sleep(500 * time.Millisecond) // simulated processing time
    return "foo4Reply"
}

// 初始化 jaeger tracer 的 initJaeger 办法
func initJaeger(serviceName string) (opentracing.Tracer, io.Closer) {
    cfg := &config.Configuration{
        Sampler: &config.SamplerConfig{
            Type:  "const",
            Param: 1, // 设置采样率 1
        },
        // reporter 中配置 jaeger Agent 的 ip 与端口,以便将 tracer 的信息公布到 agent 中。// LocalAgentHostPort 参数为 127.0.0.1:6381,6381 接口是承受压缩格局的 thrift 协定数据。Reporter: &config.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: "127.0.0.1:6831",
        },
        ServiceName: serviceName,
    }
    tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
    if err != nil {panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    }
    return tracer, closer
}

跨进程追踪:
Client:

package main

import (
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/metadata"
    "io"
    pb "micro/proto/hello"
    "time"
)

const (
    // Address gRPC 服务地址
    Address = "127.0.0.1:50052"
)

//grpc 提供了拦截器,咱们能够在 dial 函数里设置拦截器,这样每次申请都会通过拦截器,咱们不须要在每个接口中去编写反复的代码。func main() {
    //init jaeger
    jaegerAgentHost := "127.0.0.1:6831"
    tracer, closer, err := initJaeger("client", jaegerAgentHost)
    if err != nil {fmt.Println(err)
    }
    defer closer.Close()
    //dial
    conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()), clientDialOption(tracer))
    if err != nil {fmt.Printf("dial fail, %+v\n\n", err)
    }

    //// 连贯
    ////conn, err := grpc.Dial(Address, grpc.WithInsecure())
    //conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
    //if err != nil {//    grpclog.Fatalln(err)
    //}
    //defer conn.Close()

    // 初始化客户端
    client := pb.NewHelloClient(conn)
    ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(10)*time.Second)
    defer cancel()
    // 调用办法
    res, err := client.SayHello(ctx, &pb.HelloRequest{Name: "gRPC1212"})

    if err != nil {grpclog.Fatalln(err)
    }

    fmt.Println(res.Message)
}

// clientDialOption returns the dial option that installs
// jaegerGrpcClientInterceptor as the unary client interceptor.
// The tracer argument is not used by the body.
func clientDialOption(tracer opentracing.Tracer) grpc.DialOption {
    unaryTracing := grpc.WithUnaryInterceptor(jaegerGrpcClientInterceptor)
    return unaryTracing
}

type TextMapWriter struct {metadata.MD}

// Set 重写 TextMapWriter 的 Set 办法,咱们须要将 carrier 中的数据写入到 metadata 中,这样 grpc 才会携带。func (t TextMapWriter) Set(key, val string) {//key = strings.ToLower(key)
    t.MD[key] = append(t.MD[key], val)
}

//span finish 之前利用 SetTag 增加一些额定的信息,例如 request 和 reply,// 以及 error 信息,然而这些信息是不会在 client 和 server 中传递的,咱们能够在 UI 中每个 span 中显示出他们的 tag。func jaegerGrpcClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
    var parentContext opentracing.SpanContext
    // 先从 context 中获取原始的 span
    parentSpan := opentracing.SpanFromContext(ctx)
    if parentSpan != nil {parentContext = parentSpan.Context()
    }
    tracer := opentracing.GlobalTracer()
    span := tracer.StartSpan(method, opentracing.ChildOf(parentContext))
    defer span.Finish()
    // 从 context 中获取 metadata。md.(type) == map[string][]string
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {//md = metadata.New(nil)
        m := map[string]string{
            "x-request-id":      "X-Request-Id",
            "x-b3-traceid":      "X-B3-Traceid",
            "x-b3-spanid":       "X-B3-Spanid",
            "x-b3-parentspanid": "X-B3-Parentspanid",
            "x-b3-sampled":      "X-B3-Sampled",
            "x-b3-flags":        "X-B3-Flags",
            "x-ot-span-context": "X-Ot-Span-Context",
        }
        md = metadata.New(m)
    } else {
        // 如果对 metadata 进行批改,那么须要用拷贝的正本进行批改。(FromIncomingContext 的正文)md = md.Copy()}
    // 定义一个 carrier,上面的 Inject 注入数据须要用到。carrier.(type) == map[string]string
    //carrier := opentracing.TextMapCarrier{}
    carrier := TextMapWriter{md}
    // 将 span 的 context 信息注入到 carrier 中
    e := tracer.Inject(span.Context(), opentracing.TextMap, carrier)
    if e != nil {fmt.Println("tracer Inject err,", e)
    }
    // 创立一个新的 context,把 metadata 附带上
    ctx = metadata.NewOutgoingContext(ctx, md)

    return invoker(ctx, method, req, reply, cc, opts...)
}

// initJaeger creates a Jaeger tracer for serviceName that reports to the agent
// at jaegerAgentHost, registers it as the global opentracing tracer, and
// returns it with the closer that must be closed when tracing is done.
//
// On failure it returns a nil tracer, a nil closer, and the wrapped error; the
// global tracer is left untouched in that case.
func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
    cfg := &config.Configuration{
        Sampler: &config.SamplerConfig{
            Type:  "const",
            Param: 1,
        },
        Reporter: &config.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: jaegerAgentHost,
        },
        ServiceName: serviceName,
    }
    tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
    if err != nil {
        // Do not register a nil tracer globally: later GlobalTracer() users
        // would end up calling methods on it.
        return nil, nil, fmt.Errorf("init jaeger tracer for %q: %w", serviceName, err)
    }
    opentracing.SetGlobalTracer(tracer)
    return tracer, closer, nil
}

Server 端

package main

import (
    "context"
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "google.golang.org/grpc"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/metadata"
    "io"
    pb "micro/proto/hello" // 引入 proto 包
    "net"
)

const (
    // Address gRPC 服务地址
    Address = "127.0.0.1:50052"
)

// helloService implements the Hello service interface.
type helloService struct{}

// HelloService is the Hello service instance.
var HelloService = helloService{}

// SayHello implements the Hello service method: it answers with
// "Hello <name>." built from the request's Name field.
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
    greeting := fmt.Sprintf("Hello %s.", in.Name)
    return &pb.HelloResponse{Message: greeting}, nil
}

// 咱们应用 Extract 函数将 carrier 从 metadata 中提取进去,// 这样 client 端与 server 端就能建设 span 信息的关联。咱们在 server 端同样只是批改拦截器,// 在 grpc.NewServer 时将咱们的拦截器传进去。func main() {
    jaegerAgentHost := "127.0.0.1:6831"
    tracer, closer, err := initJaeger("client", jaegerAgentHost)
    if err != nil {fmt.Println(err)
    }
    defer closer.Close()
    var serviceOpts []grpc.ServerOption

    listen, err := net.Listen("tcp", Address)
    if err != nil {grpclog.Fatalf("Failed to listen: %v", err)
    }

    if tracer != nil {serviceOpts = append(serviceOpts, serverOption(tracer))
    }

    // 实例化 grpc Server
    s := grpc.NewServer(serviceOpts...)

    // 注册 HelloService
    pb.RegisterHelloServer(s, HelloService)

    grpclog.Println("Listen on" + Address)
    s.Serve(listen)
}

// serverOption returns the server option that installs
// jaegerGrpcServerInterceptor as the unary server interceptor.
// The tracer argument is not used by the body.
func serverOption(tracer opentracing.Tracer) grpc.ServerOption {
    unaryTracing := grpc.UnaryInterceptor(jaegerGrpcServerInterceptor)
    return unaryTracing
}

// TextMapReader wraps gRPC metadata so it can be used as the carrier that the
// tracer extracts span context from.
type TextMapReader struct {
    metadata.MD
}

// ForeachKey feeds every key/value pair stored in the metadata to handler and
// stops at the first error handler returns, passing that error back.
// The receiver is a value, not a pointer.
func (t TextMapReader) ForeachKey(handler func(key, val string) error) error {
    for name, values := range t.MD {
        for i := range values {
            if err := handler(name, values[i]); err != nil {
                return err
            }
        }
    }
    return nil
}

func jaegerGrpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {// 从 context 中获取 metadata。md.(type) == map[string][]string
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {md = metadata.New(nil)
    } else {
        // 如果对 metadata 进行批改,那么须要用拷贝的正本进行批改。(FromIncomingContext 的正文)md = md.Copy()}
    fmt.Println(md)
    carrier := TextMapReader{md}
    tracer := opentracing.GlobalTracer()
    spanContext, e := tracer.Extract(opentracing.TextMap, carrier)
    if e != nil {fmt.Println("Extract err:", e)
    }

    span := tracer.StartSpan(info.FullMethod, opentracing.ChildOf(spanContext))
    defer span.Finish()
    ctx = opentracing.ContextWithSpan(ctx, span)

    return handler(ctx, req)
}

// initJaeger creates a Jaeger tracer for serviceName that reports to the agent
// at jaegerAgentHost, registers it as the global opentracing tracer, and
// returns it with the closer that must be closed when tracing is done.
//
// On failure it returns a nil tracer, a nil closer, and the wrapped error; the
// global tracer is left untouched in that case.
func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
    cfg := &config.Configuration{
        Sampler: &config.SamplerConfig{
            Type:  "const",
            Param: 1,
        },
        Reporter: &config.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: jaegerAgentHost,
        },
        ServiceName: serviceName,
    }
    tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
    if err != nil {
        // Do not register a nil tracer globally: later GlobalTracer() users
        // would end up calling methods on it.
        return nil, nil, fmt.Errorf("init jaeger tracer for %q: %w", serviceName, err)
    }
    opentracing.SetGlobalTracer(tracer)
    return tracer, closer, nil
}

因为 grpc 的客户端和服务端都声明了 UnaryInterceptor 和 StreamInterceptor 两个回调函数,因此只需要重写这两个函数,在函数中调用 opentracing 的接口进行链路追踪,并在初始化客户端或者服务端的时候注册进去就可以。

相应的函数已经有现成的包 grpc-opentracing



参考文章:
基于 jaeger 和 grpc 实现的 rpc 调用链跟踪模块
golang 链路追踪之 jaeger(http)

https://github.com/grpc-ecosy…

正文完
 0