乐趣区

关于udp:线上-udp-客户端请求服务端客户端句柄泄漏问题

本题别离从如下三个方面来分享:

  • 问题形容

<!—->

  • 自定义连接池的编写

<!—->

  • common_pool 的应用

问题形容

线上有一个业务,某个通服务告诉 udp 客户端通过向 udp 服务端(某个硬件设施)发送 udp 包来进行用户上线操作

当同时有大量的申请打到 udp 服务端的时候,udp 服务端的回包可能会在网络环境中丢包,(udp 是不牢靠的)导致 udp 客户端不能及时的收到 udp 服务端的回包,在短时间内,udp 客户端的句柄又没有失去复用或者开释,没有收到回包的句柄就始终阻塞在那里,最终导致句柄透露

那么能够如何解决呢?

  • 增大客户端的句柄数

<!—->

  • 应用连接池并且在读取服务端响应数据时加上超时工夫

显然,第一个解决形式治标不治本,改大句柄数,当申请质变大的时候,依然会呈现句柄透露的状况

第二种形式绝对靠谱很多

  • 首先,咱们将发送 udp 包给服务端后,期待读取服务端的回包时,设置超时工夫,超时后读取失败,开释或者偿还句柄

<!—->

  • 保护一个外部的 连接池 ,缩小每一次创立句柄耗费的资源和工夫,应用的时候从池子外面获取句柄, 应用结束之后再偿还句柄

自定义连接池的编写 customer_pool

那么对于连接池,咱们实际上是能够本人来进行造轮子的,仅用于学习,理论应用的话,天然还是会去应用通过公众考研过的公共开源库,咱们能够来根本的剖析和钻研一下一个连接池须要有些什么?

  • 创立池子,敞开池子,池子的敞开状态

<!—->

  • 从池子中获取连贯,偿还连贯,销毁以后连贯

<!—->

  • 池子中能包容的最大连接数,最小连接数,以后连接数

<!—->

  • 依据以后理论的连接数来对池子进行扩容和缩容

<!—->

  • 池子中创立连贯的函数具体实现

当然,咱们本人来领会一下连接池以及演示上述 udp 的 demo,咱们仅实现如下几个简略性能作为演示

  • 创立池子,池子的敞开状态

<!—->

  • 从池子中获取连贯,偿还连贯

<!—->

  • 池子中能包容的最大连接数,最小连接数,以后连接数

<!—->

  • 池子中创立连贯的函数具体实现

对于池子中具体链接的销毁,池子的敞开,池子的扩缩容,以及其余高级应用,xdm 能够进行扩大

customer_pool demo

自定义连接池,实际上咱们是应用 chan 通道来进行实现,具体源码能够查看:https://github.com/qingconglaixueit/customer_pool/blob/master/customer_pool/pool.go

  1. 定义连接池 MyConnPool 数据结构,和创立连接池

    • MyConnPool 构造中的 sync.Mutex 次要是用于管制多协程中 非 pool 成员的其余成员的互斥,咱们晓得 chan 外部是有锁进行管制的
  1. 获取对象的具体实现

  • 从池子中获取对象,如果获取不到则默认查看以后的池子状态是否能够创立新的连贯

<!—->

  • 若能够,则间接创立连贯,返回对象

<!—->

  • 此处在进行池子成员的变动时,须要加锁进行管制
func (conn *MyConnPool) GetObject() (interface{}, error) {return conn.getObject()
}
func (conn *MyConnPool) getObject() (interface{}, error) {
   if conn.isClosed {return nil, errors.New("pool is closed")
   }
   // 从通道外面读,如果通道外面没有则新建一个
   select {
   case object := <-conn.pool:
      return object, nil
   default:

   }
   // 校验以后的连接数是否大于最大连接数,若是,则还是须要从 pool 中取
   // 此时应用 mutex 次要是为了锁 MyConnPool 的非通道的其余成员
   conn.Lock()
   if conn.currentConn >= conn.maxConn {
      object := <-conn.pool
      conn.Unlock()
      return object, nil
   }
   // 逻辑走到此处须要新建对象放到 pool 中
   object, err := conn.connFun()
   if err != nil {conn.Unlock()
      return nil, fmt.Errorf("create conn error : %+v", err)
   }
   // 以后 pool 已有连接数 +1
   conn.currentConn++
   conn.Unlock()

   return object, nil
}
  1. 开释对象的具体实现

  • 应用结束对象之后,须要偿还

<!—->

  • 具体偿还操作,则是将具体的连贯再丢回通道外面即可
func (conn *MyConnPool) ReturnObject(object interface{}) error {return conn.returnObject(object)
}
func (conn *MyConnPool) returnObject(object interface{}) error {
   if conn.isClosed {return errors.New("pool is closed")
   }
   conn.pool <- object
   return nil
}
  1. 具体的利用

简略写一个 udp 服务端

  • 能够查看源码地址:https://github.com/qingconglaixueit/use_common_pool/blob/master/server/main.go

<!—->

  • 代码正文局部用于测试超时的成果

