go http请求转发

1.说明

  • 日常开发中会遇到需要将请求转发到其它服务器的需求:

    • 1.如果是前端进行转发,需要解决跨域的问题;
    • 2.后端转发到目标服务器,并返回数据到client;

我们只讨论后端如何处理转发。

2. 原理

  • 转发需要对数据流解决的问题:

    • 1.向目标服务器发送请求,并获取数据
    • 2.将数据返回到client
    • 3.对于client来整个过程透明,不被感知到请求被转发

3. 方案

  • 解决方案:

    • 1.服务端使用tcp连接模拟http请求
    • 2.使用标准库提供的设置项进行代理
  • 实现

    • 1.tcp模拟转发

      • 先和目标服务器建立tcp连接
      • 开启goroutinue,然后将请求数据发送到目标服务器,同时将接受到的数据进行回写
      • 连接采用连接池的方式进行处理,代码较为简单,不作过多赘述
      • 此方法因为完全屏蔽了http细节,所以有数据读取超时问题,其实就是不知道数据流是否读写结束。在http协议中一般通过Content-Length可知数据体长度,但是在未设置此头部信息时(流传输),就很难确定数据是否读写结束。看到有文提出通过判断最后的“\r\n\r\n\r\n\r\n”来确定结束,但是这并不严谨。在没有绝对的协商约束下,会不经意的截断body中内容,导致数据丢失。
      • 可采用设置读写超时来结束读写操作阻塞的问题,但是时间设置长短可能影响并发性能。(SetDeadLine,SetReadDeadLine,SetWriteDeadLine)
       package proxy  import (     "io"     "log"     "net"     "sync"     "sync/atomic"     "time" )  /** 封装代理服务,对于http连接反馈又超时处理,注意超时问题 */  var pool = make(chan net.Conn, 100)  type conn struct {     conn  net.Conn     wg    *sync.WaitGroup     lock  sync.Mutex     state int32 }  const (     maybeValid = iota     isValid     isInvalid     isInPool     isClosed )  type timeoutErr interface {     Timeout() bool }  func isTimeoutError(err error) bool {     timeoutErr, _ := err.(timeoutErr)     if timeoutErr == nil {         return false     }     return timeoutErr.Timeout() }  func (cn *conn) Read(b []byte) (n int, err error) {     n, err = cn.conn.Read(b)     if err != nil {         if !isTimeoutError(err) {             atomic.StoreInt32(&cn.state, isInvalid)         }     } else {         atomic.StoreInt32(&cn.state, isValid)     }     return }  func (cn *conn) Write(b []byte) (n int, err error) {     n, err = cn.conn.Write(b)     if err != nil {         if !isTimeoutError(err) {             atomic.StoreInt32(&cn.state, isInvalid)         }     } else {         atomic.StoreInt32(&cn.state, isValid)     }     return }  func (cn *conn) Close() error {     atomic.StoreInt32(&cn.state, isClosed)     return cn.conn.Close() }  func getConn() (*conn, error) {     var cn net.Conn     var err error     select {     case cn = <-pool:         //service.Logger.Info().Msg("get conn from pool")     default:         cn, err = net.Dial("tcp", "127.0.0.1:8090")         //service.Logger.Info().Msg("get conn by new")     }     if err != nil {         service.Logger.Error().Err(err).Msgf("dial to dest %s failed ", "127.0.0.1:8090")         return nil, err     }     return &conn{         conn:  cn,         wg:    &sync.WaitGroup{},         state: maybeValid,     }, nil }  func release(cn *conn) error {     state := atomic.LoadInt32(&cn.state)     switch state {     case isInPool, isClosed:         return nil     case isInvalid:         return cn.conn.Close()     }     cn.lock.Lock()     defer cn.lock.Unlock()     select {     case pool <- cn.conn:         //service.Logger.Info().Msgf("%d  %d put conn to pool",os.Getpid(),os.Getppid())         atomic.StoreInt32(&cn.state, isInPool)         return nil     default:         return cn.Close()     } }  func Handle(conn net.Conn) {     if conn == nil {         return     }     defer conn.Close()     conn.SetDeadline(time.Now().Add(time.Millisecond * 100))  //设置读写超时     client, err := getConn()     if err != nil {         return     }      defer release(client)     client.conn.SetDeadline(time.Now().Add(time.Millisecond * 100)) //设置读写超时      client.wg.Add(2)     //进行转发     go func() {         if _, err := io.Copy(client, conn); err != nil {             service.Logger.Err(err).Msg("copy data to svr")         }         client.wg.Done()     }()     go func() {         if _, err := io.Copy(conn, client); err != nil {             service.Logger.Err(err).Msg("copy data to conn")         }         client.wg.Done()     }()      client.wg.Wait() }  func StartProxySvr() <-chan struct{} {     exit := make(chan struct{}, 1)     proxy_server, err := net.Listen("tcp", "8889")     if err != nil {         log.Printf("proxy server listen error: %v\n", err)         exit <- struct{}{}         return exit     }      for {         conn, err := proxy_server.Accept()         if err != nil {             log.Printf("proxy server accept error: %v\n", err)             exit <- struct{}{}             return exit         }         go Handle(conn)     } }
    • 2.使用原生提供的http代理

      • http.Client中的Transport可用来设置目标服务的addr
      • 详细内容请看源码说明,下文提供一个中间件样例来进行请求转发

         type Client struct {     // Transport specifies the mechanism by which individual     // HTTP requests are made.     // If nil, DefaultTransport is used.     Transport RoundTripper      // CheckRedirect specifies the policy for handling redirects.     // If CheckRedirect is not nil, the client calls it before     // following an HTTP redirect. The arguments req and via are     // the upcoming request and the requests made already, oldest     // first. If CheckRedirect returns an error, the Client's Get     // method returns both the previous Response (with its Body     // closed) and CheckRedirect's error (wrapped in a url.Error)     // instead of issuing the Request req.     // As a special case, if CheckRedirect returns ErrUseLastResponse,     // then the most recent response is returned with its body     // unclosed, along with a nil error.     //     // If CheckRedirect is nil, the Client uses its default policy,     // which is to stop after 10 consecutive requests.     CheckRedirect func(req *Request, via []*Request) error      // Jar specifies the cookie jar.     //     // The Jar is used to insert relevant cookies into every     // outbound Request and is updated with the cookie values     // of every inbound Response. The Jar is consulted for every     // redirect that the Client follows.     //     // If Jar is nil, cookies are only sent if they are explicitly     // set on the Request.     Jar CookieJar      // Timeout specifies a time limit for requests made by this     // Client. The timeout includes connection time, any     // redirects, and reading the response body. The timer remains     // running after Get, Head, Post, or Do return and will     // interrupt reading of the Response.Body.     //     // A Timeout of zero means no timeout.     //     // The Client cancels requests to the underlying Transport     // as if the Request's Context ended.     //     // For compatibility, the Client will also use the deprecated     // CancelRequest method on Transport if found. New     // RoundTripper implementations should use the Request's Context     // for cancelation instead of implementing CancelRequest.     Timeout time.Duration }  //中间件样例     http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {             proxy := func(_ *http.Request) (*url.URL, error) {                 return url.Parse("target ip:port")//127.0.0.1:8099             }             transport := &http.Transport{                 Proxy: proxy,                 DialContext: (&net.Dialer{                     Timeout:   30 * time.Second,                     KeepAlive: 30 * time.Second,                     DualStack: true,                 }).DialContext,                 MaxIdleConns:          100,                 IdleConnTimeout:       90 * time.Second,                 TLSHandshakeTimeout:   10 * time.Second,                 ExpectContinueTimeout: 1 * time.Second,                 MaxIdleConnsPerHost:   100,             }              client := &http.Client{Transport: transport}             url := "http://" + r.RemoteAddr + r.RequestURI             req, err := http.NewRequest(r.Method, url, r.Body)             //注: 设置Request头部信息             for k, v := range r.Header {                 for _, vv := range v {                     req.Header.Add(k, vv)                 }             }              resp, err := client.Do(req)             if err != nil {                 return             }             defer resp.Body.Close()             //注: 设置Response头部信息             for k, v := range resp.Header {                 for _, vv := range v {                     w.Header().Add(k, vv)                 }             }             data, _ := ioutil.ReadAll(resp.Body)             w.Write(data)      })

结束

本文是个人对工作中遇到的问题的总结,不够全面和深入还请多多指教。谢谢!