微信公众号:LinuGo,欢送关注

Golang规范库搭建的http服务端会为每一个申请创立一个协程去解决,尽管每个协程占有的栈空间很小,然而如果万一来个数百万千万的申请(当然,这种可能性有点极其),服务端只能对每一条申请乖乖创立一个协程,这时候,该go过程就存在大量的goroutine,占用服务器资源不说,还会增大gc压力。这时候就想给该机制加一个限度,搞一个协程池限度一下最大解决申请的协程数量。

浏览一下规范库该局部的源码实现

func (srv *Server) Serve(l net.Listener) error {  .....  ctx := context.WithValue(baseCtx, ServerContextKey, srv)  for {    //期待建设连贯,没有申请则会阻塞住    rw, err := l.Accept()    if err != nil {......}    connCtx := ctx    if cc := srv.ConnContext; cc != nil {......}    ......    //开启协程解决申请,次要须要革新的就在此处    go c.serve(connCtx)  }}

次要须要针对创立协程加一个限度条件,如果小于协程池规定的数量就容许创立,否则期待协程池有闲暇位再创立。


大抵思路

总体应用生产者消费者模式。应用两个有缓冲区的channel来实现协程的并发管制,一个sigChannel通过缓冲空间限度最大的协程数量,另一个jobChannel则用于传递申请的数据(包含申请函数以及参数),该jobChannel对于是否缓冲没有要求。

流程

(1)首先当申请到来之后,往sigChannel中写入标记位数据,如果此时有闲暇地位,则不会阻塞在此处;

(2)之后往jobChannel中写入要执行的函数以及参数;

(3)后盾监听jobChannel的函数worker(该函数要源源不断读取管道数据)则会取出管道中的数据;

(4)worker创立goroutine执行申请函数;

(5)该申请函数执行实现后,goroutine再去取出sigChannel管道中的标记数据,腾出来地位;

注:如果开始时候sigChannel写数据写入不了,则阐明该池子满了,则须要阻塞期待。这样就实现了应用sigChannel管制并发量的性能。


代码实现

接下来应用代码实现这种思维

1、首先把net/http包中的代码给保留一份,避免被搞坏。间接在目录下搞了一个git仓库,先把源码commit一次,再搞一个分支本人瞎搞着玩,。在net/http下建了一个放协程池函数的文件夹,创立一个go文件。

2、首先定义两个channel,一个用来寄存信号,一个寄存函数以及参数,联合到http解决这里

type Info struct {  //函数名称,对应http中c.serve()函数  ParamFunc func(ctx context.Context)  //函数的参数,对应c.serve()的connCtx参数  Param context.Context}type Task struct {  //用于传递函数以及参数的管道,对应jobChannel  taskLet chan Info  //用于传递信号量的管道  taskCmp chan int64}type Pool struct {  //两个管道对应的构造体  tasks *Task   //协程池容量  taskNum int64 }

3、创立一个协程池对象,也就是初始化这两个管道

func NewPool(n int64) *Pool {  taskc := make(chan Info,n)  workc := make(chan int64,n)  return &Pool{    tasks: &Task{      taskLet: taskc,      taskCmp: workc,    },    taskNum: n,  }}

4、创立一个put函数,用于往两个channel中塞数据,即生产者

func (p *Pool) Put(a Info)  {  //在sigChannel中塞数据,如果阻塞阐明没有闲暇  p.tasks.taskCmp <- 1  //在jobChannel中塞数据    p.tasks.taskLet <- a}

5、创立一个run函数,用于监听管道并取出数据,即消费者

func (p *Pool) Run()  { //继续监听jobChannel管道,只有有数据监听到则阐明曾经有闲暇位了, //须要创立goroutine执行传来的函数以及参数  for {    select {    case let := <- p.tasks.taskLet:      go p.Work(let)    }  }}func (p *Pool) Work(f Info)  {  //执行传入的函数  f.ParamFunc(f.Param)  //执行完函数后把sigChannel中标记位取出  <- p.tasks.taskCmp}

6、批改源码,须要批改的代码加到server.go中

func (srv *Server) Serve(l net.Listener) error {  .....  //初始化一个连接池  po := currencyctl.NewPool(srv.CorrencyNum)  //异步开启这个池子,否则会阻塞  go po.Run()  ctx := context.WithValue(baseCtx, ServerContextKey, srv)  for {    //期待建设连贯,没有申请则会阻塞住    rw, err := l.Accept()    if err != nil {......}    connCtx := ctx    if cc := srv.ConnContext; cc != nil {......}    ......    //go c.serve(connCtx)    //革新成协程池    po.Put(currencyctl.Info{ParamFunc:c.serve,Param:connCtx})  }}

我将解决并发数量的参数放到了server构造体中,通过http.ListenAndServe()办法传递并在下一次赋值。


测试阶段

接下来跑一个测试用例:

测试代码很简略,如下:

package mainimport (  "fmt"  "net/http"  _"net/http/pprof"  "time")func main() {  go func() {//应用pprof跟踪    http.ListenAndServe(":6060",nil,10)  }()  http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {    fmt.Println("收到申请。。。")    time.Sleep(time.Second*1)    writer.Write([]byte("hello http"))  })  http.ListenAndServe(":8000", nil,100)//限度最大并发量100}

启动我的项目,做一个压力测试(这里我是用了go-stress-testing工具):

应用并发申请量为1000时候,查看pprof工具,查看零碎协程数,管制在了100左右。

设置协程池协程量为200时候,应用1000并发申请,看到协程量管制在200

通过验证,该协程池在net/http规范库上的利用根本胜利了,然而只是测试了一个简略的接口,没有通过简单的业务验证,可能存在好多未知问题。

所以,又乖乖git checkout切到了原始分支。