一、缓存雪崩的利用
背景:
咱们在重启 pod 的时候,此时会导致 gocache 中重启,而后缓存同时大批量生效。如果此时并发比拟高,会有很多 goroutine,去同时拜访 redis。
加单飞,将一组雷同的申请合并成一个申请,实际上只会去申请一次,而后对所有的申请返回雷同的后果
singlefight 试验:
singlefight_test.go
须要从新从 redis 获取数据存取到 gocache。
func BenchmarkUse(b *testing.B) {ctx := context.Background()
wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600)
goCache := cache.New(time.Second*60, time.Second*60)
//sg := singleflight.Group{}
for i := 0; i < b.N; i++ {_, ok := goCache.Get("k")
if !ok {go func() {//_, _, _ = sg.Do("k", func() (interface{}, error) {v, _ := wordTouchRedisClient.Get(ctx, "k").Result()
goCache.Set("k", v, time.Second*60)
//return v, nil
//})
}()}
}
}
BenchmarkUse-8 94518 20173 ns/op
此时引入单飞
func BenchmarkUse(b *testing.B) {ctx := context.Background()
wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600)
goCache := cache.New(time.Second*60, time.Second*60)
sg := singleflight.Group{}
for i := 0; i < b.N; i++ {_, ok := goCache.Get("k")
if !ok {go func() {_, _, _ = sg.Do("k", func() (interface{}, error) {v, _ := wordTouchRedisClient.Get(ctx, "k").Result()
goCache.Set("k", v, time.Second*60)
return v, nil
})
}()}
}
}
BenchmarkUse-8 21307608 46.96 ns/op
BenchmarkUse-2 25675206 45.37 ns/op
危险:
- 如果一个报错, 同一批都报错
二、源码剖析
源码正文
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
// singleflight 包提供了反复函数调用克制机制。package singleflight // import "golang.org/x/sync/singleflight"
import (
"bytes"
"errors"
"fmt"
"runtime"
"runtime/debug"
"sync"
)
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
// errGoexit 示意 runtime.Goexit 被用户的函数调用了
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic
// panicError 是从 panic 中 复原的任意值
// with the stack trace during the execution of given function.
// 执行给定函数期间的堆栈跟踪
type panicError struct {value interface{}
stack []byte}
// Error implements error interface.
// Error 实现谬误接口
func (p *panicError) Error() string {return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// 堆栈跟踪的第一行的模式为“goroutine N [status]:”// but by the time the panic reaches Do the goroutine may no longer exist
// 但当 panic 达到 Do 时,goroutine 可能不再存在
// and its status will have changed. Trim out the misleading line.
// 并且它的状态将会扭转。修剪掉误导性的线条。if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
// call 是正在进行的或已实现的 singleflight.Do() 调用
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// 这些字段在 WaitGroup 实现之前写入一次
// and are only read after the WaitGroup is done.
// 并且仅在 WaitGroup 实现后才读取。val interface{}
err error
// These fields are read and written with the singleflight
// 这些字段是用 singleflight mutex 读写的
// mutex held before the WaitGroup is done, and are read but
// 在 WaitGroup 实现前。// not written after the WaitGroup is done.
// 并且 只读不写,在 WaitGroup 实现后。dups int
chans []chan<- Result}
// Group represents a class of work and forms a namespace in
// Group 代表一个工作类,并在其中造成一个命名空间
// which units of work can be executed with duplicate suppression.
// 哪些工作单元能够通过反复克制来执行。type Group struct {
mu sync.Mutex // protects m 用来爱护 m,并发平安
m map[string]*call // lazily initialized 提早初始化
}
// Result holds the results of Do, so they can be passed
// Result 保留了 Do 的后果,因而能够传递
// on a channel.
// 在通道上
type Result struct {Val interface{}
Err error
Shared bool
}
// Do executes and returns the results of the given function,
// Do 执行并返回给定函数的后果
// making sure that only one execution is in-flight for a given key at a time.
// 确保在某一时刻对于给定的键只有一次正在执行
// If a duplicate comes in, the duplicate caller waits for the original
// 如果有反复的调用者进入,则反复的调用者将期待最后者
// to complete and receives the same results.
// 实现并收到雷同的后果。// The return value shared indicates whether v was given to multiple callers.
// 返回值 shared 示意 v 是否被给予多个调用者。func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {g.mu.Lock()
if g.m == nil {g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {panic(e)
} else if c.err == errGoexit {runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
// DoChan 与 Do 相似,但返回一个 chanel 通道 接管筹备好后的后果。//
// The returned channel will not be closed.
// 返回的 channel 通道不会被敞开。func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
// doCall 解决对 key 的单个调用。func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// 应用双重提早 来辨别 panic 和 runtime.Goexit,
// more details see https://golang.org/cl/134395
// 更多详情参见 https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
// 调用给定函数 runtime.Goexit
if !normalReturn && !recovered {c.err = errGoexit}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// 为了避免期待通道永远被阻塞,// needs to ensure that this panic cannot be recovered.
// 须要确保这种 panic 恐慌无奈复原。if len(c.chans) > 0 {go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
// 保留此 goroutine,以便它呈现在故障转储中。} else {panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
// 曾经在 goexit 过程中,无需再次调用
} else {
// Normal return
// 失常返回
for _, ch := range c.chans {ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// 现实状况下,咱们会期待获取堆栈跟踪,直到咱们确定
// whether this is a panic or a runtime.Goexit.
// 这是恐慌还是 runtime.Goexit。//
// Unfortunately, the only way we can distinguish the two is to see
// 可怜的是,咱们辨别两者的惟一办法就是看
// whether the recover stopped the goroutine from terminating, and by
// 复原是否阻止 goroutine 终止,并且通过
// the time we know that, the part of the stack trace relevant to the
// 当咱们晓得时,堆栈跟踪中与
// panic has been discarded.
// 恐慌已被抛弃。if r := recover(); r != nil {c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {recovered = true}
}
// Forget tells the singleflight to forget about a key. Future calls
// Forget 通知 singleflight 遗记某个键。将来的 calls 调用
// to Do for this key will call the function rather than waiting for
// 为此键执行的操作将调用该函数而不是期待
// an earlier call to complete.
// 较早的调用实现。func (g *Group) Forget(key string) {g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()}
并发状况下的 goroutine 执行状况
func BenchmarkUse(b *testing.B) {ctx := context.Background()
wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600)
goCache := cache.New(time.Second*60, time.Second*60)
sg := singleflight.Group{}
for i := 0; i < b.N; i++ {_, ok := goCache.Get("k")
if !ok {go func() {_, _, _ = sg.Do("k", func() (interface{}, error) {v, _ := wordTouchRedisClient.Get(ctx, "k").Result()
goCache.Set("k", v, time.Second*60)
return v, nil
})
}()}
}
}
如图表展现
就是在第一个 子 goroutine 的从开始到完结,启动的 其余子 goroutine,都和第一个 goroutine,都领有雷同的 call,为同一个 group。而后返回同样的后果。
第一个子 goroutine,完结完,就删掉 key,而后在上面的 goroutine,为新的一组。