关于golang:Go实现安全双检锁的方法和应用场景

不平安的双检锁

从其余语言转入Go语言的同学常常会陷入一个思考:如何创立一个单例?

有些同学可能会把其它语言中的双检锁模式移植过去,双检锁模式也称为懒汉模式,首次用到的时候才创立实例。大部分人首次用Golang写进去的实例大略是这样的:

type Conn struct {
    Addr  string
    State int
}

var c *Conn
var mu sync.Mutex

func GetInstance() *Conn {
    if c == nil {
        mu.Lock()
        defer mu.Unlock()
        if c == nil {
            c = &Conn{"127.0.0.1:8080", 1}
        }
    }
    return c
}

这里先解释下这段代码的执行逻辑(曾经分明的同学能够间接跳过):

GetInstance用于获取构造体Conn的一个实例,其中:先判断c是否为空,如果为空则加锁,加锁之后再判断一次c是否为空,如果还为空,则创立Conn的一个实例,并赋值给c。这里有两次判空,所以称为双检,须要第二次判空的起因是:加锁之前可能有多个线程/协程都判断为空,这些线程/协程都会在这里等着加锁,它们最终也都会执行加锁操作,不过加锁之后的代码在多个线程/协程之间是串行执行的,一个线程/协程判空之后创立了实例,其它线程/协程在判断c是否为空时必然得出false的后果,这样就能保障c仅创立一次。而且后续调用GetInstance时都会仅执行第一次判空,得出false的后果,而后间接返回c。这样每个线程/协程最多只执行一次加锁操作,后续都只是简略的判断下就能返回后果,其性能必然不错。

理解Java的同学可能晓得Java中的双检锁是非线程平安的,这是因为赋值操作中的两个步骤可能会呈现乱序执行问题。这两个步骤是:对象内存空间的初始化和将内存地址设置给变量。因为编译器或者CPU优化,它们的执行程序可能不确定,先执行第2步的话,锁外边的线程很有可能拜访到没有初始化结束的变量,从而引发某些异样。针对这个问题,Java以及其它一些语言中能够应用volatile来润饰变量,理论执行时会通过插入内存栅栏阻止指令重排,强制依照编码的指令程序执行。

那么Go语言中的双检锁是平安的吗?

答案是也不平安

先来看看指令重排问题:

