docker下进装置jaeger
docker run -d -p 9411:9411 openzipkin/zipkindocker run -d -p 5775:5775/udp -p 16686:16686 -p 14250:14250 -p 14268:14268 jaegertracing/all-in-one:latest
同一个过程中进行追踪
package mainimport ( "context" "fmt" "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" "io" "time")func main() { tracer, closer := initJaeger("jaeger-demo") //初始化 defer closer.Close() opentracing.SetGlobalTracer(tracer) //StartspanFromContext创立新span时会用到 span := tracer.StartSpan("span_root") //开始监控 //ContextWithSpan来创立一个新的ctx,将span的信息与context关联,传到foo3中时,须要创立一个子span,父span是ctx中的span。 //用StartSpanFromContext来模仿从context中启动一个子span,然而StartSpanFromContext或者SpanFromContext只能在同一个服务内应用, //grpc中client的context和server的context并不是同一个context,无奈应用这两个函数。 ctx := opentracing.ContextWithSpan(context.Background(), span) r1 := foo3("Hello foo3", ctx) r2 := foo4("Hello foo4", ctx) fmt.Println(r1, r2) span.Finish() //完结监控}func foo3(req string, ctx context.Context) (reply string) { //1.创立子span span, _ := opentracing.StartSpanFromContext(ctx, "span_foo3") defer func() { //4.接口调用完,在tag中设置request和reply span.SetTag("request", req) span.SetTag("reply", reply) span.Finish() }() println(req) //2.模仿解决耗时 time.Sleep(time.Second / 2) //3.返回reply reply = "foo3Reply" return}//跟foo3一样逻辑func foo4(req string, ctx context.Context) (reply string) { span, _ := opentracing.StartSpanFromContext(ctx, "span_foo4") defer func() { span.SetTag("request", req) span.SetTag("reply", reply) span.Finish() }() println(req) time.Sleep(time.Second / 2) reply = "foo4Reply" return}//初始化jaeger tracer的initJaeger办法func initJaeger(serviceName string) (opentracing.Tracer, io.Closer) { cfg := &config.Configuration{ Sampler: &config.SamplerConfig{ Type: "const", Param: 1, //设置采样率 1 }, // reporter中配置jaeger Agent的ip与端口,以便将tracer的信息公布到agent中。 // LocalAgentHostPort参数为127.0.0.1:6381,6381接口是承受压缩格局的thrift协定数据。 Reporter: &config.ReporterConfig{ LogSpans: true, LocalAgentHostPort: "127.0.0.1:6831", }, ServiceName: serviceName, } tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger)) if err != nil { panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) } return tracer, closer}
跨过程追踪:
Client:
package mainimport ( "fmt" "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "io" pb "micro/proto/hello" "time")const ( // Address gRPC服务地址 Address = "127.0.0.1:50052")//grpc提供了拦截器,咱们能够在dial函数里设置拦截器,这样每次申请都会通过拦截器,咱们不须要在每个接口中去编写反复的代码。func main() { //init jaeger jaegerAgentHost := "127.0.0.1:6831" tracer, closer, err := initJaeger("client", jaegerAgentHost) if err != nil { fmt.Println(err) } defer closer.Close() //dial conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()), clientDialOption(tracer)) if err != nil { fmt.Printf("dial fail, %+v\n\n", err) } //// 连贯 ////conn, err := grpc.Dial(Address, grpc.WithInsecure()) //conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // grpclog.Fatalln(err) //} //defer conn.Close() // 初始化客户端 client := pb.NewHelloClient(conn) ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(10)*time.Second) defer cancel() // 调用办法 res, err := client.SayHello(ctx, &pb.HelloRequest{Name: "gRPC1212"}) if err != nil { grpclog.Fatalln(err) } fmt.Println(res.Message)}func clientDialOption(tracer opentracing.Tracer) grpc.DialOption { return grpc.WithUnaryInterceptor(jaegerGrpcClientInterceptor)}type TextMapWriter struct { metadata.MD}// Set 重写TextMapWriter的Set办法,咱们须要将carrier中的数据写入到metadata中,这样grpc才会携带。func (t TextMapWriter) Set(key, val string) { //key = strings.ToLower(key) t.MD[key] = append(t.MD[key], val)}//span finish之前利用SetTag增加一些额定的信息,例如request和reply,//以及error信息,然而这些信息是不会在client和server中传递的,咱们能够在UI中每个span中显示出他们的tag。func jaegerGrpcClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { var parentContext opentracing.SpanContext //先从context中获取原始的span parentSpan := opentracing.SpanFromContext(ctx) if parentSpan != nil { parentContext = parentSpan.Context() } tracer := opentracing.GlobalTracer() span := tracer.StartSpan(method, opentracing.ChildOf(parentContext)) defer span.Finish() //从context中获取metadata。md.(type) == map[string][]string md, ok := metadata.FromIncomingContext(ctx) if !ok { //md = metadata.New(nil) m := map[string]string{ "x-request-id": "X-Request-Id", "x-b3-traceid": "X-B3-Traceid", "x-b3-spanid": "X-B3-Spanid", "x-b3-parentspanid": "X-B3-Parentspanid", "x-b3-sampled": "X-B3-Sampled", "x-b3-flags": "X-B3-Flags", "x-ot-span-context": "X-Ot-Span-Context", } md = metadata.New(m) } else { //如果对metadata进行批改,那么须要用拷贝的正本进行批改。(FromIncomingContext的正文) md = md.Copy() } //定义一个carrier,上面的Inject注入数据须要用到。carrier.(type) == map[string]string //carrier := opentracing.TextMapCarrier{} carrier := TextMapWriter{md} //将span的context信息注入到carrier中 e := tracer.Inject(span.Context(), opentracing.TextMap, carrier) if e != nil { fmt.Println("tracer Inject err,", e) } //创立一个新的context,把metadata附带上 ctx = metadata.NewOutgoingContext(ctx, md) return invoker(ctx, method, req, reply, cc, opts...)}func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) { cfg := &config.Configuration{ Sampler: &config.SamplerConfig{ Type: "const", Param: 1, }, Reporter: &config.ReporterConfig{ LogSpans: true, LocalAgentHostPort: jaegerAgentHost, }, ServiceName: serviceName, } tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger)) opentracing.SetGlobalTracer(tracer) return tracer, closer, err}
Server 端
package mainimport ( "context" "fmt" "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "io" pb "micro/proto/hello" // 引入proto包 "net")const ( // Address gRPC服务地址 Address = "127.0.0.1:50052")// 定义helloService并实现约定的接口type helloService struct{}// HelloService Hello服务var HelloService = helloService{}// SayHello 实现Hello服务接口func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) { resp := new(pb.HelloResponse) resp.Message = fmt.Sprintf("Hello %s.", in.Name) return resp, nil}//咱们应用Extract函数将carrier从metadata中提取进去,//这样client端与server端就能建设span信息的关联。咱们在server端同样只是批改拦截器,//在grpc.NewServer时将咱们的拦截器传进去。func main() { jaegerAgentHost := "127.0.0.1:6831" tracer, closer, err := initJaeger("client", jaegerAgentHost) if err != nil { fmt.Println(err) } defer closer.Close() var serviceOpts []grpc.ServerOption listen, err := net.Listen("tcp", Address) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } if tracer != nil { serviceOpts = append(serviceOpts, serverOption(tracer)) } // 实例化grpc Server s := grpc.NewServer(serviceOpts...) // 注册HelloService pb.RegisterHelloServer(s, HelloService) grpclog.Println("Listen on " + Address) s.Serve(listen)}func serverOption(tracer opentracing.Tracer) grpc.ServerOption { return grpc.UnaryInterceptor(jaegerGrpcServerInterceptor)}type TextMapReader struct { metadata.MD}// ForeachKey 读取metadata中的span信息func (t TextMapReader) ForeachKey(handler func(key, val string) error) error { //不能是指针 for key, val := range t.MD { for _, v := range val { if err := handler(key, v); err != nil { return err } } } return nil}func jaegerGrpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //从context中获取metadata。md.(type) == map[string][]string md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.New(nil) } else { //如果对metadata进行批改,那么须要用拷贝的正本进行批改。(FromIncomingContext的正文) md = md.Copy() } fmt.Println(md) carrier := TextMapReader{md} tracer := opentracing.GlobalTracer() spanContext, e := tracer.Extract(opentracing.TextMap, carrier) if e != nil { fmt.Println("Extract err:", e) } span := tracer.StartSpan(info.FullMethod, opentracing.ChildOf(spanContext)) defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) return handler(ctx, req)}func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) { cfg := &config.Configuration{ Sampler: &config.SamplerConfig{ Type: "const", Param: 1, }, Reporter: &config.ReporterConfig{ LogSpans: true, LocalAgentHostPort: jaegerAgentHost, }, ServiceName: serviceName, } tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger)) opentracing.SetGlobalTracer(tracer) return tracer, closer, err}
因为grpc 调用和服务端都申明了 UnaryInterceptor 和 StreamInterceptor 两回调函数,因而只须要重写这两个函数,在函数中调用 opentracing 的接口进行链路追踪,并初始化客户端或者服务端时候注册进去就能够。
相应的函数曾经有现成的包 grpc-opentracing
参考文章:
基于jaeger和grpc实现的rpc调用链跟踪模块
golang 链路追踪之 jaeger(http)
https://github.com/grpc-ecosy...