semaphore 即信号量,Open-falcon 在将 queue 中的数据发送给 graph 时,用 semaphore 管制并发的 goroutine 数量,防止产生大量的发送 goroutine。
func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {sema := nsema.NewSemaphore(concurrent)
for {items := Q.PopBackBy(batch)
sema.Acquire()
go func(addr string, graphItems []*cmodel.GraphItem, count int) {defer sema.Release()
err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp)
....
}
....
}
}
semaphore 应用 channel 实现:
- 定义一个 len 的 channel;
- 当申请 semaphore 时,入队 channel 一个元素,当超过 len 时,阻塞住,也就是申请失败;
- 当开释 semaphore 是,出队 channel 一个元素;
type Semaphore struct {
bufSize int
channel chan int8
}
func NewSemaphore(concurrencyNum int) *Semaphore {return &Semaphore{channel: make(chan int8, concurrencyNum), bufSize: concurrencyNum}
}
申请 semaphore:
//channel 入队胜利,返回 true;否则返回 false;func (this *Semaphore) TryAcquire() bool {
select {case this.channel <- int8(0):
return true
default:
return false
}
}
// 若 channel 未满,入队返回;否则,阻塞
func (this *Semaphore) Acquire() {this.channel <- int8(0)
}
开释 semaphore: channel 出队一个元素
func (this *Semaphore) Release() {<-this.channel}
查问可用的 semaphore:
func (this *Semaphore) AvailablePermits() int {return this.bufSize - len(this.channel)
}