关于golang:译如何优雅的关闭channel

35次阅读

共计 9902 个字符,预计需要花费 25 分钟才能阅读完成。

几天前,我写了一篇解释 go 通道准则。那篇文章在 reddit 和 HN 上失去了很多反对。然而 go 通道设计细节失去了许多批评。

我总结批评内容如下:

  • 不批改 channel 状态的状况下,没有对立和简略的形式去判断 channel 是否敞开。
  • 敞开曾经敞开的 channel 会 panic, 所以敞开 channel 是十分危险的如果不晓得 channel 是否敞开。
  • 向敞开的 channel 发送值会 panic, 所以发送值到 channel 是十分危险的如果不晓得 channel 是否敞开。

这些评论看起来是正当的(实际上不然),是的,并没有内置的函数去判断 channel 是否敞开。

如果你能确定不再 (未来) 往 channel 发送值,的确有简略的办法去判断 channel 是否敞开,为了不便了解,请看上面的列子:

package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
    select {
    case <-ch:
        return true
    default:
    }

    return false
}

func main() {c := make(chan T)
    fmt.Println(IsClosed(c)) // false
    close(c)
    fmt.Println(IsClosed(c)) // true
}

就像下面提到的,没有对立的形式去查看 channel 是否敞开。

实际上,即便有简略的内置办法 closed 去判断 channel 是否敞开,就像内置函数 len 判断 channel 元素个数,价值无限。起因是被查看的 channel 可能会在函数调用并返回后状态曾经扭转,所以返回的值并不能反映最新的 channel 状态。不过如果调用 closed(ch)返回 true 能够进行向 channel 发送值,然而如果调用 closed(ch),返回 false,则敞开通道或持续向通道发送值是不平安的。

channel 的敞开准则

最根本的准则是不要要接受者端敞开 channel,也不要在有多个并发发送者的状况下敞开 channel。换句话说,咱们只能在发送端敞开 channel,并且是惟一的发送者。

(上面咱们将称下面的准则为敞开准则)

当然,并没有对立的准则去敞开 channel。对立的准则便是不要敞开(或者发送值)曾经敞开的 channel。如果咱们能保障没有协程发送或者敞开一个没有敞开且不为 nil 的 channel,那咱们能够平安敞开这个 channel。然而,对于接受者或或者多个发送者中作出这样的保障往往须要很多致力,而且常常使代码更为简单。相同,听从 channel 敞开准则绝对比较简单。

粗犷的敞开 channel 的解决方案

如果你无论如也想从接收端或者多个发送者中一个敞开 channel, 你能够应用谬误复原机制阻止恐慌的可能来防止程序宕机。上面是一个示例(假如 channel 的类型是 T):

func SafeClose(ch chan T) (justClosed bool) {defer func() {if recover() != nil {
            // The return result can be altered
            // in a defer function call.
            justClosed = false
        }
    }()

    // assume ch != nil here.
    close(ch)   // panic if ch is closed
    return true // <=> justClosed = true; return
}

这种解决方案,显然违反了敞开准则。

同样的问题是向潜在的已敞开的 channel 发送值。

func SafeSend(ch chan T, value T) (closed bool) {defer func() {if recover() != nil {closed = true}
    }()

    ch <- value  // panic if ch is closed
    return false // <=> closed = false; return
}

不仅突破了敞开准则,在运行过程中可能产生数据竞争。

礼貌的敞开 channel

许多人喜爱用 async.once 敞开 channel:

type MyChannel struct {
    C    chan T
    once sync.Once
}

