本文次要钻研一下dapr的Limiter

Limiter

dapr/pkg/concurrency/limiter.go

const (    // DefaultLimit is the default concurrency limit    DefaultLimit = 100)// Limiter objecttype Limiter struct {    limit         int    tickets       chan int    numInProgress int32}
Limiter定义了limit、tickets、numInProgress属性

NewLimiter

dapr/pkg/concurrency/limiter.go

// NewLimiter allocates a new ConcurrencyLimiterfunc NewLimiter(limit int) *Limiter {    if limit <= 0 {        limit = DefaultLimit    }    // allocate a limiter instance    c := &Limiter{        limit:   limit,        tickets: make(chan int, limit),    }    // allocate the tickets:    for i := 0; i < c.limit; i++ {        c.tickets <- i    }    return c}
NewLimiter办法依据limit来创立Limiter,并挨个调配ticket

Execute

dapr/pkg/concurrency/limiter.go

// Execute adds a function to the execution queue.// if num of go routines allocated by this instance is < limit// launch a new go routine to execute job// else wait until a go routine becomes availablefunc (c *Limiter) Execute(job func(param interface{}), param interface{}) int {    ticket := <-c.tickets    atomic.AddInt32(&c.numInProgress, 1)    go func(param interface{}) {        defer func() {            c.tickets <- ticket            atomic.AddInt32(&c.numInProgress, -1)        }()        // run the job        job(param)    }(param)    return ticket}
Execute办法首先获取ticket,而后递增numInProgress,之后异步执行job,执行完后偿还ticket

Wait

dapr/pkg/concurrency/limiter.go

// Wait will block all the previously Executed jobs completed running.//// IMPORTANT: calling the Wait function while keep calling Execute leads to//            un-desired race conditionsfunc (c *Limiter) Wait() {    for i := 0; i < c.limit; i++ {        <-c.tickets    }}
Wait办法遍历limit,挨个期待tickets返回

小结

dapr的Limiter定义了limit、tickets、numInProgress属性;它定义了Execute、Wait办法,同时提供NewLimiter的工厂办法。

doc

  • dapr