Preface
This article takes a look at dapr's Limiter.
Limiter
dapr/pkg/concurrency/limiter.go
const (
	// DefaultLimit is the default concurrency limit
	DefaultLimit = 100
)

// Limiter object
type Limiter struct {
	limit         int
	tickets       chan int
	numInProgress int32
}
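Limiter defines three fields: limit (the maximum number of concurrent jobs), tickets (a buffered channel that serves as the ticket pool), and numInProgress (a counter of jobs currently running).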
NewLimiter
dapr/pkg/concurrency/limiter.go
// NewLimiter allocates a new ConcurrencyLimiter
func NewLimiter(limit int) *Limiter {
	if limit <= 0 {
		limit = DefaultLimit
	}

	// allocate a limiter instance
	c := &Limiter{
		limit:   limit,
		tickets: make(chan int, limit),
	}

	// allocate the tickets:
	for i := 0; i < c.limit; i++ {
		c.tickets <- i
	}

	return c
}
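NewLimiter creates a Limiter for the given limit, falling back to DefaultLimit when limit <= 0, and then fills the tickets channel with one ticket per slot.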
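The ticket allocation can be tried in isolation. The following is a minimal sketch, not dapr code: newTickets is a hypothetical helper that only mirrors the allocation loop in NewLimiter, to show that a pre-filled buffered channel behaves like a counting semaphore.

```go
package main

import "fmt"

// newTickets is a hypothetical helper (not part of dapr) that mirrors the
// allocation loop in NewLimiter: a buffered channel of capacity limit,
// pre-filled with the tickets 0..limit-1.
func newTickets(limit int) chan int {
	tickets := make(chan int, limit)
	for i := 0; i < limit; i++ {
		tickets <- i
	}
	return tickets
}

func main() {
	tickets := newTickets(3)
	// Every slot is free: prints "3 3".
	fmt.Println(len(tickets), cap(tickets))

	// Take a ticket, as Execute does. Channels are FIFO, so this is ticket 0.
	t := <-tickets
	// Prints "0 2".
	fmt.Println(t, len(tickets))

	// Hand the ticket back, as the deferred function in Execute does.
	tickets <- t
	// Prints "3".
	fmt.Println(len(tickets))
}
```

Because the channel never holds more than limit tickets, a receive on an empty channel is what makes a caller wait for a free slot.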
Execute
dapr/pkg/concurrency/limiter.go
// Execute adds a function to the execution queue.
// if num of go routines allocated by this instance is < limit
// launch a new go routine to execute job
// else wait until a go routine becomes available
func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
	ticket := <-c.tickets
	atomic.AddInt32(&c.numInProgress, 1)
	go func(param interface{}) {
		defer func() {
			c.tickets <- ticket
			atomic.AddInt32(&c.numInProgress, -1)
		}()

		// run the job
		job(param)
	}(param)
	return ticket
}
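Execute first takes a ticket, blocking if none is available, and increments numInProgress. It then runs the job asynchronously in a new goroutine; once the job finishes, the ticket is returned and numInProgress is decremented.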
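To see the bound that Execute enforces, here is a rough sketch that runs on its own. The limiter type below is a local, trimmed-down copy of the pattern (it is not imported from dapr and omits numInProgress), and peakConcurrency is a hypothetical helper introduced only to measure how many jobs overlap.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a local, trimmed-down copy of the ticket pattern (an assumption
// for illustration, not the dapr type itself).
type limiter struct {
	tickets chan int
}

func newLimiter(limit int) *limiter {
	l := &limiter{tickets: make(chan int, limit)}
	for i := 0; i < limit; i++ {
		l.tickets <- i
	}
	return l
}

// execute blocks until a ticket is free, then runs job in a new goroutine
// and returns the ticket to the pool when the job is done.
func (l *limiter) execute(job func(param interface{}), param interface{}) int {
	ticket := <-l.tickets
	go func() {
		defer func() { l.tickets <- ticket }()
		job(param)
	}()
	return ticket
}

// peakConcurrency is a hypothetical helper: it pushes n short jobs through a
// limiter and reports the highest number of jobs seen running at once.
func peakConcurrency(limit, n int) int32 {
	l := newLimiter(limit)
	var running, peak int32
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		l.execute(func(param interface{}) {
			defer wg.Done()
			cur := atomic.AddInt32(&running, 1)
			// Record cur as the new peak if it exceeds the current one.
			for {
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond)
			atomic.AddInt32(&running, -1)
		}, i)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	// With a limit of 2, no more than 2 jobs overlap: prints "true".
	fmt.Println(peakConcurrency(2, 10) <= 2)
}
```

Note that the caller of execute is the one that blocks when all tickets are taken, so submission itself is throttled rather than queued.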
Wait
dapr/pkg/concurrency/limiter.go
// Wait will block all the previously Executed jobs completed running.
//
// IMPORTANT: calling the Wait function while keep calling Execute leads to
// un-desired race conditions
func (c *Limiter) Wait() {
	for i := 0; i < c.limit; i++ {
		<-c.tickets
	}
}
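Wait loops limit times and takes one ticket per iteration, so it returns only after every in-flight job has handed its ticket back.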
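The behaviour of Wait can be sketched the same way. Again, the limiter below is a simplified local copy rather than the dapr type, and runAndWait is a hypothetical helper that reports what the state looks like once wait has returned.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// limiter is a simplified local copy of the pattern (an assumption for
// illustration, not the dapr type itself).
type limiter struct {
	limit   int
	tickets chan int
}

func newLimiter(limit int) *limiter {
	l := &limiter{limit: limit, tickets: make(chan int, limit)}
	for i := 0; i < limit; i++ {
		l.tickets <- i
	}
	return l
}

// execute takes a ticket, runs job in a goroutine, then returns the ticket.
func (l *limiter) execute(job func()) {
	ticket := <-l.tickets
	go func() {
		defer func() { l.tickets <- ticket }()
		job()
	}()
}

// wait drains all tickets, which is only possible once every job has
// returned the ticket it was holding.
func (l *limiter) wait() {
	for i := 0; i < l.limit; i++ {
		<-l.tickets
	}
}

// runAndWait is a hypothetical helper: it submits n jobs, waits, and returns
// the number of finished jobs and the number of tickets left in the pool.
func runAndWait(limit, n int) (int32, int) {
	l := newLimiter(limit)
	var finished int32
	for i := 0; i < n; i++ {
		l.execute(func() {
			time.Sleep(2 * time.Millisecond)
			atomic.AddInt32(&finished, 1)
		})
	}
	l.wait()
	return atomic.LoadInt32(&finished), len(l.tickets)
}

func main() {
	done, left := runAndWait(3, 8)
	// All 8 jobs finished and the pool is empty: prints "8 0".
	fmt.Println(done, left)
}
```

The empty pool after wait is worth noticing: since the tickets are not put back, a later call to execute on the same instance would block, which suggests the limiter is meant to be waited on once, after all jobs have been submitted.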
Summary
dapr's Limiter defines the limit, tickets, and numInProgress fields; it provides the Execute and Wait methods, along with the NewLimiter factory function.
doc
- dapr