依照【云溪数据库 Tracing(一)】中介绍的使用 opentracing 的要求,本文着重介绍云溪数据库 Tracing 模块是如何实现 Span、SpanContext 和 Tracer 的。

Part 1 - Tracing 模块调用关系

1.1 Tracing 模块包含的文件列表

Tracer.go:定义了 opentracing 中 Tracer 相关接口的实现。

Tracer_span.go:定义了 opentracing 中 span 相关操作的实现。

Tags.go:定义了 opentracing 中关于 tags 的相关接口。

Shadow.go:不是 opentracing 中的概念,这里主要实现与 zipkin 的通信,用于将 tracing 信息推送到外部的 zipkin 中。

1.2 各个文件之间的调用关系

在 cluster_settings.go 中会创建 tracer,供全局使用;其他模块使用这个 Tracer 实现 span 的创建和其他操作,例如设定 span 名称、设定 tag、增加 log 等操作。

Part 2 - Opentracing 在云溪数据库中的实现

以下只列出了部分接口实现,并非全部。

2.1 Span 接口实现:

Context 实现:该 API 用于获取 Span 中的 SpanContext,主要功能是先创建一个 map[string]string 类型的 baggageCopy,将 span 中的 mu.Baggage 读出并写入 baggageCopy,然后创建新的 spanContext 并返回。

func (s *span) Context() opentracing.SpanContext {   s.mu.Lock()   defer s.mu.Unlock()   baggageCopy := make(map[string]string, len(s.mu.Baggage))   for k, v := range s.mu.Baggage {      baggageCopy[k] = v   }   sc := &spanContext{      spanMeta: s.spanMeta,      Baggage:  baggageCopy,   }   if s.shadowTr != nil {      sc.shadowTr = s.shadowTr      sc.shadowCtx = s.shadowSpan.Context()   }   if s.isRecording() {      sc.recordingGroup = s.mu.recordingGroup      sc.recordingType = s.mu.recordingType   }   return sc}

Finish 实现:该 API 用于结束一个 Span 的记录和追踪。

// Finish is part of the opentracing.Span interface. It ends the span by
// delegating to FinishWithOptions with zero-valued options.
func (s *span) Finish() {
	var defaults opentracing.FinishOptions
	s.FinishWithOptions(defaults)
}

SetTag 实现:用于向指定的 Span 增加 Tag 信息。

// SetTag is part of the opentracing.Span interface. It forwards the key/value
// pair to setTagInner, passing false for that helper's "locked" argument.
func (s *span) SetTag(key string, value interface{}) opentracing.Span {
	const locked = false
	return s.setTagInner(key, value, locked)
}

LogKV 实现:用于向指定的 Span 添加 Log 信息。

func (s *span) LogKV(alternatingKeyValues ...interface{}) {   fields, err := otlog.InterleavedKVToFields(alternatingKeyValues...)   if err != nil {      s.LogFields(otlog.Error(err), otlog.String("function", "LogKV"))      return   }   s.LogFields(fields...)}

SetBaggageItem 实现:用于向指定的 Span 增加 Baggage 信息,主要用于跨进程追踪。

func (s *span) SetBaggageItem(restrictedKey, value string) opentracing.Span {   s.mu.Lock()   defer s.mu.Unlock()   return s.setBaggageItemLocked(restrictedKey, value)}

BaggageItem 实现:用于获取指定的 Baggage 信息。

func (s *span) BaggageItem(restrictedKey string) string {   s.mu.Lock()   defer s.mu.Unlock()   return s.mu.Baggage[restrictedKey]}

SetOperationName 实现:用于设定 Span 的名称。

func (s *span) SetOperationName(operationName string) opentracing.Span { if s.shadowTr != nil {  s.shadowSpan.SetOperationName(operationName) } s.operation = operationName return s}

Tracer 实现:用于获取 Span 属于哪个 Tracer。

// Tracer is part of the opentracing.Span interface.func (s *span) Tracer() opentracing.Tracer { return s.tracer}

2.2 SpanContext 接口实现:

ForeachBaggageItem 实现:用于遍历 spanContext 中的 baggage 信息。

// ForeachBaggageItem is part of the opentracing.SpanContext interface. It
// calls handler for each baggage entry and stops as soon as handler returns
// false.
func (sc *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
	for key, val := range sc.Baggage {
		if keepGoing := handler(key, val); !keepGoing {
			return
		}
	}
}

2.3 Tracer 接口实现:

Inject 实现:用于向 carrier 中注入 SpanContext 信息。

