共计 4273 个字符,预计需要花费 11 分钟才能阅读完成。
有没感觉
Go
的sync
包不够用?有没遇到类型没有sync/atomic
反对?咱们一起看看
go-zero
的syncx
包对规范库的一些增值补充。https://github.com/tal-tech/g…
name | 作用 |
---|---|
AtomicBool | bool 类型 原子类 |
AtomicDuration | Duration 无关 原子类 |
AtomicFloat64 | float64 类型 原子类 |
Barrier | 栏栅【将加锁解锁包装】 |
Cond | 条件变量 |
DoneChan | 优雅告诉敞开 |
ImmutableResource | 创立后不会批改的资源 |
Limit | 管制申请数 |
LockedCalls | 确保办法的串行调用 |
ManagedResource | 资源管理 |
Once | 提供 once func |
OnceGuard | 一次性应用的资源管理 |
Pool | pool,简略的池 |
RefResource | 援用计数的资源 |
ResourceManager | 资源管理器 |
SharedCalls | 相似 singflight 的性能 |
SpinLock | 自旋锁:自旋 +CAS |
TimeoutLimit | Limit + timeout 管制 |
上面开始对以上库组件做别离介绍。
atomic
因为没有 泛型 反对,所以才会呈现多种类型的原子类反对。以下采纳 float64
作为例子:
func (f *AtomicFloat64) Add(val float64) float64 {
for {old := f.Load()
nv := old + val
if f.CompareAndSwap(old, nv) {return nv}
}
}
func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}
func (f *AtomicFloat64) Load() float64 {return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}
func (f *AtomicFloat64) Set(val float64) {atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}
Add(val)
:如果CAS
失败,一直 for 循环重试,获取 old val,并 set old+val;CompareAndSwap(old, new)
:调用底层atomic
的CAS
;Load()
:调用atomic.LoadUint64
,而后转换Set(val)
:调用atomic.StoreUint64
至于其余类型,开发者想本人扩大本人想要的类型,能够按照上述,基本上调用原始 atomic
操作,而后转换为须要的类型,比方:遇到 bool
能够借助 0, 1
来分辨对应的 false, true
。
Barrier
这里 Barrier
只是将业务函数操作封装,作为闭包传入,外部将 lock
操作的加锁解锁自行解决了【避免开发者加锁了遗记解锁】
func (b *Barrier) Guard(fn func()) {b.lock.Lock()
defer b.lock.Unlock()
// 本人的业务逻辑
fn()}
Cond/Limit/TimeoutLimit
这个数据结构和 Limit
一起组成了 TimeoutLimit
,这里将这 3 个一起讲:
func NewTimeoutLimit(n int) TimeoutLimit {
return TimeoutLimit{limit: NewLimit(n),
cond: NewCond(),}
}
func NewLimit(n int) Limit {
return Limit{pool: make(chan lang.PlaceholderType, n),
}
}
limit
这里是有缓冲的channel
;cond
是无缓冲的;
所以这里联合名字来了解:因为 Limit
是限度某一种资源的应用,所以须要事后在资源池中放入预置数量的资源;Cond
相似阀门,须要两边都筹备好,能力进行数据交换,所以应用无缓冲,同步控制。
这里咱们看看 stores/mongo
中对于 session
的治理,来了解 资源管制:
func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
// 选项参数注入
...
// 看 limit 中是否还能取出资源
if err := cs.limit.Borrow(o.timeout); err != nil {return nil, err} else {return cs.Copy(), nil
}
}
func (l TimeoutLimit) Borrow(timeout time.Duration) error {
// 1. 如果还有 limit 中还有资源,取出一个,返回
if l.TryBorrow() {return nil}
// 2. 如果 limit 中资源曾经用完了
var ok bool
for {
// 只有 cond 能够取出一个【无缓存,也只有 cond <- 此条能力通过】timeout, ok = l.cond.WaitWithTimeout(timeout)
// 尝试取出一个【下面 cond 通过时,就有一个资源返回了】// 看 `Return()`
if ok && l.TryBorrow() {return nil}
// 超时管制
if timeout <= 0 {return ErrTimeout}
}
}
func (l TimeoutLimit) Return() error {
// 返回去一个资源
if err := l.limit.Return(); err != nil {return err}
// 同步告诉另一个须要资源的协程【实现了阀门,两方替换】l.cond.Signal()
return nil
}
资源管理
同文件夹中还有 ResourceManager
,从名字上相似,这里将两个组件放在一起解说。
先从构造上:
type ManagedResource struct {
// 资源
resource interface{}
lock sync.RWMutex
// 生成资源的逻辑,由开发者本人管制
generate func() interface{}
// 比照资源
equals func(a, b interface{}) bool
}
type ResourceManager struct {
// 资源:这里看得出来是 I/O,resources map[string]io.Closer
sharedCalls SharedCalls
// 对资源 map 互斥拜访
lock sync.RWMutex
}
而后来看获取资源的办法签名:
func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)
// 获取一个资源(有就间接获取,没有生成一个)func (mr *ManagedResource) Take() interface{}
// 判断这个资源是否不合乎传入的判断要求,不合乎则重置
func (mr *ManagedResource) MarkBroken(resource interface{})
ResourceManager
应用SharedCalls
做防反复申请,并将资源缓存在外部的sourMap
;另外传入的create func
和IO
操作无关,常见用在网络资源的缓存;ManagedResource
缓存资源没有map
而是繁多的interface
,阐明只有一份,然而它提供了Take()
和传入generate()
阐明能够让开发者自行更新resource
;
所以在用处上:
ResourceManager
:用在网络资源的治理。如:数据库连贯治理;ManagedResource
:用在一些变动资源,能够做资源前后比照,达到更新资源。如:token
治理和验证
RefResource
这个就和 GC
中援用计数相似:
Use() -> ref++
Clean() -> ref--; if ref == 0 -> ref clean
func (r *RefResource) Use() error {
// 互斥拜访
r.lock.Lock()
defer r.lock.Unlock()
// 革除标记
if r.cleaned {return ErrUseOfCleaned}
// 援用 +1
r.ref++
return nil
}
SharedCalls
一句话形容: 应用 SharedCalls 能够使得同时多个申请只须要发动一次拿后果的调用,其余申请 ” 不劳而获 ”,这种设计无效缩小了资源服务的并发压力,能够无效避免缓存击穿 。
这个组件被重复利用在其余组件中,下面说的 ResourceManager
。
相似当须要高频并发拜访一个资源时,就能够应用 SharedCalls
缓存。
// 当多个申请同时应用 Do 办法申请资源时
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
// 先申请加锁
g.lock.Lock()
// 依据 key,获取对应的 call 后果, 并用变量 c 保留
if c, ok := g.calls[key]; ok {
// 拿到 call 当前,开释锁,此处 call 可能还没有理论数据,只是一个空的内存占位
g.lock.Unlock()
// 调用 wg.Wait,判断是否有其余 goroutine 正在申请资源,如果阻塞,阐明有其余 goroutine 正在获取资源
c.wg.Wait()
// 当 wg.Wait 不再阻塞,示意资源获取曾经完结,能够间接返回后果
return c.val, c.err
}
// 没有拿到后果,则调用 makeCall 办法去获取资源,留神此处依然是锁住的,能够保障只有一个 goroutine 能够调用 makecall
c := g.makeCall(key, fn)
// 返回调用后果
return c.val, c.err
}
总结
不反复造轮子 ,始终是 go-zero
设计宗旨之一;也同时将平时业务积淀到组件中,这才是框架和组件的意义。
对于 go-zero
更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。
我的项目地址
https://github.com/tal-tech/go-zero
欢送应用 go-zero 并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实际』公众号