在Go语言标准中,赋值操作分为两个阶段:第一阶段对赋值操作左右两侧的表达式进行求值,第二阶段赋值依照从左至右的程序执行。(参考:https://golang.google.cn/ref/…)

说的有点形象,但没有提到赋值存在指令重排的问题,隐约感觉不会有这个问题。为了验证,让咱们看一下上边那段代码中赋值操作的伪汇编代码:

红框圈进去的局部对应的代码是: c = &Conn{“127.0.0.1:8080”, 1}

其中有一行:CMPL $0x0, runtime.writeBarrier(SB) ,这个指令就是插入一个内存栅栏。前边是要赋值数据的初始化,后边是赋值操作。如此看,赋值操作不存在指令重排的问题。

既然赋值操作没有指令重排的问题,那这个双检锁怎么还是不平安的呢?

在Golang中,对于大于单个机器字的值,读写它的时候是以一种不确定的程序屡次执行单机器字的操作来实现的。机器字大小就是咱们通常说的32位、64位,即CPU实现一次无定点整数运算能够解决的二进制位数,也能够认为是CPU数据通道的大小。比方在32位的机器上读写一个int64类型的值就须要两次操作。(参考:https://golang.google.cn/ref/…)

因为Golang中对变量的读和写都没有原子性的保障,所以很可能呈现这种状况:锁里边变量赋值只解决了一半,锁外边的另一个goroutine就读到了未齐全赋值的变量。所以这个双检锁的实现是不平安的。

Golang中将这种问题称为data race,说的是对某个数据产生了并发读写,读到的数据不可预测,可能产生问题,甚至导致程序解体。能够在构建或者运行时查看是否会产生这种状况:

$ go test -race mypkg    // to test the package
$ go run -race mysrc.go  // to run the source file
$ go build -race mycmd   // to build the command
$ go install -race mypkg // to install the package

另外上边说单条赋值操作没有重排序的问题,然而重排序问题在Golang中还是存在的,稍不留神就可能写出BUG来。比方下边这段代码:

a=1
b=1
c=a+b

在执行这段程序的goroutine中并不会呈现问题,然而另一个goroutine读取到b==1时并不代表此时a==1,因为a=1和b=1的执行程序可能会被扭转。针对重排序问题,Golang并没有裸露相似volatile的关键字,因为了解和正确应用这类能力进行并发编程的门槛比拟高,所以Golang只是在一些本人认为比拟适宜的中央插入了内存栅栏,尽量放弃语言的简略。对于goroutine之间的数据同步,Go提供了更好的形式,那就是Channel,不过这不是本文的重点,这里就不介绍了。

sync.Once的启发

还是回到最开始的问题,如何在Golang中创立一个单例?

很多人应该会被举荐应用 sync.Once ,这里看下如何应用:

type Conn struct {
    Addr  string
    State int
}

var c *Conn
var once sync.Once

func setInstance() {
    fmt.Println("setup")
    c = &Conn{"127.0.0.1:8080", 1}
}

func doPrint() {
    once.Do(setInstance)
    fmt.Println(c)
}

func loopPrint() {
    for i := 0; i < 10; i++ {
        go doprint()
    }
}

这里重用上文的构造体Conn,设置Conn单例的办法是setInstance,这个办法在doPrint中被once.Do调用,这里的once就是sync.Once的一个实例,而后咱们在loopPrint办法中创立10个goroutine来调用doPrint办法。

依照sync.Once的语义,setInstance应该近执行一次。能够理论执行下看看,我这里间接贴出后果:

setup
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}
&{127.0.0.1:8080 1}

无论执行多少遍,都是这个后果。那么sync.Once是怎么做到的呢?源码很短很分明:

type Once struct {
    done uint32
    m    Mutex
}

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

Once是一个构造体,其中第一个字段标识是否执行过,第二个字段是一个互斥量。Once仅公开了一个Do办法,用于执行指标函数f。

这里重点看下指标函数f是怎么被执行的?

  1. Do办法中第一行是判断字段done是否为0,为0则代表没执行过,为1则代表执行过。这里用了原子读,写的时候也要原子写,这样能够保障读写不会同时产生,可能读到以后最新的值。
  2. 如果done为0,则调用doSLow办法,从名字咱们就能够领会到这个办法比较慢。
  3. doSlow中首先会加锁,应用的是Once构造体的第二个字段。
  4. 而后再判断done是否为0,留神这里没有应用原子读,为什么呢?因为锁中的办法是串行执行的,不会产生并发读写。
  5. 如果done为0,则调用指标函数f,执行相干的业务逻辑。
  6. 在执行指标函数f前,这里还申明了一个defer:defer atomic.StoreUint32(&o.done, 1) ,应用原子写扭转done的值为1,代表指标函数曾经执行过。它会在指标函数f执行结束,doSlow办法返回之前执行。这个设计很精妙,准确管制了改写done值的机会。

能够看出,这里用的也是双检锁的模式,只不过做了两个加强:一是应用原子读写,防止了并发读写的内存数据不统一问题;二是在defer中更改实现标识,保障了代码执行程序,不会呈现实现标识更改逻辑被编译器或者CPU优化提前执行。

须要留神,如果指标函数f中产生了panic,指标函数也仅执行一次,不会执行屡次直到胜利。

平安的双检锁

有了对sync.Once的了解,咱们能够革新之前写的双检锁逻辑,让它也能平安起来。

type Conn struct {
    Addr  string
    State int
}

var c *Conn
var mu sync.Mutex
var done uint32

func getInstance() *Conn {
    if atomic.LoadUint32(&done) == 0 {
        mu.Lock()
        defer mu.Unlock()
        if done == 0 {
            defer atomic.StoreUint32(&done, 1)
            c = &Conn{"127.0.0.1:8080", 1}
        }
    }
    return c
}

扭转的中央就是sync.Once做的两个加强;原子读写和defer中更改实现标识。

当然如果要做的工作仅限于此,还不如间接应用sync.Once。

有时候咱们须要的单例不是变化无穷的,比方在ylog中须要每小时创立一个日志文件的实例,再比方须要为每一个用户创立不同的单例;再比方创立实例的过程中产生了谬误,可能咱们还会冀望再执行实例的创立过程,直到胜利。这两个需要是sync.Once无奈做到的。

解决panic

这里在创立Conn的时候模仿一个panic。

i:=0
func newConn() *Conn {
    fmt.Println("newConn")
    div := i
    i++
    k := 10 / div
    return &Conn{"127.0.0.1:8080", k}
}

第1次执行newConn时会产生一个除零谬误,并引发 panic。再执行时则能够失常创立。

panic能够通过recover进行解决,因而能够在捕捉到panic时不更改实现标识,之前的getInstance办法能够批改为:

func getInstance() *Conn {
    if atomic.LoadUint32(&done) == 0 {
        mu.Lock()
        defer mu.Unlock()

        if done == 0 {
            defer func() {
                if r := recover(); r == nil {
                    defer atomic.StoreUint32(&done, 1)
                }
            }()

            c = newConn()
        }
    }
    return c
}

能够看到这里只是改了下defer函数,捕获不到panic时才去更改实现标识。留神此时c并没有创立胜利,会返回零值,或者你还须要减少其它的错误处理。

解决error

如果业务代码不是抛出panic,而是返回error,这时候怎么解决?

能够将error转为panic,比方newConn是这样实现的:

func newConn() (*Conn, error) {
    fmt.Println("newConn")
    div := i
    i++
    if div == 0 {
        return nil, errors.New("the divisor is zero")
    }
    k := 1 / div
    return &Conn{"127.0.0.1:8080", k}, nil
}

咱们能够再把它包装一层:

func mustNewConn() *Conn {
    conn, err := newConn()
    if err != nil {
        panic(err)
    }
    return conn
}

如果不应用panic,还能够再引入一个变量,有error时对它赋值,在defer函数中减少对这个变量的判断,如果有谬误值,则不更新实现标识位。代码也比拟容易实现,不过还要减少变量,感觉简单了,这里就不测试这种办法了。

有范畴的单例

前文提到过有时单例不是变化无穷的,我这里将这种单例称为有范畴的单例。

这里还是复用前文的Conn构造体,不过需要批改为要为每个用户创立一个Conn实例。

看一下User的定义:

type User struct {
    done uint32
    Id   int64
    mu   sync.Mutex
    c    *Conn
}

其中包含一个用户Id,其它三个字段还是用于获取以后用户的Conn单例的。

再看看getInstance函数怎么改:

func getInstance(user *User) *Conn {
    if atomic.LoadUint32(&user.done) == 0 {
        user.mu.Lock()
        defer user.mu.Unlock()

        if user.done == 0 {
            defer func() {
                if r := recover(); r == nil {
                    defer atomic.StoreUint32(&user.done, 1)
                }
            }()

            user.c = newConn()
        }
    }
    return user.c
}

这里减少了一个参数 user,办法内的逻辑根本没变,只不过操作的货色都变成user的字段。这样就能够为每个用户创立一个Conn单例。

这个办法有点泛型的意思了,当然不是泛型。

有范畴单例的另一个示例:在ylog中须要每小时创立一个日志文件用于记录以后小时的日志,在每个小时只需创立并关上这个文件一次。

先看看Logger的定义(这里省略和创立单例无关的内容。):

type FileLogger struct {
    lastHour int64
    file     *os.File
    mu       sync.Mutex
    ...
}

lastHour是记录的小时数,如果以后小时数不等于记录的小时数,则阐明应该创立新的文件,这个变量相似于sync.Once中的done字段。

file是关上的文件实例。

mu是创立文件实例时须要加的锁。

下边看一下关上文件的办法:

func (l *FileLogger) ensureFile() (err error) {
    curTime := time.Now()
    curHour := getTimeHour(curTime)
    if atomic.LoadInt64(&l.lastHour) != curHour {
        return l.ensureFileSlow(curTime, curHour)
    }

    return
}

func (l *FileLogger) ensureFileSlow(curTime time.Time, curHour int64) (err error) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if l.lastHour != curHour {
        defer func() {
            if r := recover(); r == nil {
                atomic.StoreInt64(&l.lastHour, curHour)
            }
        }()
        l.createFile(curTime, curHour)
    }
    return
}