应用咱们上述的自定义连接池编写客户端的 demo

具体源码地址:https://github.com/qingconglaixueit/customer_pool/blob/master/main.go

  1. 定义咱们有 udp 连贯的对象
  • 定义 PoolTest 对象,并简略的将 udp 连贯退出到成员中

<!—->

  • 编写创立 udp 连贯的函数 connectUdp

<!—->

  • 初始化连接池,设置池子最大 3 个连贯,最小 1 个连贯,理论创立连贯函数为 connectUdp()
type PoolTest struct {Conn *net.UDPConn}

var myPool *customer_pool.MyConnPool

func init() {myPool = customer_pool.NewMyConnPool(3, 1, func() (interface{}, error) {return connectUdp()
   })
   if myPool == nil {log.Panicln("NewMyConnPool error")
      return
   }
   log.Println("myPool ==", myPool)
}
// 创立连贯函数
func connectUdp() (*PoolTest, error) {
   // 创立一个 udp 句柄
   log.Println(">>>>> 创立一个 udp 句柄 ...")
   // 连贯服务器
   conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP:   net.IPv4(127, 0, 0, 1),
      Port: 9998,
   })

   if err != nil {log.Println("Connect to udp server failed,err:", err)
      return nil, err
   }
   log.Printf("<<<<<< new udp connect %+v", conn)
   return &PoolTest{Conn: conn}, nil
}
  1. 获取到连贯对象之后,咱们给 udp server 写入数据
  • GetObject() 获取具体的对象,获取到连贯

<!—->

  • SendMsg 进行具体音讯的发送

<!—->

  • ReturnObject() 将具体的对象偿还到池子中

<!—->

  • 其中代码被正文掉的局部,是使劲验证超时成果的,感兴趣的 xdm 能够将代码关上尝试一波
  1. 成果展现

最初补充上咱们的 main 函数,就能够进行测试验证了

func main() {
   for i := 0; i < 10; i++ {msg := fmt.Sprintf("send udp data is %d", i)
      go SendMsg(msg)
   }

   time.Sleep(2 * time.Second)
}

启动咱们的 udp 客户端,和 udp 服务端,咱们能够查看到如下成果

客户端成果:

同时启了 10 个协程,每一个协程都会去池子外面拿连贯对象,如果池子有现成的则间接应用,如果没有现成的,那么就新建一个连贯,如果以后池子已创立连贯曾经等于最大值,那么只能等着池子中有连贯偿还的时候再进行调配

  • 总的来说,以后 demo 只会创立 3 个 udp 连贯句柄

服务端成果:

能够看到服务端收到的 10 个申请,实际上只有 3 个句柄在屡次申请

再一次印证了客户端实际上的确只创立了 3 次 udp 句柄

上述是自定义简略连接池的根本 demo,对于 udp 超时解决的内容就不做演示,感兴趣的 xdm 能够下载源码来进行查看成果

https://github.com/qingconglaixueit/customer_pool

应用 go-commons-pool

当然,咱们大抵晓得连接池根本是都有哪些组成部分,能够如何玩之后,咱们来利用一个 golang 通用的连接池 go-commons-pool, 源码地址为:https://github.com/jolestar/go-commons-pool

use_common_pool demo

利用 go-commons-pool 咱们的 demo 仅验证该库的通用和便捷,对于上述咱们自定义的池子,咱们应用到的 udp 波及到的代码,能够根本不必变动,间接应用 go-commons-pool 间接网上套即可

和咱们自定义池子不一样的中央

  • init 初始化池子的办法和配置不一样

<!—->

  • SendMsg 办法,实现时应用的池子句柄不一样

<!—->

  • 当然,go-commons-pool 会好太多

理论 demo 为:

  • 其余截图上未体现的 connectUdp(),(this *PoolTest) SendMsg(data []byte),和上述自定义池子实现形式完全一致

此处初始化池子配置,咱们也是一样传入具体池子最大的对象数,应用池子的默认配置,传入咱们创立连贯的具体函数 connectUdp()

对于到 main 函数 和 SendMsg 函数,咱们的用法和写法根本完全一致

天然,成果也是一样的

当然对于 go-commons-pool 池子还有其余很多有意思的货色,感兴趣的能够来一起浏览以下他的源码,如下为当 前池子的根本数据结构和创立池子的代码,咱们能够依据这个构造来追以下代码

代码目录如下:

./pool.go

  • 根本数据结构

<!—->

  • 创立池子代码

至此,本文完结

文中波及到的源码地址:

  • customer_pool https://github.com/qingconglaixueit/customer_pool

<!—->

  • use_common_pool https://github.com/qingconglaixueit/use_common_pool

<!—->

  • go-commons-pool https://github.com/jolestar/go-commons-pool

感激浏览,欢送交换,点个赞,关注一波 再走吧

欢送点赞,关注,珍藏

敌人们,你的反对和激励,是我保持分享,提高质量的能源

好了,本次就到这里

技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。

我是 阿兵云原生,欢送点赞关注珍藏,下次见~

退出移动版