关于golang:更简的并发代码更强的并发控制

9次阅读

共计 4273 个字符,预计需要花费 11 分钟才能阅读完成。

有没感觉 Gosync 包不够用?有没遇到类型没有 sync/atomic 反对?

咱们一起看看 go-zerosyncx 包对规范库的一些增值补充。

https://github.com/tal-tech/g…

name 作用
AtomicBool bool 类型 原子类
AtomicDuration Duration 无关 原子类
AtomicFloat64 float64 类型 原子类
Barrier 栏栅【将加锁解锁包装】
Cond 条件变量
DoneChan 优雅告诉敞开
ImmutableResource 创立后不会批改的资源
Limit 管制申请数
LockedCalls 确保办法的串行调用
ManagedResource 资源管理
Once 提供 once func
OnceGuard 一次性应用的资源管理
Pool pool,简略的池
RefResource 援用计数的资源
ResourceManager 资源管理器
SharedCalls 相似 singflight 的性能
SpinLock 自旋锁:自旋 +CAS
TimeoutLimit Limit + timeout 管制

上面开始对以上库组件做别离介绍。

atomic

因为没有 泛型 反对,所以才会呈现多种类型的原子类反对。以下采纳 float64 作为例子:

func (f *AtomicFloat64) Add(val float64) float64 {
    for {old := f.Load()
        nv := old + val
        if f.CompareAndSwap(old, nv) {return nv}
    }
}

func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}

func (f *AtomicFloat64) Load() float64 {return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

func (f *AtomicFloat64) Set(val float64) {atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}
  • Add(val):如果 CAS 失败,一直 for 循环重试,获取 old val,并 set old+val;
  • CompareAndSwap(old, new):调用底层 atomicCAS
  • Load():调用 atomic.LoadUint64,而后转换
  • Set(val):调用 atomic.StoreUint64

至于其余类型,开发者想本人扩大本人想要的类型,能够按照上述,基本上调用原始 atomic 操作,而后转换为须要的类型,比方:遇到 bool 能够借助 0, 1 来分辨对应的 false, true

Barrier

这里 Barrier 只是将业务函数操作封装,作为闭包传入,外部将 lock 操作的加锁解锁自行解决了【避免开发者加锁了遗记解锁】

func (b *Barrier) Guard(fn func()) {b.lock.Lock()
    defer b.lock.Unlock()
  // 本人的业务逻辑
    fn()}

Cond/Limit/TimeoutLimit

这个数据结构和 Limit 一起组成了 TimeoutLimit,这里将这 3 个一起讲:

func NewTimeoutLimit(n int) TimeoutLimit {
    return TimeoutLimit{limit: NewLimit(n),
        cond:  NewCond(),}
}

func NewLimit(n int) Limit {
    return Limit{pool: make(chan lang.PlaceholderType, n),
    }
}
  • limit 这里是有缓冲的 channel
  • cond 是无缓冲的;

所以这里联合名字来了解:因为 Limit 是限度某一种资源的应用,所以须要事后在资源池中放入预置数量的资源;Cond 相似阀门,须要两边都筹备好,能力进行数据交换,所以应用无缓冲,同步控制。

这里咱们看看 stores/mongo 中对于 session 的治理,来了解 资源管制:

func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
  // 选项参数注入
    ...
  // 看 limit 中是否还能取出资源
    if err := cs.limit.Borrow(o.timeout); err != nil {return nil, err} else {return cs.Copy(), nil
    }
}

func (l TimeoutLimit) Borrow(timeout time.Duration) error {
  // 1. 如果还有 limit 中还有资源,取出一个,返回
    if l.TryBorrow() {return nil}
    // 2. 如果 limit 中资源曾经用完了
    var ok bool
    for {
    // 只有 cond 能够取出一个【无缓存,也只有 cond <- 此条能力通过】timeout, ok = l.cond.WaitWithTimeout(timeout)
    // 尝试取出一个【下面 cond 通过时,就有一个资源返回了】// 看 `Return()`
        if ok && l.TryBorrow() {return nil}
        // 超时管制
        if timeout <= 0 {return ErrTimeout}
    }
}