// Inject is part of the opentracing.Tracer interface.func (t *Tracer) Inject(   osc opentracing.SpanContext, format interface{}, carrier interface{},) error { ……   // We onlysupport the HTTPHeaders/TextMap format.   if format != opentracing.HTTPHeaders && format != opentracing.TextMap {      return opentracing.ErrUnsupportedFormat   }   mapWriter, ok := carrier.(opentracing.TextMapWriter)   if !ok {      return opentracing.ErrInvalidCarrier   }   sc, ok := osc.(*spanContext)   if !ok {      return opentracing.ErrInvalidSpanContext   }   mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.TraceID, 16))   mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.SpanID, 16))   for k, v := range sc.Baggage {      mapWriter.Set(prefixBaggage+k, v)   }  ……   return nil}

Extract 实现:用于从 carrier 中抽取出 SpanContext 信息。

func (t *Tracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {   // We onlysupport the HTTPHeaders/TextMap format.   if format != opentracing.HTTPHeaders && format != opentracing.TextMap {      return noopSpanContext{}, opentracing.ErrUnsupportedFormat   }   mapReader, ok := carrier.(opentracing.TextMapReader)   if !ok {      return noopSpanContext{}, opentracing.ErrInvalidCarrier   }   var sc spanContext ……   err :=mapReader.ForeachKey(func(k, v string) error {      switch k = strings.ToLower(k); k {      case fieldNameTraceID:         var err error         sc.TraceID, err = strconv.ParseUint(v, 16, 64)         if err != nil {            return opentracing.ErrSpanContextCorrupted         }      case fieldNameSpanID:         var err error         sc.SpanID, err = strconv.ParseUint(v, 16, 64)         if err != nil {            return opentracing.ErrSpanContextCorrupted         }      case fieldNameShadowType:         shadowType = v      default:         if strings.HasPrefix(k, prefixBaggage) {            if sc.Baggage == nil {               sc.Baggage = make(map[string]string)            }            sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v         } else if strings.HasPrefix(k, prefixShadow) {            if shadowCarrier == nil {               shadowCarrier = make(opentracing.TextMapCarrier)            }            // We build ashadow textmap with the original shadow keys.            shadowCarrier.Set(strings.TrimPrefix(k, prefixShadow), v)         }      }      return nil   })   if err != nil {      return noopSpanContext{}, err   }   if sc.TraceID == 0 &&sc.SpanID == 0 {      return noopSpanContext{}, nil   }   ……   return &sc, nil}

StartSpan 接口实现:用于创建一个新的 Span,可根据传入的不同 opts 来实现不同 Span 的初始化。

func (t *Tracer) StartSpan(   operationName string, opts ...opentracing.StartSpanOption,) opentracing.Span {   // Fast paths toavoid the allocation of StartSpanOptions below when tracing   // is disabled: if we have no optionsor a single SpanReference (the common   // case) with a noop context, return anoop span now.   if len(opts) == 1 {      if o, ok := opts[0].(opentracing.SpanReference); ok {         if IsNoopContext(o.ReferencedContext) {            return &t.noopSpan         }      }   }   shadowTr := t.getShadowTracer()   ……   return s}

2.4 noop span 实现:

noop span 实现:使监控代码不依赖 Tracer 和 Span 的返回值,避免程序异常退出。

type noopSpan struct {   tracer *Tracer}var _ opentracing.Span = &noopSpan{}func (n *noopSpan) Context() opentracing.SpanContext                       { return noopSpanContext{} }func (n *noopSpan) BaggageItem(key string) string                          { return "" }func (n *noopSpan) SetTag(key string, value interface{}) opentracing.Span  { return n }func (n *noopSpan) Finish()                                               {}func (n *noopSpan) FinishWithOptions(opts opentracing.FinishOptions)      {}func (n *noopSpan) SetOperationName(operationName string) opentracing.Span { return n }func (n *noopSpan) Tracer() opentracing.Tracer                             { return n.tracer }func (n *noopSpan) LogFields(fields ...otlog.Field)                        {}func (n *noopSpan) LogKV(keyVals ...interface{})                           {}func (n *noopSpan) LogEvent(event string)                                 {}func (n *noopSpan) LogEventWithPayload(event string, payload interface{})  {}func (n *noopSpan) Log(data opentracing.LogData)                           {}func (n *noopSpan) SetBaggageItem(key, val string) opentracing.Span {   if key == Snowball {      panic("attempting to set Snowball on a noop span; use the Recordable optionto StartSpan")   }   return n}

Part 3 - 云溪数据库中 Opentracing 简单使用示例

3.1 开启 Tracer Recording 测试

云溪数据库中开始创建的 span 均是 noop(no-operation)span,需要手动调用 StartRecording,将 span 转换为可 record 状态,才能正常对 span 进行操作。

func TestTracerRecording(t *testing.T) { tr := NewTracer() noop1 := tr.StartSpan("noop") if _, noop := noop1.(*noopSpan); !noop {  t.Error("expected noop span") } noop1.LogKV("hello", "void") noop2 := tr.StartSpan("noop2", opentracing.ChildOf(noop1.Context())) if _, noop := noop2.(*noopSpan); !noop {  t.Error("expected noop child span") } noop2.Finish() noop1.Finish() s1 := tr.StartSpan("a", Recordable) if _, noop := s1.(*noopSpan); noop {  t.Error("Recordable (but not recording) span should not be noop") } if !IsBlackHoleSpan(s1) {  t.Error("Recordable span should be black hole") } // Unless recording is actually started, child spans are still noop. noop3 := tr.StartSpan("noop3", opentracing.ChildOf(s1.Context())) if _, noop := noop3.(*noopSpan); !noop {  t.Error("expected noop child span") } noop3.Finish() s1.LogKV("x", 1) StartRecording(s1, SingleNodeRecording) s1.LogKV("x", 2) s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context())) if IsBlackHoleSpan(s2) {  t.Error("recording span should not be black hole") } s2.LogKV("x", 3) if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: unfinished=   x: 2  span b:   tags: unfinished=   x: 3 `); err != nil {  t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(s2), `  span b:   tags: unfinished=   x: 3 `); err != nil {  t.Fatal(err) } s3 := tr.StartSpan("c", opentracing.FollowsFrom(s2.Context())) s3.LogKV("x", 4) s3.SetTag("tag", "val") s2.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: unfinished=   x: 2  span b:   x: 3  span c:   tags: tag=val unfinished=   x: 4 `); err != nil {  t.Fatal(err) } s3.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:      tags: unfinished=   x: 2  span b:   x: 3  span c:   tags: tag=val   x: 4 `); err != nil {  t.Fatal(err) } StopRecording(s1) s1.LogKV("x", 100) if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil {  t.Fatal(err) } // The child span is still recording. 
s3.LogKV("x", 5) if err := TestingCheckRecordedSpans(GetRecording(s3), `  span c:   tags: tag=val   x: 4   x: 5 `); err != nil {  t.Fatal(err) } s1.Finish()}

3.2 创建 childSpan 测试

测试 StartChildSpan:根据已有 span 创建出一个新的 span,作为已有 span 的子 span。

func TestStartChildSpan(t *testing.T) { tr := NewTracer() sp1 := tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 := StartChildSpan("child", sp1, nil /* logTags */, false /*separateRecording*/) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), `  span parent:   span child: `); err != nil {  t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 = StartChildSpan("child", sp1, nil /* logTags */, true /*separateRecording*/) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), `  span parent: `); err != nil {  t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(sp2), `  span child: `); err != nil {  t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 = StartChildSpan(  "child", sp1, logtags.SingleTagBuffer("key", "val"), false, /*separateRecording*/ ) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), `  span parent:   span child:    tags: key=val `); err != nil {  t.Fatal(err) }}