func NewMyChannel() *MyChannel {return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {mc.once.Do(func() {close(mc.C)
    })
}

当然,你也能够用 async.Mutex 防止敞开 channel 屡次。

ype MyChannel struct {
    C      chan T
    closed bool
    mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {mc.mutex.Lock()
    defer mc.mutex.Unlock()
    if !mc.closed {close(mc.C)
        mc.closed = true
    }
}

func (mc *MyChannel) IsClosed() bool {mc.mutex.Lock()
    defer mc.mutex.Unlock()
    return mc.closed
}

这种形式可能比拟礼貌,然而并不能防止数据竞争。目前,Go 运行机制并不能保障敞开 channel 和向 channel 发送值同时执行不会产生数据竞争。如果对同一 channel 执行通道发送操作的同时调用 SafeClose 函数,则可能会产生数据竞争(只管这种数据竞争个别有害的)。

优雅的敞开 channel

上述 SafeSend 函数的毛病是,在 select 块中 case 关键字分枝上不能调用作为发送操作;另一个毛病是很多人包含我认为在下面的 SafeSend 和 SafeClose 函数中应用 panic/recover` 和 async 包是不优雅的。接下来,针对各种解状况介绍一些不应用包的纯 channel 解决方案。

(在上面的示例中,sync.WaitGroup 齐全用于示例,理论中可能并不应用)

多个接收者,一个发送者,发送者敞开 channel 示意‘没有值能够发送’

这是一个非常简单的状况,仅仅是让发送者在不想发送数据时候敞开 channel。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const Max = 100000
    const NumReceivers = 100

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)

    // the sender
    go func() {
        for {if value := rand.Intn(Max); value == 0 {
                // The only sender can close the
                // channel at any time safely.
                close(dataCh)
                return
            } else {dataCh <- value}
        }
    }()

    // receivers
    for i := 0; i < NumReceivers; i++ {go func() {defer wgReceivers.Done()

            // Receive values until dataCh is
            // closed and the value buffer queue
            // of dataCh becomes empty.
            for value := range dataCh {log.Println(value)
            }
        }()}

    wgReceivers.Wait()}
一个接收者,多个发送者,惟一的接收者通过敞开额定的 channel 通道示意‘请进行发送值到 channel’

这是一个比下面较简单的状况。咱们不能为阻止数据传输让接收者敞开数据 channel,这样违反了 channel 敞开的准则,然而咱们能够通过敞开额定的信号 channel 去告诉发送者进行发送值。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const Max = 100000
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)

    // ...
    dataCh := make(chan int)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel
        // dataCh, and its receivers are the
        // senders of channel dataCh.

    // senders
    for i := 0; i < NumSenders; i++ {go func() {
            for {
                // The try-receive operation is to try
                // to exit the goroutine as early as
                // possible. For this specified example,
                // it is not essential.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in the second select may be
                // still not selected for some loops if
                // the send to dataCh is also unblocked.
                // But this is acceptable for this
                // example, so the first select block
                // above can be omitted.
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()}

    // the receiver
    go func() {defer wgReceivers.Done()

        for value := range dataCh {
            if value == Max-1 {
                // The receiver of channel dataCh is
                // also the sender of stopCh. It is
                // safe to close the stop channel here.
                close(stopCh)
                return
            }

            log.Println(value)
        }
    }()

    // ...
    wgReceivers.Wait()}

正如正文中说的,信号 channel 的发送者是数据接收者 channel。信号 channel 听从 channel 敞开准则,只能被他的发送者敞开。

在下面的示例中,dataCh 不会被敞开。是的,Channel 不是必须被敞开的。一个 channel 无论是否敞开,当没有一个协程援用的时候,最终就会被 GC 回收。所以这里优雅的敞开 channel 就是不敞开 channel。

M 个接收者,N 个发送者,任何一个通过中间人敞开信号 channel 示意‘让咱们完结游戏吧’

这是最简单的状况。咱们不能让任何一个发送者和接收者敞开数据 channel。咱们也不能让任何一个接收者敞开信号 channel 告诉所有的接收者和发送者完结游戏。其中任何一种形式都突破了敞开准则。然而,咱们能够引入一个两头角色去敞开信号 channel。在上面例子中有一个技巧是如何应用 try-send 操作去告诉中间人敞开信号通道。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown
        // below, and its receivers are all senders
        // and receivers of dataCh.
    toStop := make(chan string, 1)
        // The channel toStop is used to notify the
        // moderator to close the additional signal
        // channel (stopCh). Its senders are any senders
        // and receivers of dataCh, and its receiver is
        // the moderator goroutine shown below.
        // It must be a buffered channel.

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i++ {go func(id string) {
            for {value := rand.Intn(Max)
                if value == 0 {
                    // Here, the try-send operation is
                    // to notify the moderator to close
                    // the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                // The try-receive operation here is to
                // try to exit the sender goroutine as
                // early as possible. Try-receive and
                // try-send select blocks are specially
                // optimized by the standard Go
                // compiler, so they are very efficient.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in this select block might be
                // still not selected for some loops
                // (and for ever in theory) if the send
                // to dataCh is also non-blocking. If
                // this is unacceptable, then the above
                // try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {go func(id string) {defer wgReceivers.Done()

            for {
                // Same as the sender goroutine, the
                // try-receive operation here is to
                // try to exit the receiver goroutine
                // as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in this select block might be
                // still not selected for some loops
                // (and forever in theory) if the receive
                // from dataCh is also non-blocking. If
                // this is not acceptable, then the above
                // try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == Max-1 {
                        // Here, the same trick is
                        // used to notify the moderator
                        // to close the additional
                        // signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }

                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

在这个例子中,仍然守住了 channel 敞开的准则。

请留神,toStop 的缓冲大小是 1。这是为了防止当中间人筹备从 toStop 接管信号之前信号失落。

咱们也能够设置 toStop 的缓冲大小是发送者和接收者之和。那样咱们就不须要 try-send 的 select 块去告诉中间人。

...
toStop := make(chan string, NumReceivers + NumSenders)
...
            value := rand.Intn(Max)
            if value == 0 {
                toStop <- "sender#" + id
                return
            }
...
                if value == Max-1 {
                    toStop <- "receiver#" + id
                    return
                }
...
“ 多个接收者,单个发送者 ” 变体的情景:敞开申请是通过第三方

有时候,敞开信号是由第三方收回。在这种状况下,咱们能够用额定的信号去告诉发送者敞开 channel。例如:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const Max = 100000
    const NumReceivers = 100
    const NumThirdParties = 15

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)
    closing := make(chan struct{}) // signal channel
    closed := make(chan struct{})
    
    // The stop function can be called
    // multiple times safely.
    stop := func() {
        select {case closing<-struct{}{}:
            <-closed
        case <-closed:
        }
    }
    
    // some third-party goroutines
    for i := 0; i < NumThirdParties; i++ {go func() {r := 1 + rand.Intn(3)
            time.Sleep(time.Duration(r) * time.Second)
            stop()}()}

    // the sender
    go func() {defer func() {close(closed)
            close(dataCh)
        }()

        for {
            select{
            case <-closing: return
            default:
            }

            select{
            case <-closing: return
            case dataCh <- rand.Intn(Max):
            }
        }
    }()

    // receivers
    for i := 0; i < NumReceivers; i++ {go func() {defer wgReceivers.Done()

            for value := range dataCh {log.Println(value)
            }
        }()}

    wgReceivers.Wait()}
‘ 多个发送者 ’ 状况的变体: 敞开 channel 必须通知所有的接收者曾经不再发送数据

在下面 N 发送者的状况,为了坚守 channel 敞开准则,咱们防止敞开 channel。然而,有时候,咱们必须敞开 channel 来通知所有接收者不再发送数据。在这种状况下,咱们能够通过引入两头 channel,将 N -sender 情景转化为 One-sender 情景。两头 channel 只有一个发送者,所以咱们能够通过敞开这个 channel 来代替敞开原始数据 channel。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const Max = 1000000
    const NumReceivers = 10
    const NumSenders = 1000
    const NumThirdParties = 15

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)     // will be closed
    middleCh := make(chan int)   // will never be closed
    closing := make(chan string) // signal channel
    closed := make(chan struct{})

    var stoppedBy string

    // The stop function can be called
    // multiple times safely.
    stop := func(by string) {
        select {
        case closing <- by:
            <-closed
        case <-closed:
        }
    }
    
    // the middle layer
    go func() {exit := func(v int, needSend bool) {close(closed)
            if needSend {dataCh <- v}
            close(dataCh)
        }

        for {
            select {
            case stoppedBy = <-closing:
                exit(0, false)
                return
            case v := <- middleCh:
                select {
                case stoppedBy = <-closing:
                    exit(v, true)
                    return
                case dataCh <- v:
                }
            }
        }
    }()
    
    // some third-party goroutines
    for i := 0; i < NumThirdParties; i++ {go func(id string) {r := 1 + rand.Intn(3)
            time.Sleep(time.Duration(r) * time.Second)
            stop("3rd-party#" + id)
        }(strconv.Itoa(i))
    }

    // senders
    for i := 0; i < NumSenders; i++ {go func(id string) {
            for {value := rand.Intn(Max)
                if value == 0 {stop("sender#" + id)
                    return
                }

                select {
                case <- closed:
                    return
                default:
                }

                select {
                case <- closed:
                    return
                case middleCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for range [NumReceivers]struct{}{} {go func() {defer wgReceivers.Done()

            for value := range dataCh {log.Println(value)
            }
        }()}

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}
更多情景?

应该还会有更多下面情景的变体,然而下面展现了最一般和最罕用的状况。通过奇妙地应用 channel(和其余并发编程技术),对于每种状况变动,都能够找到一个放弃通道敞开准则的解决方案。

论断

没有情景逼迫你突破 channel 敞开的准则,如果你遇到这种状况,请从新思考你的设计和重构你的代码。

浏览原文

正文完
 0