关于监控工具:Openfalcon-semaphore的源码实现

semaphore即信号量,Open-falcon在将queue中的数据发送给graph时,用semaphore管制并发的goroutine数量,防止产生大量的发送goroutine。

func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {
    sema := nsema.NewSemaphore(concurrent)
    for {
        items := Q.PopBackBy(batch)
        sema.Acquire()
        go func(addr string, graphItems []*cmodel.GraphItem, count int) {
            defer sema.Release()
            err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp)
            ....
        }
        ....
    }
}

semaphore应用channel实现:

  • 定义一个len的channel;
  • 当申请semaphore时,入队channel一个元素,当超过len时,阻塞住,也就是申请失败;
  • 当开释semaphore是,出队channel一个元素;
type Semaphore struct {
   bufSize int
   channel chan int8
}

func NewSemaphore(concurrencyNum int) *Semaphore {
   return &Semaphore{channel: make(chan int8, concurrencyNum), bufSize: concurrencyNum}
}

申请semaphore:

//channel入队胜利,返回true;否则返回false;
func (this *Semaphore) TryAcquire() bool {
   select {
   case this.channel <- int8(0):
      return true
   default:
      return false
   }
}

//若channel未满,入队返回;否则,阻塞
func (this *Semaphore) Acquire() {
   this.channel <- int8(0)
}

开释semaphore: channel出队一个元素

func (this *Semaphore) Release() {
   <-this.channel
}

查问可用的semaphore:

func (this *Semaphore) AvailablePermits() int {
   return this.bufSize - len(this.channel)
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理