关于golang:Go实现安全双检锁的方法和应用场景

55次阅读

共计 7860 个字符,预计需要花费 20 分钟才能阅读完成。

不平安的双检锁

从其余语言转入 Go 语言的同学常常会陷入一个思考:如何创立一个单例?

有些同学可能会把其它语言中的双检锁模式移植过去,双检锁模式也称为懒汉模式,首次用到的时候才创立实例。大部分人首次用 Golang 写进去的实例大略是这样的:

type Conn struct {
    Addr  string
    State int
}

var c *Conn
var mu sync.Mutex

func GetInstance() *Conn {
    if c == nil {mu.Lock()
        defer mu.Unlock()
        if c == nil {c = &Conn{"127.0.0.1:8080", 1}
        }
    }
    return c
}

这里先解释下这段代码的执行逻辑(曾经分明的同学能够间接跳过):

GetInstance 用于获取构造体 Conn 的一个实例,其中:先判断 c 是否为空,如果为空则加锁,加锁之后再判断一次 c 是否为空,如果还为空,则创立 Conn 的一个实例,并赋值给 c。这里有两次判空,所以称为双检,须要第二次判空的起因是:加锁之前可能有多个线程 / 协程都判断为空,这些线程 / 协程都会在这里等着加锁,它们最终也都会执行加锁操作,不过加锁之后的代码在多个线程 / 协程之间是串行执行的,一个线程 / 协程判空之后创立了实例,其它线程 / 协程在判断 c 是否为空时必然得出 false 的后果,这样就能保障 c 仅创立一次。而且后续调用 GetInstance 时都会仅执行第一次判空,得出 false 的后果,而后间接返回 c。这样每个线程 / 协程最多只执行一次加锁操作,后续都只是简略的判断下就能返回后果,其性能必然不错。

理解 Java 的同学可能晓得 Java 中的双检锁是非线程平安的,这是因为赋值操作中的两个步骤可能会呈现乱序执行问题。这两个步骤是:对象内存空间的初始化和将内存地址设置给变量。因为编译器或者 CPU 优化,它们的执行程序可能不确定,先执行第 2 步的话,锁外边的线程很有可能拜访到没有初始化结束的变量,从而引发某些异样。针对这个问题,Java 以及其它一些语言中能够应用 volatile 来润饰变量,理论执行时会通过插入内存栅栏阻止指令重排,强制依照编码的指令程序执行。

那么 Go 语言中的双检锁是平安的吗?

答案是也 不平安

先来看看指令重排问题:

在 Go 语言标准中,赋值操作分为两个阶段:第一阶段对赋值操作左右两侧的表达式进行求值,第二阶段赋值依照从左至右的程序执行。(参考:https://golang.google.cn/ref/…)

说的有点形象,但没有提到赋值存在指令重排的问题,隐约感觉不会有这个问题。为了验证,让咱们看一下上边那段代码中赋值操作的伪汇编代码:

红框圈进去的局部对应的代码是:c = &Conn{“127.0.0.1:8080”, 1}

其中有一行:CMPL $0x0, runtime.writeBarrier(SB),这个指令就是插入一个内存栅栏。前边是要赋值数据的初始化,后边是赋值操作。如此看,赋值操作不存在指令重排的问题。

既然赋值操作没有指令重排的问题,那这个双检锁怎么还是不平安的呢?

在 Golang 中,对于大于单个机器字的值,读写它的时候是以一种不确定的程序屡次执行单机器字的操作来实现的。机器字大小就是咱们通常说的 32 位、64 位,即 CPU 实现一次无定点整数运算能够解决的二进制位数,也能够认为是 CPU 数据通道的大小。比方在 32 位的机器上读写一个 int64 类型的值就须要两次操作。(参考:https://golang.google.cn/ref/…)

因为Golang 中对变量的读和写都没有原子性的保障,所以很可能呈现这种状况:锁里边变量赋值只解决了一半,锁外边的另一个 goroutine 就读到了未齐全赋值的变量。所以这个双检锁的实现是不平安的。

Golang 中将这种问题称为 data race,说的是对某个数据产生了并发读写,读到的数据不可预测,可能产生问题,甚至导致程序解体。能够在构建或者运行时查看是否会产生这种状况:

$ go test -race mypkg    // to test the package
$ go run -race mysrc.go  // to run the source file
$ go build -race mycmd   // to build the command
$ go install -race mypkg // to install the package

另外上边说单条赋值操作没有重排序的问题,然而 重排序问题在 Golang 中还是存在的,稍不留神就可能写出 BUG 来。比方下边这段代码:

a=1
b=1
c=a+b

在执行这段程序的 goroutine 中并不会呈现问题,然而另一个 goroutine 读取到 b == 1 时并不代表此时 a ==1,因为 a = 1 和 b = 1 的执行程序可能会被扭转。针对重排序问题,Golang 并没有裸露相似 volatile 的关键字,因为了解和正确应用这类能力进行并发编程的门槛比拟高,所以 Golang 只是在一些本人认为比拟适宜的中央插入了内存栅栏,尽量放弃语言的简略。对于 goroutine 之间的数据同步,Go 提供了更好的形式,那就是 Channel,不过这不是本文的重点,这里就不介绍了。

sync.Once 的启发

还是回到最开始的问题,如何在 Golang 中创立一个单例?

很多人应该会被举荐应用 sync.Once,这里看下如何应用:

type Conn struct {
    Addr  string
    State int
}

var c *Conn
var once sync.Once

func setInstance() {fmt.Println("setup")
    c = &Conn{"127.0.0.1:8080", 1}
}

func doPrint() {once.Do(setInstance)
    fmt.Println(c)
}

func loopPrint() {
    for i := 0; i < 10; i++ {go doprint()
    }
}

这里重用上文的构造体 Conn,设置 Conn 单例的办法是 setInstance,这个办法在 doPrint 中被 once.Do 调用,这里的 once 就是 sync.Once 的一个实例,而后咱们在 loopPrint 办法中创立 10 个 goroutine 来调用 doPrint 办法。

依照 sync.Once 的语义,setInstance 应该近执行一次。能够理论执行下看看,我这里间接贴出后果:

setup
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}

无论执行多少遍,都是这个后果。那么 sync.Once 是怎么做到的呢?源码很短很分明:

type Once struct {
    done uint32
    m    Mutex
}

func (o *Once) Do(f func()) {if atomic.LoadUint32(&o.done) == 0 {o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {defer atomic.StoreUint32(&o.done, 1)
        f()}
}

Once 是一个构造体,其中第一个字段标识是否执行过,第二个字段是一个互斥量。Once 仅公开了一个 Do 办法,用于执行指标函数 f。

这里重点看下指标函数 f 是怎么被执行的?

  1. Do 办法中第一行是判断字段 done 是否为 0,为 0 则代表没执行过,为 1 则代表执行过。这里用了原子读,写的时候也要原子写,这样能够保障读写不会同时产生,可能读到以后最新的值。
  2. 如果 done 为 0,则调用 doSLow 办法,从名字咱们就能够领会到这个办法比较慢。
  3. doSlow 中首先会加锁,应用的是 Once 构造体的第二个字段。
  4. 而后再判断 done 是否为 0,留神这里没有应用原子读,为什么呢?因为锁中的办法是串行执行的,不会产生并发读写。
  5. 如果 done 为 0,则调用指标函数 f,执行相干的业务逻辑。
  6. 在执行指标函数 f 前,这里还申明了一个 defer:defer atomic.StoreUint32(&o.done, 1),应用原子写扭转 done 的值为 1,代表指标函数曾经执行过。它会在指标函数 f 执行结束,doSlow 办法返回之前执行。这个设计很精妙,准确管制了改写 done 值的机会。

能够看出,这里用的也是双检锁的模式,只不过做了两个加强:一是应用原子读写,防止了并发读写的内存数据不统一问题;二是在 defer 中更改实现标识,保障了代码执行程序,不会呈现实现标识更改逻辑被编译器或者 CPU 优化提前执行。

须要留神,如果指标函数 f 中产生了 panic,指标函数也仅执行一次,不会执行屡次直到胜利。

平安的双检锁

有了对 sync.Once 的了解,咱们能够革新之前写的双检锁逻辑,让它也能平安起来。

type Conn struct {
    Addr  string
    State int
}

var c *Conn
var mu sync.Mutex
var done uint32

func getInstance() *Conn {if atomic.LoadUint32(&done) == 0 {mu.Lock()
        defer mu.Unlock()
        if done == 0 {defer atomic.StoreUint32(&done, 1)
            c = &Conn{"127.0.0.1:8080", 1}
        }
    }
    return c
}

扭转的中央就是 sync.Once 做的两个加强;原子读写和 defer 中更改实现标识。

当然如果要做的工作仅限于此,还不如间接应用 sync.Once。

有时候咱们须要的单例不是变化无穷的,比方在 ylog 中须要每小时创立一个日志文件的实例,再比方须要为每一个用户创立不同的单例;再比方创立实例的过程中产生了谬误,可能咱们还会冀望再执行实例的创立过程,直到胜利。这两个需要是 sync.Once 无奈做到的。

解决 panic

这里在创立 Conn 的时候模仿一个 panic。

i:=0
func newConn() *Conn {fmt.Println("newConn")
    div := i
    i++
    k := 10 / div
    return &Conn{"127.0.0.1:8080", k}
}

第 1 次执行 newConn 时会产生一个除零谬误,并引发 panic。再执行时则能够失常创立。

panic 能够通过 recover 进行解决,因而能够在捕捉到 panic 时不更改实现标识,之前的 getInstance 办法能够批改为:

func getInstance() *Conn {if atomic.LoadUint32(&done) == 0 {mu.Lock()
        defer mu.Unlock()

        if done == 0 {defer func() {if r := recover(); r == nil {defer atomic.StoreUint32(&done, 1)
                }
            }()

            c = newConn()}
    }
    return c
}

能够看到这里只是改了下 defer 函数,捕获不到 panic 时才去更改实现标识。留神此时 c 并没有创立胜利,会返回零值,或者你还须要减少其它的错误处理。

解决 error

如果业务代码不是抛出 panic,而是返回 error,这时候怎么解决?

能够将 error 转为 panic,比方 newConn 是这样实现的:

func newConn() (*Conn, error) {fmt.Println("newConn")
    div := i
    i++
    if div == 0 {return nil, errors.New("the divisor is zero")
    }
    k := 1 / div
    return &Conn{"127.0.0.1:8080", k}, nil
}

咱们能够再把它包装一层:

func mustNewConn() *Conn {conn, err := newConn()
    if err != nil {panic(err)
    }
    return conn
}

如果不应用 panic,还能够再引入一个变量,有 error 时对它赋值,在 defer 函数中减少对这个变量的判断,如果有谬误值,则不更新实现标识位。代码也比拟容易实现,不过还要减少变量,感觉简单了,这里就不测试这种办法了。

有范畴的单例

前文提到过有时单例不是变化无穷的,我这里将这种单例称为有范畴的单例。

这里还是复用前文的 Conn 构造体,不过需要批改为要为每个用户创立一个 Conn 实例。

看一下 User 的定义:

type User struct {
    done uint32
    Id   int64
    mu   sync.Mutex
    c    *Conn
}

其中包含一个用户 Id,其它三个字段还是用于获取以后用户的 Conn 单例的。

再看看 getInstance 函数怎么改:

func getInstance(user *User) *Conn {if atomic.LoadUint32(&user.done) == 0 {user.mu.Lock()
        defer user.mu.Unlock()

        if user.done == 0 {defer func() {if r := recover(); r == nil {defer atomic.StoreUint32(&user.done, 1)
                }
            }()

            user.c = newConn()}
    }
    return user.c
}

这里减少了一个参数 user,办法内的逻辑根本没变,只不过操作的货色都变成 user 的字段。这样就能够为每个用户创立一个 Conn 单例。

这个办法有点泛型的意思了,当然不是泛型。

有范畴单例的另一个示例:在 ylog 中须要每小时创立一个日志文件用于记录以后小时的日志,在每个小时只需创立并关上这个文件一次。

先看看 Logger 的定义(这里省略和创立单例无关的内容。):

type FileLogger struct {
    lastHour int64
    file     *os.File
    mu       sync.Mutex
    ...
}

lastHour 是记录的小时数,如果以后小时数不等于记录的小时数,则阐明应该创立新的文件,这个变量相似于 sync.Once 中的 done 字段。

file 是关上的文件实例。

mu 是创立文件实例时须要加的锁。

下边看一下关上文件的办法:

func (l *FileLogger) ensureFile() (err error) {curTime := time.Now()
    curHour := getTimeHour(curTime)
    if atomic.LoadInt64(&l.lastHour) != curHour {return l.ensureFileSlow(curTime, curHour)
    }

    return
}

func (l *FileLogger) ensureFileSlow(curTime time.Time, curHour int64) (err error) {l.mu.Lock()
    defer l.mu.Unlock()
    if l.lastHour != curHour {defer func() {if r := recover(); r == nil {atomic.StoreInt64(&l.lastHour, curHour)
            }
        }()
        l.createFile(curTime, curHour)
    }
    return
}

这里模拟 sync.Once 中的解决办法,有两点次要的不同:数值比拟不再是 0 和 1,而是每个小时都会变动的数字;减少了对 panic 的解决。如果关上文件失败,则还会再次尝试关上文件。

要查看残缺的代码请拜访 Github:https://github.com/bosima/ylo…

双检锁的性能

从原理上剖析,双检锁的性能要好过互斥锁,因为互斥锁每次都要加锁;不应用原子操作的双检锁要比应用原子操作的双检锁好一些,毕竟原子操作也是有些老本的。那么理论差距是多少呢?

这里做一个 Benchmark Test,还是解决上文的 Conn 构造体,为了不便测试,定义一个上下文:

type Context struct {
    done uint32
    c    *Conn
    mu   sync.Mutex
}

编写三个用于测试的办法:

func ensure_unsafe_dcl(context *Context) {
    if context.done == 0 {context.mu.Lock()
        defer context.mu.Unlock()
        if context.done == 0 {defer func() {context.done = 1}()
            context.c = newConn()}
    }
}

func ensure_dcl(context *Context) {if atomic.LoadUint32(&context.done) == 0 {context.mu.Lock()
        defer context.mu.Unlock()
        if context.done == 0 {defer atomic.StoreUint32(&context.done, 1)
            context.c = newConn()}
    }
}

func ensure_mutex(context *Context) {context.mu.Lock()
    defer context.mu.Unlock()
    if context.done == 0 {defer func() {context.done = 1}()
        context.c = newConn()}
}

这三个办法别离对应不平安的双检锁、应用原子操作的平安双检锁和每次都加互斥锁。它们的作用都是确保 Conn 构造体的实例存在,如果不存在则创立。

应用的测试方法都是上面这种写法,依照计算机逻辑处理器的数量并行运行测试方法:

func BenchmarkInfo_DCL(b *testing.B) {context := &Context{}
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {for pb.Next() {ensure_dcl(context)
            processConn(context.c)
        }
    })
}

先看一下 Benchmark Test 的后果:

能够看到应用双检锁相比每次加锁的晋升是两个数量级,这是失常的。

而不平安的双检锁和应用原子操作的平安双检锁工夫耗费相差无几,为什么呢?

次要起因是这里写只有 1 次,剩下的全是读。即便应用了原子操作,绝大部分状况下 CPU 读数据的时候也不必在多个外围之间同步(锁总线、锁缓存等),只须要读缓存就能够了。这也从一个方面证实了双检锁模式的意义。

另外上文提到过 Go 读写超过一个机器字的变量时是非原子的,那如果读写只有 1 个机器字呢?在 64 位机器上读写 int64 自身就是原子操作,也就是说读写应该都只需 1 次操作,不论用不必 atomic 办法。这能够在编译器文档或者 CPU 手册中验证。(Reference:https://preshing.com/20130618…)

不过这两个剖析不是说咱们应用原子操作没有意义,不平安双检锁的执行后果是没有 Go 语言标准保障的,上边的后果只是在特定编译器、特定平台下的基准测试后果,不同的编译器、CPU,甚至不同版本的 Go 都不晓得会出什么幺蛾子,运行的成果也就无奈保障。咱们不得不思考程序的可移植性。


以上就是本文次要内容,如有问题欢送反馈。残缺代码曾经上传到 Github,欢送拜访:https://github.com/bosima/go-…

播种更多架构常识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。

正文完
 0