这里模拟sync.Once中的解决办法,有两点次要的不同:数值比拟不再是0和1,而是每个小时都会变动的数字;减少了对panic的解决。如果关上文件失败,则还会再次尝试关上文件。

要查看残缺的代码请拜访Github:https://github.com/bosima/ylo…

双检锁的性能

从原理上剖析,双检锁的性能要好过互斥锁,因为互斥锁每次都要加锁;不应用原子操作的双检锁要比应用原子操作的双检锁好一些,毕竟原子操作也是有些老本的。那么理论差距是多少呢?

这里做一个Benchmark Test,还是解决上文的Conn构造体,为了不便测试,定义一个上下文:

type Context struct {
    done uint32
    c    *Conn
    mu   sync.Mutex
}

编写三个用于测试的办法:

func ensure_unsafe_dcl(context *Context) {
    if context.done == 0 {
        context.mu.Lock()
        defer context.mu.Unlock()
        if context.done == 0 {
            defer func() { context.done = 1 }()
            context.c = newConn()
        }
    }
}

func ensure_dcl(context *Context) {
    if atomic.LoadUint32(&context.done) == 0 {
        context.mu.Lock()
        defer context.mu.Unlock()
        if context.done == 0 {
            defer atomic.StoreUint32(&context.done, 1)
            context.c = newConn()
        }
    }
}

