共计 2966 个字符,预计需要花费 8 分钟才能阅读完成。
微信公众号:LinuGo,欢送关注
Golang 规范库搭建的 http 服务端会为每一个申请创立一个协程去解决,尽管每个协程占有的栈空间很小,然而如果万一来个数百万千万的申请(当然,这种可能性有点极其),服务端只能对每一条申请乖乖创立一个协程,这时候,该 go 过程就存在大量的 goroutine,占用服务器资源不说,还会增大 gc 压力。这时候就想给该机制加一个限度,搞一个协程池限度一下最大解决申请的协程数量。
浏览一下规范库该局部的源码实现
func (srv *Server) Serve(l net.Listener) error {
.....
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
// 期待建设连贯,没有申请则会阻塞住
rw, err := l.Accept()
if err != nil {......}
connCtx := ctx
if cc := srv.ConnContext; cc != nil {......}
......
// 开启协程解决申请,次要须要革新的就在此处
go c.serve(connCtx)
}
}
次要须要针对创立协程加一个限度条件,如果小于协程池规定的数量就容许创立,否则期待协程池有闲暇位再创立。
大抵思路
总体应用 生产者消费者模式。应用两个有缓冲区的 channel 来实现协程的并发管制,一个 sigChannel 通过缓冲空间限度最大的协程数量,另一个 jobChannel 则用于传递申请的数据(包含申请函数以及参数),该 jobChannel 对于是否缓冲没有要求。
流程
(1)首先当申请到来之后,往 sigChannel 中写入标记位数据,如果此时有闲暇地位,则不会阻塞在此处;
(2)之后往 jobChannel 中写入要执行的函数以及参数;
(3)后盾监听 jobChannel 的函数 worker(该函数要源源不断读取管道数据)则会取出管道中的数据;
(4)worker 创立 goroutine 执行申请函数;
(5)该申请函数执行实现后,goroutine 再去取出 sigChannel 管道中的标记数据,腾出来地位;
注:如果开始时候 sigChannel 写数据写入不了,则阐明该池子满了,则须要阻塞期待。这样就实现了应用 sigChannel 管制并发量的性能。
代码实现
接下来应用代码实现这种思维
1、首先把 net/http 包中的代码给保留一份,避免被搞坏。间接在目录下搞了一个 git 仓库,先把源码 commit 一次,再搞一个分支本人瞎搞着玩,。在 net/http 下建了一个放协程池函数的文件夹,创立一个 go 文件。
2、首先定义两个 channel,一个用来寄存信号,一个寄存函数以及参数,联合到 http 解决这里
type Info struct {// 函数名称,对应 http 中 c.serve()函数
ParamFunc func(ctx context.Context)
// 函数的参数,对应 c.serve()的 connCtx 参数
Param context.Context
}
type Task struct {
// 用于传递函数以及参数的管道,对应 jobChannel
taskLet chan Info
// 用于传递信号量的管道
taskCmp chan int64
}
type Pool struct {
// 两个管道对应的构造体
tasks *Task
// 协程池容量
taskNum int64
}
3、创立一个协程池对象,也就是初始化这两个管道
func NewPool(n int64) *Pool {taskc := make(chan Info,n)
workc := make(chan int64,n)
return &Pool{
tasks: &Task{
taskLet: taskc,
taskCmp: workc,
},
taskNum: n,
}
}
4、创立一个 put 函数,用于往两个 channel 中塞数据,即生产者
func (p *Pool) Put(a Info) {
// 在 sigChannel 中塞数据,如果阻塞阐明没有闲暇
p.tasks.taskCmp <- 1
// 在 jobChannel 中塞数据
p.tasks.taskLet <- a
}
5、创立一个 run 函数,用于监听管道并取出数据,即消费者
func (p *Pool) Run() {
// 继续监听 jobChannel 管道,只有有数据监听到则阐明曾经有闲暇位了,// 须要创立 goroutine 执行传来的函数以及参数
for {
select {
case let := <- p.tasks.taskLet:
go p.Work(let)
}
}
}
func (p *Pool) Work(f Info) {
// 执行传入的函数
f.ParamFunc(f.Param)
// 执行完函数后把 sigChannel 中标记位取出
<- p.tasks.taskCmp
}
6、批改源码,须要批改的代码加到 server.go 中
func (srv *Server) Serve(l net.Listener) error {
.....
// 初始化一个连接池
po := currencyctl.NewPool(srv.CorrencyNum)
// 异步开启这个池子,否则会阻塞
go po.Run()
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
// 期待建设连贯,没有申请则会阻塞住
rw, err := l.Accept()
if err != nil {......}
connCtx := ctx
if cc := srv.ConnContext; cc != nil {......}
......
//go c.serve(connCtx)
// 革新成协程池
po.Put(currencyctl.Info{ParamFunc:c.serve,Param:connCtx})
}
}
我将解决并发数量的参数放到了 server 构造体中,通过 http.ListenAndServe()办法传递并在下一次赋值。
测试阶段
接下来跑一个测试用例:
测试代码很简略,如下:
package main
import (
"fmt"
"net/http"
_"net/http/pprof"
"time"
)
func main() {go func() {// 应用 pprof 跟踪
http.ListenAndServe(":6060",nil,10)
}()
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {fmt.Println("收到申请。。。")
time.Sleep(time.Second*1)
writer.Write([]byte("hello http"))
})
http.ListenAndServe(":8000", nil,100)// 限度最大并发量 100
}
启动我的项目,做一个压力测试(这里我是用了 go-stress-testing 工具):
应用并发申请量为 1000 时候,查看 pprof 工具,查看零碎协程数,管制在了 100 左右。
设置协程池协程量为 200 时候,应用 1000 并发申请,看到协程量管制在 200
通过验证,该协程池在 net/http 规范库上的利用根本胜利了,然而只是测试了一个简略的接口,没有通过简单的业务验证,可能存在好多未知问题。
所以,又乖乖 git checkout 切到了原始分支。