func (l TimeoutLimit) Return() error {
  // 返回去一个资源
    if err := l.limit.Return(); err != nil {return err}
    // 同步告诉另一个须要资源的协程【实现了阀门,两方替换】l.cond.Signal()
    return nil
}

资源管理

同文件夹中还有 ResourceManager,从名字上相似,这里将两个组件放在一起解说。

先从构造上:

type ManagedResource struct {
  // 资源
    resource interface{}
    lock     sync.RWMutex
  // 生成资源的逻辑,由开发者本人管制
    generate func() interface{}
  // 比照资源
    equals   func(a, b interface{}) bool
}

type ResourceManager struct {
  // 资源:这里看得出来是 I/O,resources   map[string]io.Closer
    sharedCalls SharedCalls
  // 对资源 map 互斥拜访
    lock        sync.RWMutex
}

而后来看获取资源的办法签名:

func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)

// 获取一个资源(有就间接获取,没有生成一个)func (mr *ManagedResource) Take() interface{}
// 判断这个资源是否不合乎传入的判断要求,不合乎则重置
func (mr *ManagedResource) MarkBroken(resource interface{})
  1. ResourceManager 应用 SharedCalls 做防反复申请,并将资源缓存在外部的 sourMap;另外传入的 create funcIO 操作无关,常见用在网络资源的缓存;
  2. ManagedResource 缓存资源没有 map 而是繁多的 interface,阐明只有一份,然而它提供了 Take() 和传入 generate() 阐明能够让开发者自行更新 resource

所以在用处上:

  • ResourceManager:用在网络资源的治理。如:数据库连贯治理;
  • ManagedResource:用在一些变动资源,能够做资源前后比照,达到更新资源。如:token 治理和验证

RefResource

这个就和 GC 中援用计数相似:

  • Use() -> ref++
  • Clean() -> ref--; if ref == 0 -> ref clean
func (r *RefResource) Use() error {
  // 互斥拜访
    r.lock.Lock()
    defer r.lock.Unlock()
    // 革除标记
    if r.cleaned {return ErrUseOfCleaned}
    // 援用 +1
    r.ref++
    return nil
}

SharedCalls

一句话形容: 应用 SharedCalls 能够使得同时多个申请只须要发动一次拿后果的调用,其余申请 ” 不劳而获 ”,这种设计无效缩小了资源服务的并发压力,能够无效避免缓存击穿

这个组件被重复利用在其余组件中,下面说的 ResourceManager

相似当须要高频并发拜访一个资源时,就能够应用 SharedCalls 缓存。

// 当多个申请同时应用 Do 办法申请资源时
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  // 先申请加锁
  g.lock.Lock()

  // 依据 key,获取对应的 call 后果, 并用变量 c 保留
  if c, ok := g.calls[key]; ok {
    // 拿到 call 当前,开释锁,此处 call 可能还没有理论数据,只是一个空的内存占位
    g.lock.Unlock()
    // 调用 wg.Wait,判断是否有其余 goroutine 正在申请资源,如果阻塞,阐明有其余 goroutine 正在获取资源
    c.wg.Wait()
    // 当 wg.Wait 不再阻塞,示意资源获取曾经完结,能够间接返回后果
    return c.val, c.err
  }

  // 没有拿到后果,则调用 makeCall 办法去获取资源,留神此处依然是锁住的,能够保障只有一个 goroutine 能够调用 makecall
  c := g.makeCall(key, fn)
  // 返回调用后果
  return c.val, c.err
}

总结

不反复造轮子 ,始终是 go-zero 设计宗旨之一;也同时将平时业务积淀到组件中,这才是框架和组件的意义。

对于 go-zero 更多的设计和实现文章,能够继续关注咱们。欢送大家去关注和应用。

我的项目地址

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!

微信交换群

关注『微服务实际』公众号并回复 进群 获取社区群二维码。

go-zero 系列文章见『微服务实际』公众号

正文完
 0