共计 6809 个字符,预计需要花费 18 分钟才能阅读完成。
原文链接:戳这里
前言
嗨,大家好,我是 asong。最近想写一个并发编程系列的文章,应用
Go
也有一段时间了,然而对并发的了解不是很透彻,借着这次总结,心愿能更进一步。咱们以 ” 原子操作 ” 开篇,对于并发操作而言,原子操作是个十分事实的问题,比拟典型的利用的就是i++
操作,并发状况下,同时对内存中的i
进行读取,就会产生与预期不符的后果,所以Go
语言中的sync/atomic
就是解决这个问题的,接下来咱们一起来看一看Go
的原子操作。
什么是原子性、原子操作
原子 (atomic) 本意是 ” 不能被进一步宰割的最小粒子 ”,而原子操作 (atomic operation) 意为 ” 不可中断的一个或一系列操作 ”。其实用大白话说进去就是让多个线程对同一块内存的操作是串行的,不会因为并发操作把内存写的不合乎预期。咱们来看这样一个例子:
假如当初是一个银行账户零碎,用户 A 想要本人从本人的账户直达 1 万元到用户 B 的账户上,直到转帐胜利实现一个事务,次要做这两件事:
- 从 A 的账户中减去 1 万元,如果 A 的账户原来就有 2 万元,当初就变成了 1 万元
- 给 B 的账户增加 1 万元,如果 B 的账户原来有 2 万元,那么当初就变成了 3 万元
假如在操作一的时候,零碎产生了故障,导致给 B 账户增加款项失败了,那么就要进行回滚。回滚就是回到事务之前的状态,咱们把这种要么一起胜利的操作叫做原子操作,而原子性就是要么残缺的被执行、要么齐全不执行。
如何保障原子性
- 锁机制
在处理器层面,能够采纳总线加锁或者对缓存加锁的形式来实现多处理器之间的原子操作。通过加锁保障从零碎内存中读取或写入一个字节是原子的,也就是当一个处理器读取一个字节时,其余处理器不能拜访这个字节的内存地址。
总线锁:处理器提供一个 Lock#
信号,当一个处理器上在总线上输入此信号时,其余处理器的申请将被阻塞住,那么该处理器能够独占共享内存。总线锁会把 CPU
和内存之间的通信锁住了,在锁定期间,其余解决就不能操作其余内存地址的数据,所以总线锁定的开销比拟大,所以解决会在某些场合应用缓存锁进行优化。
缓存锁:内存区域如果被缓存在处理器上的缓存行中,并且在 Lock#
操作期间,那么当它执行操作回写到内存时,解决不在总线上声言 Lock#
信号,而是批改外部的内存地址,并容许它的缓存统一机制来保障操作的原子性,因为缓存一致性机制会阻止同时批改由两个以上处理器缓存的内存区域的数据,其余处理器回写已被锁定的缓存行的数据时,就会使缓存有效。
锁机制尽管能够保障原子性,然而锁机制会存在以下问题:
- 多线程竞争的状况下,频繁的加锁、开释锁会导致较多的上下文切换和调度延时,性能会很差
- 当一个线程占用工夫比拟长时,就多导致其余须要此锁的线程挂起.
下面咱们说的都是乐观锁,要解决这种低效的问题,咱们能够采纳乐观锁,每次不加锁,而是假如没有抵触去实现某项操作,如果因为抵触失败就重试,直到胜利为止。也就是咱们接下来要说的 CAS(compare and swap).
- CAS(compare and swap)
CAS 的全称为 Compare And Swap
,直译就是比拟替换。是一条 CPU 的原子指令,其作用是让CPU
先进行比拟两个值是否相等,而后原子地更新某个地位的值,其实现形式是给予硬件平台的汇编指令,在 intel
的CPU
中,应用的 cmpxchg
指令,就是说 CAS
是靠硬件实现的,从而在硬件层面晋升效率。简述过程是这样:
假如蕴含 3 个参数内存地位 (V)、预期原值(A) 和新值 (B)。
V
示意要更新变量的值,E
示意预期值,N
示意新值。仅当V
值等于E
值时,才会将V
的值设为N
,如果V
值和E
值不同,则阐明曾经有其余线程在做更新,则以后线程什么都不做,最初CAS
返回以后V
的实在值。CAS 操作时抱着乐观的态度进行的,它总是认为本人能够胜利实现操作。基于这样的原理,CAS 操作即便没有锁,也能够发现其余线程对于以后线程的烦扰。
伪代码能够这样写:
func CompareAndSwap(int *addr,int oldValue,int newValue) bool{
if *addr == nil{return false}
if *addr == oldValue {
*addr = newValue
return true
}
return false
}
不过下面的代码可能会产生一个问题,也就是 ABA
问题,因为 CAS 须要在操作值的时候查看下值有没有发生变化,如果没有发生变化则更新,然而如果一个值原来是 A,变成了 B,又变成了 A,那么应用 CAS 进行查看时会发现它的值没有发生变化,然而实际上却变动了。ABA 问题的解决思路就是应用版本号。在变量后面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。
go 语言中如何进行原子操作
在 Go
语言规范库中,sync/atomic
包将底层硬件提供的原子操作封装成了 Go
的函数,次要分为 5 个系列的函数,别离是:
func SwapXXXX(addr *int32, new int32) (old int32)
系列:其实就是原子性的将new
值保留到*addr
并返回旧值。代码示意:
old = *addr
*addr = new
return old
func CompareAndSwapXXXX((addr *int64, old, new int64) (swapped bool)
系列:其就是原子性的比拟*addr
和 old 的值,如果雷同则将new
赋值给*addr
并返回真,代码示意:
if *addr == old{
*addr = new
return ture
}
return false
func AddXXXX(addr *int64, delta int64) (new int64)
系列:原子性的将val
的值增加到*addr
并返回新值。代码示意:
*addr += delta
return *addr
func LoadXXXX(addr *uint32) (val uint32)
系列:原子性的获取*addr
的值func StoreXXXX(addr *int32, val int32)
原子性的将 val 值保留到*addr
Go
语言在 1.4
版本时增加一个新的类型 Value
,此类型的值就相当于一个容器,能够被用来 ” 原子地 ” 存储(store) 和加载 (Load) 任意类型的值。这些应用起来都还比较简单,就不写例子了,接下来咱们一起看一看这些办法是如何实现的。
源码解析
因为系列比拟多。底层实现的办法也大同小异样,这里就次要剖析一下 Value
的实现办法吧。为什么不剖析其余系列的呢?因为原子操作由底层硬件反对,所以看其余系列实现都要看汇编,Go 的汇编是基于 Plan9
的,这个汇编语言真的材料甚少,我也是真的不懂,程度不够,也不自讨苦吃了,等前面真的能看懂这些汇编了,再来剖析吧。这个网站有一些对于 plan9
汇编的常识,有趣味能够看一看:http://doc.cat-v.org/plan_9/4…。
Value
构造
咱们先来看一下 Value
的构造:
type Value struct {v interface{}
}
Value
构造里就只有一个字段,是 interface 类型,尽管这里是 interface
类型,然而这里要留神,第一次 Store
写入的类型就确定了之后写入的类型,否则会产生 panic
。因为这里是interface
类型,所以为了之后写入与读取操作不便,又在这个包里定义了一个 ifaceWords
构造,其实他就是一个空 interface
,他的作用就是将interface
分解成类型和数值。构造如下:
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
Value
的写入操作
咱们一起来看一看他是如何实现写入操作的:
// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(x interface{}) {
if x == nil {panic("sync/atomic: store of nil value into Value")
}
vp := (*ifaceWords)(unsafe.Pointer(v))
xp := (*ifaceWords)(unsafe.Pointer(&x))
for {typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, xp.data)
StorePointer(&vp.typ, xp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != xp.typ {panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, xp.data)
return
}
}
// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()
这段代码中的正文集曾经通知了咱们,调用 Store
办法写入的类型必须与愿类型雷同,不统一便会产生 panic。接下来剖析代码实现:
- 首先判断条件写入参数不能为
nil
,否则触发panic
- 通过应用
unsafe.Pointer
将oldValue
和newValue
转换成ifaceWords
类型。不便咱们获取他的原始类型 (typ) 和值(data). - 为了保障原子性,所以这里应用一个
for
换来解决,当曾经有Store
正在进行写入时,会进行期待. - 如果还没写入过数据,那么获取不到原始类型,就会开始第一次写入操作,这里会把先调用
runtime_procPin()
办法禁止调度器对以后 goroutine 的抢占(preemption),这样也能够避免GC
线程看到一假类型。 - 调用
CAS
办法来判断以后地址是否有被抢占,这里大家可能对unsafe.Pointer(^uintptr(0))
这一句话有点不明确,因为是第一个写入数据,之前是没有数据的,所以通过这样一个两头值来做判断,如果失败就会解除抢占锁,解除禁止调度器,持续循环期待. - 设置两头值胜利后,咱们接下来就能够平安的把
v
设为传入的新值了,这里会先写入值,在写入类型(typ),因为咱们会依据 ty 来做实现判断。 - 第一次写入没实现,咱们还会通过
uintptr(typ) == ^uintptr(0)
来进行判断,因为还是第一次放入的两头类型,他仍然会持续期待第一次实现。 - 如果第一次写入实现,会查看上一次写入的类型与这次写入的类型是否统一,不统一则会抛出
panic
.
这里代码量没有多少,置信大家肯定看懂了吧~。
Value
的读操作
先看一下代码:
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (x interface{}) {vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}
读取操作的代码就很简略了:
1. 第一步应用 unsafe.Pointer
将oldValue
转换成 ifaceWords
类型,而后获取他的类型,如果没有类型或者类型进来两头值,那么阐明当初还没数据或者第一次写入还没有实现。
- 通过查看后,调用
LoadPointer
办法能够获取他的值,而后结构一个新interface
的typ
和data
返回。
小彩蛋
后面咱们在说 CAS 时,说到了 ABA
问题,所以我就写了 demo
试一试 Go
规范库 atomic.CompareAndSwapXXX
办法是否有解决这个问题,看运行后果是没有,所以这里大家应用的时候要留神一下(尽管我也没想到什么当初什么业务场景会呈现这个问题,然而还是要留神一下,须要本人评估)。
func main() {
var share uint64 = 1
wg := sync.WaitGroup{}
wg.Add(3)
// 协程 1,期望值是 1, 欲更新的值是 2
go func() {defer wg.Done()
swapped := atomic.CompareAndSwapUint64(&share,1,2)
fmt.Println("goroutine 1",swapped)
}()
// 协程 2,期望值是 1,欲更新的值是 2
go func() {defer wg.Done()
time.Sleep(5 * time.Millisecond)
swapped := atomic.CompareAndSwapUint64(&share,1,2)
fmt.Println("goroutine 2",swapped)
}()
// 协程 3,期望值是 2,欲更新的值是 1
go func() {defer wg.Done()
time.Sleep(1 * time.Millisecond)
swapped := atomic.CompareAndSwapUint64(&share,2,1)
fmt.Println("goroutine 3",swapped)
}()
wg.Wait()
fmt.Println("main exit")
}
总结
原子操作是并发编程的一个根底,也是为我学习 sync.once
打基础,好啦,当初你们应该晓得下篇文章的内容是什么啦,敬请期待~。
好啦,这篇文章就到这里啦,素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!
创立了一个 Golang 学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:加我 vx 拉你入群,或者公众号获取入群二维码
结尾给大家发一个小福利吧,最近我在看 [微服务架构设计模式] 这一本书,讲的很好,本人也收集了一本 PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang 梦工厂],后盾回复:[微服务],即可获取。
我翻译了一份 GIN 中文文档,会定期进行保护,有须要的小伙伴后盾回复 [gin] 即可下载。
翻译了一份 Machinery 中文文档,会定期进行保护,有须要的小伙伴们后盾回复 [machinery] 即可获取。
我是 asong,一名普普通通的程序猿,让咱们一起缓缓变强吧。欢送各位的关注,咱们下期见~~~
举荐往期文章:
- machinery-go 异步工作队列
- 详解 defer 实现机制
- 真的了解 interface 了嘛
- Leaf—Segment 分布式 ID 生成零碎(Golang 实现版本)
- 十张动图带你搞懂排序算法(附 go 实现代码)
- go 参数传递类型
- 手把手教姐姐写音讯队列
- 常见面试题之缓存雪崩、缓存穿透、缓存击穿
- 详解 Context 包,看这一篇就够了!!!
- go-ElasticSearch 入门看这一篇就够了(一)
- 面试官:go 中 for-range 应用过吗?这几个问题你能解释一下起因吗