3.3 跨进程追踪测试

测试跨进程追踪功能,主要是测试 Inject 接口和 Extract 接口:Inject 用于向 carrier 中注入 SpanContext 信息,Extract 用于从 carrier 中抽取出 SpanContext 信息。

 func TestTracerInjectExtract(t *testing.T) { tr := NewTracer() tr2 := NewTracer() // Verify that noop spans become noop spans on the remote side. noop1 := tr.StartSpan("noop") if _, noop := noop1.(*noopSpan); !noop {  t.Fatalf("expected noop span: %+v", noop1) } carrier := make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(noop1.Context(), opentracing.HTTPHeaders, carrier); err != nil {  t.Fatal(err) } if len(carrier) != 0 {  t.Errorf("noop span has carrier: %+v", carrier) } wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier) if err != nil {  t.Fatal(err) } if _, noopCtx := wireContext.(noopSpanContext); !noopCtx {  t.Errorf("expected noop context: %v", wireContext) } noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) if _, noop := noop2.(*noopSpan); !noop {  t.Fatalf("expected noop span: %+v", noop2) } noop1.Finish() noop2.Finish() // Verify that snowball tracing is propagated and triggers recording on the // remote side. s1 := tr.StartSpan("a", Recordable) StartRecording(s1, SnowballRecording) carrier = make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil {  t.Fatal(err) } wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier) if err != nil {  t.Fatal(err) } s2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) // Compare TraceIDs trace1 := s1.Context().(*spanContext).TraceID trace2 := s2.Context().(*spanContext).TraceID if trace1 != trace2 {  t.Errorf("TraceID doesn't match: parent %d child %d", trace1, trace2) } s2.LogKV("x", 1) s2.Finish() // Verify that recording was started automatically. 
rec := GetRecording(s2) if err := TestingCheckRecordedSpans(rec, `  span remote op:   tags: sb=1   x: 1 `); err != nil {  t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: sb=1 unfinished= `); err != nil {  t.Fatal(err) } if err := ImportRemoteSpans(s1, rec); err != nil {  t.Fatal(err) } s1.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: sb=1  span remote op:   tags: sb=1   x: 1 `); err != nil {  t.Fatal(err) }}