一、缓存雪崩的利用

背景:

咱们在重启pod的时候,此时会导致gocache中重启,而后缓存同时大批量生效。如果此时并发比拟高,会有很多goroutine,去同时拜访redis。

加单飞,将一组雷同的申请合并成一个申请,实际上只会去申请一次,而后对所有的申请返回雷同的后果

singlefight试验:

singlefight_test.go
须要从新从redis获取数据存取到 gocache。

func BenchmarkUse(b *testing.B) {    ctx := context.Background()    wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600)    goCache := cache.New(time.Second*60, time.Second*60)    //sg := singleflight.Group{}    for i := 0; i < b.N; i++ {       _, ok := goCache.Get("k")       if !ok {          go func() {             //_, _, _ = sg.Do("k", func() (interface{}, error) {             v, _ := wordTouchRedisClient.Get(ctx, "k").Result()             goCache.Set("k", v, time.Second*60)             //return v, nil             //})          }()       }    }}BenchmarkUse-8              94518             20173 ns/op

此时引入单飞

func BenchmarkUse(b *testing.B) {    ctx := context.Background()    wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600)    goCache := cache.New(time.Second*60, time.Second*60)    sg := singleflight.Group{}    for i := 0; i < b.N; i++ {       _, ok := goCache.Get("k")       if !ok {          go func() {             _, _, _ = sg.Do("k", func() (interface{}, error) {                v, _ := wordTouchRedisClient.Get(ctx, "k").Result()                goCache.Set("k", v, time.Second*60)                return v, nil             })          }()       }    }}BenchmarkUse-8           21307608                46.96 ns/opBenchmarkUse-2           25675206                45.37 ns/op

危险:

  1. 如果一个报错, 同一批都报错

二、源码剖析

源码正文