func ensure_mutex(context *Context) {
    context.mu.Lock()
    defer context.mu.Unlock()
    if context.done == 0 {
    defer func() { context.done = 1 }()
        context.c = newConn()
    }
}

这三个办法别离对应不平安的双检锁、应用原子操作的平安双检锁和每次都加互斥锁。它们的作用都是确保Conn构造体的实例存在,如果不存在则创立。

应用的测试方法都是上面这种写法,依照计算机逻辑处理器的数量并行运行测试方法:

func BenchmarkInfo_DCL(b *testing.B) {
    context := &Context{}
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            ensure_dcl(context)
            processConn(context.c)
        }
    })
}

先看一下Benchmark Test的后果:

能够看到应用双检锁相比每次加锁的晋升是两个数量级,这是失常的。

而不平安的双检锁和应用原子操作的平安双检锁工夫耗费相差无几,为什么呢?

次要起因是这里写只有1次,剩下的全是读。即便应用了原子操作,绝大部分状况下CPU读数据的时候也不必在多个外围之间同步(锁总线、锁缓存等),只须要读缓存就能够了。这也从一个方面证实了双检锁模式的意义。

另外上文提到过Go读写超过一个机器字的变量时是非原子的,那如果读写只有1个机器字呢?在64位机器上读写int64自身就是原子操作,也就是说读写应该都只需1次操作,不论用不必atomic办法。这能够在编译器文档或者CPU手册中验证。(Reference:https://preshing.com/20130618…)

不过这两个剖析不是说咱们应用原子操作没有意义,不平安双检锁的执行后果是没有Go语言标准保障的,上边的后果只是在特定编译器、特定平台下的基准测试后果,不同的编译器、CPU,甚至不同版本的Go都不晓得会出什么幺蛾子,运行的成果也就无奈保障。咱们不得不思考程序的可移植性。


以上就是本文次要内容,如有问题欢送反馈。残缺代码曾经上传到Github,欢送拜访:https://github.com/bosima/go-…

播种更多架构常识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。

【腾讯云】轻量 2核2G4M,首年65元

阿里云限时活动-云数据库 RDS MySQL  1核2G配置 1.88/月 速抢

本文由乐趣区整理发布,转载请注明出处,谢谢。

您可能还喜欢...

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据