乐趣区

关于后端:最小堆提升每次排序的效率

之前写过一个散布是任务调度零碎,每次执行完工作都要对工作进行排序,应用最小堆的确优化了效率及 cpu

我的项目中须要应用一个简略的定时任务调度的框架,最后间接从 GitHub 上搜了一个 star 比拟多的,就是 https://github.com/robfig/cron,目前有 8000+ star。刚开始应用的时候发现问题不大,然而随着单机须要定时调度的工作越来越多,高峰期差不多靠近 500QPS,随着业务的推广应用,能够预期增长还会比拟快,然而曾经遇到 CPU 使用率偏高的问题,通过 pprof 剖析,很多都是在做排序,看了下这个我的项目的代码,整体执行的过程大略如下:

对所有工作进行排序,依照下次执行工夫进行排序

抉择数组中第一个工作,计算下次执行工夫减去以后工夫失去工夫 t,而后 sleep t

而后从数组第一个元素开始遍历工作,如果此工作须要调度的工夫 < now,那么就执行此工作,执行之后从新计算这个工作的 next 执行工夫

每次待执行的工作执行结束之后,都会从新对这个数组进行排序

而后再循环从排好序的数组中找到第一个须要执行的工作去执行。

代码如下:

for {
        // Determine the next entry to run.
        sort.Sort(byTime(c.entries))

        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            timer = time.NewTimer(100000 * time.Hour)
        } else {timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }

        for {
            select {
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)

                // Run every entry whose next time was less than now
                for _, e := range c.entries {if e.Next.After(now) || e.Next.IsZero() {break}
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }

            case newEntry := <-c.add:
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }

            break
        }
    }

问题就不言而喻了,执行一个工作(或几个工作)都从新计算 next 执行工夫,从新排序,最坏状况就是每次执行 1 个工作,排序一遍,那么执行 k 个工作须要的工夫的工夫复杂度就是 O(k*nlogn),这无疑是十分低效的。

于是想着怎么优化一下这个框架,不难想到每次找最先须要执行的工作就是从一堆工作中找 schedule_time 最小的那一个(设 schedule_time 是工作的执行工夫),那么比拟容易想到的思路就是应用最小堆:

在初始化工作列表的时候就间接构建一个最小堆

每次执行查看 peek 元素是否须要执行

须要执行就 pop 堆顶元素,计算 next 执行工夫,从新 push 入堆

不须要执行就 break 到外层循环取堆顶元素,计算 next_time-now() = need_sleep_time,而后 select 睡眠、add、remove 等操作。

我批改为 min-heap 的形式之后,每次增加工作的时候通过堆的属性进行 up 和 down 调整,每次增加工作工夫复杂度 O(logn),执行 k 个工作工夫复杂度是 O(klogn)。通过验证线上 CPU 应用升高 4~5 倍。CPU 从 50% 左右升高至 10% 左右。

优化后的代码如下,只是其中一部分。

全副的代码也曾经在 github 上曾经创立了一个 Fork 的仓库并推送下来了,全副单测 Case 也都 PASS。感兴趣能够点过来看。https://github.com/tovenja/cron

    for {
        // Determine the next entry to run.
        // Use min-heap no need sort anymore


     // 这里不再须要排序了,因为 add 的时候间接进行堆调整
     //sort.Sort(byTime(c.entries))

        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            timer = time.NewTimer(100000 * time.Hour)
        } else {timer = time.NewTimer(c.entries[0].Next.Sub(now))
            //fmt.Printf("%v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)
        }

        for {
            select {
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)
                // Run every entry whose next time was less than now
                for {e := c.entries.Peek()
                    if e.Next.After(now) || e.Next.IsZero() {break}
                    e = heap.Pop(&c.entries).(*Entry)
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    heap.Push(&c.entries, e)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }

            case newEntry := <-c.add:
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                heap.Push(&c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }

            break
        }
    }

转自:

cnblogs.com/aboutblank/p/14860571.html

更多好文 关注

本文由 mdnice 多平台公布

退出移动版