// Copyright 2013 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.// Package singleflight provides a duplicate function call suppression// mechanism.// singleflight包提供了反复函数调用克制机制。package singleflight // import "golang.org/x/sync/singleflight"import (    "bytes"    "errors"    "fmt"    "runtime"    "runtime/debug"    "sync")// errGoexit indicates the runtime.Goexit was called in// the user given function.// errGoexit 示意 runtime.Goexit 被用户的函数调用了var errGoexit = errors.New("runtime.Goexit was called")// A panicError is an arbitrary value recovered from a panic// panicError 是从panic中 复原的任意值// with the stack trace during the execution of given function.// 执行给定函数期间的堆栈跟踪type panicError struct {    value interface{}    stack []byte}// Error implements error interface.// Error 实现谬误接口func (p *panicError) Error() string {    return fmt.Sprintf("%v\n\n%s", p.value, p.stack)}func newPanicError(v interface{}) error {    stack := debug.Stack()    // The first line of the stack trace is of the form "goroutine N [status]:"    // 堆栈跟踪的第一行的模式为“goroutine N [status]:”    // but by the time the panic reaches Do the goroutine may no longer exist    // 但当panic达到 Do 时,goroutine 可能不再存在    // and its status will have changed. Trim out the misleading line.    // 并且它的状态将会扭转。修剪掉误导性的线条。    if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {       stack = stack[line+1:]    }    return &panicError{value: v, stack: stack}}// call is an in-flight or completed singleflight.Do call// call 是正在进行的或已实现的 singleflight.Do() 调用type call struct {    wg sync.WaitGroup    // These fields are written once before the WaitGroup is done    // 这些字段在 WaitGroup 实现之前写入一次    // and are only read after the WaitGroup is done.    // 并且仅在 WaitGroup 实现后才读取。    val interface{}    err error    // These fields are read and written with the singleflight    // 这些字段是用 singleflight mutex  读写的    // mutex held before the WaitGroup is done, and are read but    //  在 WaitGroup实现前。    // not written after the WaitGroup is done.    // 并且 只读不写,在WaitGroup实现后。    dups  int    chans []chan<- Result}// Group represents a class of work and forms a namespace in// Group 代表一个工作类,并在其中造成一个命名空间// which units of work can be executed with duplicate suppression.// 哪些工作单元能够通过反复克制来执行。type Group struct {    mu sync.Mutex       // protects m 用来爱护m,并发平安    m  map[string]*call // lazily initialized  提早初始化}// Result holds the results of Do, so they can be passed// Result保留了Do的后果,因而能够传递// on a channel.// 在通道上type Result struct {    Val    interface{}    Err    error    Shared bool}// Do executes and returns the results of the given function, // Do 执行并返回给定函数的后果// making sure that only one execution is in-flight for a given key at a time. // 确保在某一时刻对于给定的键只有一次正在执行// If a duplicate comes in, the duplicate caller waits for the original// 如果有反复的调用者进入,则反复的调用者将期待最后者// to complete and receives the same results.// 实现并收到雷同的后果。// The return value shared indicates whether v was given to multiple callers.// 返回值shared示意v是否被给予多个调用者。func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {    g.mu.Lock()    if g.m == nil {       g.m = make(map[string]*call)    }    if c, ok := g.m[key]; ok {       c.dups++       g.mu.Unlock()       c.wg.Wait()       if e, ok := c.err.(*panicError); ok {          panic(e)       } else if c.err == errGoexit {          runtime.Goexit()       }       return c.val, c.err, true    }    c := new(call)    c.wg.Add(1)    g.m[key] = c    g.mu.Unlock()    g.doCall(c, key, fn)    return c.val, c.err, c.dups > 0}// DoChan is like Do but returns a channel that will receive the// results when they are ready.// DoChan 与 Do 相似,但返回一个chanel通道 接管筹备好后的后果。//// The returned channel will not be closed.// 返回的channel通道不会被敞开。func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {    ch := make(chan Result, 1)    g.mu.Lock()    if g.m == nil {       g.m = make(map[string]*call)    }    if c, ok := g.m[key]; ok {       c.dups++       c.chans = append(c.chans, ch)       g.mu.Unlock()       return ch    }    c := &call{chans: []chan<- Result{ch}}    c.wg.Add(1)    g.m[key] = c    g.mu.Unlock()    go g.doCall(c, key, fn)    return ch}// doCall handles the single call for a key.// doCall 解决对key的单个调用。func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {    normalReturn := false    recovered := false    // use double-defer to distinguish panic from runtime.Goexit,    // 应用双重提早 来辨别panic和runtime.Goexit,    // more details see https://golang.org/cl/134395    // 更多详情参见 https://golang.org/cl/134395    defer func() {       // the given function invoked runtime.Goexit       // 调用给定函数runtime.Goexit       if !normalReturn && !recovered {          c.err = errGoexit       }       g.mu.Lock()       defer g.mu.Unlock()       c.wg.Done()       if g.m[key] == c {          delete(g.m, key)       }       if e, ok := c.err.(*panicError); ok {          // In order to prevent the waiting channels from being blocked forever,          // 为了避免期待通道永远被阻塞,          // needs to ensure that this panic cannot be recovered.          // 须要确保这种panic恐慌无奈复原。          if len(c.chans) > 0 {             go panic(e)             select {} // Keep this goroutine around so that it will appear in the crash dump.                       // 保留此 goroutine,以便它呈现在故障转储中。            } else {             panic(e)          }       } else if c.err == errGoexit {          // Already in the process of goexit, no need to call again          // 曾经在goexit过程中,无需再次调用       } else {          // Normal return          // 失常返回          for _, ch := range c.chans {             ch <- Result{c.val, c.err, c.dups > 0}          }       }    }()    func() {       defer func() {          if !normalReturn {             // Ideally, we would wait to take a stack trace until we've determined             // 现实状况下,咱们会期待获取堆栈跟踪,直到咱们确定             // whether this is a panic or a runtime.Goexit.             // 这是恐慌还是runtime.Goexit。             //             // Unfortunately, the only way we can distinguish the two is to see             // 可怜的是,咱们辨别两者的惟一办法就是看             // whether the recover stopped the goroutine from terminating, and by             // 复原是否阻止 goroutine 终止,并且通过             // the time we know that, the part of the stack trace relevant to the             // 当咱们晓得时,堆栈跟踪中与             // panic has been discarded.             // 恐慌已被抛弃。             if r := recover(); r != nil {                c.err = newPanicError(r)             }          }       }()       c.val, c.err = fn()       normalReturn = true    }()    if !normalReturn {       recovered = true    }}// Forget tells the singleflight to forget about a key.  Future calls// Forget 通知 singleflight 遗记某个键。将来的calls调用// to Do for this key will call the function rather than waiting for// 为此键执行的操作将调用该函数而不是期待// an earlier call to complete.// 较早的调用实现。func (g *Group) Forget(key string) {    g.mu.Lock()    delete(g.m, key)    g.mu.Unlock()}

并发状况下的goroutine执行状况

func BenchmarkUse(b *testing.B) {    ctx := context.Background()    wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600)    goCache := cache.New(time.Second*60, time.Second*60)    sg := singleflight.Group{}    for i := 0; i < b.N; i++ {       _, ok := goCache.Get("k")       if !ok {          go func() {             _, _, _ = sg.Do("k", func() (interface{}, error) {                v, _ := wordTouchRedisClient.Get(ctx, "k").Result()                goCache.Set("k", v, time.Second*60)                return v, nil             })          }()       }    }}

如图表展现

就是在第一个 子goroutine的从开始到完结,启动的 其余子goroutine,都和第一个goroutine,都领有雷同的call,为同一个group。而后返回同样的后果。
第一个子goroutine,完结完,就删掉key,而后在上面的goroutine,为新的一组。