关于go:如何用Golang处理每分钟100万个请求

33次阅读

共计 5504 个字符,预计需要花费 14 分钟才能阅读完成。

用 Golang 解决每分钟 100 万个申请

转载请注明起源:https://janrs.com/9yaq


面临的问题

在我设计一个剖析零碎中,咱们公司的指标是可能解决来自数百万个端点的大量 POST 申请。web 网络处理程序将收到一个 JSON 文档,其中可能蕴含许多有效载荷的汇合,须要写入 Amazon S3,以便咱们的地图还原零碎随后对这些数据进行操作。

传统上,咱们会钻研创立一个工人层架构,利用诸如以下货色:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • 还有等等其余的技术手段 …

并设置 2 个不同的集群,一个用于 Web 前端,另一个用于 worker 解决过程,这样咱们就能够扩充咱们能够解决的后盾工作量。

但从一开始,咱们的团队就晓得咱们应该在 Go 中这样做,因为在探讨阶段咱们看到这可能是一个十分大的流量零碎。我应用 Go 已有大概 2 年左右的工夫,咱们公司在解决业务时开发了一些零碎,但没有一个能接受如此大的负载。以下是优化的过程。

咱们首先创立一些构造体来定义咱们将通过 POST 调用接管的 Web 申请负载,以及一种将其上传到咱们的 S3 存储桶的办法。代码如下:

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`}

type Payload struct {// ... 负载字段}

func (p *Payload) UploadToS3() error {
    // storageFolder 办法确保在咱们在键名中取得雷同工夫戳时不会产生名称抵触
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {return encodeErr}

    // 咱们公布到 S3 存储桶的所有内容都应标记为“公有”var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

应用 Go 协程

最后咱们采纳了一个非常简单的 POST 处理程序实现,只是试图将 job 处理程序并行化到一个简略的 goroutine 中:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 将 body 读入字符串进行 json 解码
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }
    
    // 别离查看每个无效负载和队列我的项目以公布到 S3
    for _, payload := range content.Payloads {go payload.UploadToS3()   // <----- 这是不倡议的做法。这里是最开始的做法。}

    w.WriteHeader(http.StatusOK)
}

对于中等负载,这可能实用于大多数公司的流量,但很快证实这在大规模状况下成果不佳。咱们冀望有很多申请,但没有达到咱们将第一个版本部署到生产环境时开始看到的数量级。咱们齐全低估了流量。

下面的办法在几个不同的方面是不好的。无法控制咱们生成了多少个 go routines。因为咱们每分钟收到 100 万个 POST 申请,因而这段代码很快解体了。

进一步优化

咱们须要找到一种不同的形式。从一开始咱们就开始探讨咱们须要如何放弃申请处理程序的生命周期十分短,并在后盾进行生成解决。当然,这是你在应用 Ruby on Rails 时必须做的,否则你将阻止所有可用的 worker web 处理器,无论你应用的是 puma、unicorn 还是 passenger(请不要进入 JRuby 探讨)。而后咱们须要利用常见的解决方案来做到这一点,例如 Resque、Sidekiq、SQS 等等,有很多办法能够实现这一点。

所以第二次迭代是创立一个缓冲通道,咱们能够创立一些队列,而后把 job push 到队列并将它们上传到 S3,并且因为咱们能够管制 job 队列中的最大数数量并且咱们有足够的内存来解决队列中的 job。在这个计划中,咱们认为只须要在通道队列中缓冲须要解决的 job 就能够了。

代码如下:

var Queue chan Payload

func init() {Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // 别离查看每个无效负载和队列我的项目以公布到 S3
    for _, payload := range content.Payloads {Queue <- payload // <----- 这是倡议的做法。}
    ...
}

而后为了理论出列作业并解决它们,咱们应用了相似的货色:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- 这里尽管优化了,但还不是最好的。}
    }
}

在下面的代码中,咱们用一个缓冲队列来替换有缺点的并发性,而缓冲队列只是推延了问题。咱们的同步处理器一次只将一个无效负载上传到 S3,并且因为传入申请的速率远远大于单个处理器上传到 S3 的能力,咱们的 job 缓冲通道很快达到了极限并阻止了申请处理程序的能力,队列很快就阻塞满了。

咱们只是在防止这个问题,并开始倒计时,直到咱们的零碎最终死亡。在咱们部署这个有缺点的版本后,咱们的提早率在几分钟内以恒定的速度继续减少。以下是提早率增长图:

更好的解决方案

咱们决定在应用 Go 通道时应用一种通用模式,以创立一个 2 层通道零碎,一个用于 Job 队列,另一个用于管制同时在 Job 队列上操作的 Worker 的数量。

这个想法是将上传到 S3 的数据并行化到某种程度上可继续的速度,这种速度既不会减弱机器也不会开始从 S3 生成连贯谬误。所以咱们抉择创立 Job/Worker 模式。对于那些相熟 Java、C# 等的人来说,能够将其视为 Golang 应用通道实现 Worker 线程池的形式。

代码如下:

var (MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job 示意要运行的作业
type Job struct {Payload Payload}

// 咱们能够在 Job 队列上发送工作申请的缓冲通道。var JobQueue chan Job

// Worker 代表执行作业的 Worker。type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start 办法为 Worker 启动循环监听。监听退出信号以防咱们须要进行它。func (w Worker) Start() {go func() {
        for {
            // 将以后 woker 注册到工作队列中。w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // 接管 work 申请。if err := job.Payload.UploadToS3(); err != nil {log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // 接管一个退出的信号。return
            }
        }
    }()}

// 将退出信号传递给 Worker 过程以进行解决清理。func (w Worker) Stop() {go func() {w.quit <- true}()}

咱们曾经批改了咱们的 Web 申请处理程序,以创立一个带有无效负载的 Job 构造实例,并将其发送到 JobQueue 通道以供 Worker 提取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 将 body 读入字符串进行 json 解码
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // 别离查看每个无效负载和队列我的项目以公布到 S3
    for _, payload := range content.Payloads {

        // 创立一个无效负载的 job
        work := Job{Payload: payload}

        // 将 work push 到队列。JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在咱们的 Web 服务器初始化期间,咱们创立一个 Dispatcher 调度器并调用 Run() 来创立 Woker 工作池并开始侦听将呈现在 Job 队列中的 Job。

dispatcher := NewDispatcher(MaxWorker) 
dispatcher.Run()

上面是咱们的调度程序实现的代码:

type Dispatcher struct {
    // 通过调度器注册一个 Worker 通道池
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // 启动指定数量的 Worker
    for i := 0; i < d.maxWorkers; i++ {worker := NewWorker(d.pool)
        worker.Start()}

    go d.dispatch()}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // 接管一个 job 申请
            go func(job Job) {
                // 尝试获取可用的 worker job 通道
                // 这将阻塞 worker 直到闲暇
                jobChannel := <-d.WorkerPool

                // 调度一个 job 到 worker job 通道
                jobChannel <- job
            }(job)
        }
    }
}

请留神,咱们提供了要实例化并增加到咱们的 Worker 池中的最大 worker 数量。因为咱们在这个我的项目中应用了 Amazon Elasticbeanstalk 和 dockerized Go 环境,因而咱们从环境变量中读取这些值。这样咱们就能够管制 Job 队列的数量和最大大小,因而咱们能够疾速调整这些值而无需重新部署集群。

var (MaxWorker = os.Getenv("MAX_WORKERS")
  MaxQueue  = os.Getenv("MAX_QUEUE")
)

在咱们部署它之后,咱们立刻看到咱们所有的提早率都降落到极低的提早,并且咱们解决申请的能力急剧回升。以下是流量截图:

在咱们的弹性负载均衡器齐全预热几分钟后,咱们看到咱们的 ElasticBeanstalk 应用程序每分钟解决近 100 万个申请。咱们通常在早上有几个小时的流量会飙升至每分钟超过一百万。

一旦咱们部署了新代码,服务器数量就从 100 台服务器大幅降落到大概 20 台服务器。以下是服务器数量变动截图:

在正确配置集群和主动缩放设置后,咱们可能将其进一步升高到仅 4x EC2 c4.Large 实例,并且如果 CPU 使用率超过 90% 继续 5 天,Elastic Auto-Scaling 将生成一个新实例 分钟值。以下是截图:

总结

能够看出利用 Elasticbeanstalk 主动缩放的弱小性能以及 Golang 提供的开箱即用的高效和简略的并发办法,就能够构建出一个高性能的处理程序。


转载请注明起源:https://janrs.com/9yaq

正文完
 0