几天前,我写了一篇解释go通道准则。那篇文章在reddit 和 HN上失去了很多反对。然而go通道设计细节失去了许多批评。

我总结批评内容如下:

  • 不批改channel状态的状况下,没有对立和简略的形式去判断channel是否敞开。
  • 敞开曾经敞开的channel会panic,所以敞开channel是十分危险的如果不晓得channel是否敞开。
  • 向敞开的channel发送值会panic,所以发送值到channel是十分危险的如果不晓得channel是否敞开。

这些评论看起来是正当的(实际上不然),是的,并没有内置的函数去判断channel是否敞开。

如果你能确定不再(未来)往channel发送值,的确有简略的办法去判断channel是否敞开,为了不便了解,请看上面的列子:

package mainimport "fmt"type T intfunc IsClosed(ch <-chan T) bool {    select {    case <-ch:        return true    default:    }    return false}func main() {    c := make(chan T)    fmt.Println(IsClosed(c)) // false    close(c)    fmt.Println(IsClosed(c)) // true}

就像下面提到的,没有对立的形式去查看channel是否敞开。

实际上,即便有简略的内置办法closed 去判断channel是否敞开,就像内置函数len 判断channel 元素个数,价值无限。起因是被查看的channel可能会在函数调用并返回后状态曾经扭转,所以返回的值并不能反映最新的channel状态。不过如果调用closed(ch)返回true能够进行向channel发送值,然而如果调用closed(ch),返回false,则敞开通道或持续向通道发送值是不平安的。

channel的敞开准则

最根本的准则是不要要接受者端敞开channel,也不要在有多个并发发送者的状况下敞开channel。换句话说,咱们只能在发送端敞开channel,并且是惟一的发送者。

(上面咱们将称下面的准则为敞开准则)

当然,并没有对立的准则去敞开channel。对立的准则便是不要敞开(或者发送值)曾经敞开的channel。如果咱们能保障没有协程发送或者敞开一个没有敞开且不为nil的channel,那咱们能够平安敞开这个channel。然而,对于接受者或或者多个发送者中作出这样的保障往往须要很多致力,而且常常使代码更为简单。相同,听从channel敞开准则绝对比较简单。

粗犷的敞开channel的解决方案

如果你无论如也想从接收端或者多个发送者中一个敞开channel,你能够应用谬误复原机制阻止恐慌的可能来防止程序宕机。上面是一个示例(假如channel 的类型是T):

func SafeClose(ch chan T) (justClosed bool) {    defer func() {        if recover() != nil {            // The return result can be altered            // in a defer function call.            justClosed = false        }    }()    // assume ch != nil here.    close(ch)   // panic if ch is closed    return true // <=> justClosed = true; return}

这种解决方案,显然违反了敞开准则。

同样的问题是向潜在的已敞开的channel发送值。

func SafeSend(ch chan T, value T) (closed bool) {    defer func() {        if recover() != nil {            closed = true        }    }()    ch <- value  // panic if ch is closed    return false // <=> closed = false; return}

不仅突破了敞开准则,在运行过程中可能产生数据竞争。

礼貌的敞开channel

许多人喜爱用async.once敞开channel:

type MyChannel struct {    C    chan T    once sync.Once}func NewMyChannel() *MyChannel {    return &MyChannel{C: make(chan T)}}func (mc *MyChannel) SafeClose() {    mc.once.Do(func() {        close(mc.C)    })}

当然,你也能够用async.Mutex 防止敞开channel屡次。

ype MyChannel struct {    C      chan T    closed bool    mutex  sync.Mutex}func NewMyChannel() *MyChannel {    return &MyChannel{C: make(chan T)}}func (mc *MyChannel) SafeClose() {    mc.mutex.Lock()    defer mc.mutex.Unlock()    if !mc.closed {        close(mc.C)        mc.closed = true    }}func (mc *MyChannel) IsClosed() bool {    mc.mutex.Lock()    defer mc.mutex.Unlock()    return mc.closed}

这种形式可能比拟礼貌,然而并不能防止数据竞争。目前,Go运行机制并不能保障敞开channel和向channel发送值同时执行不会产生数据竞争。如果对同一channel执行通道发送操作的同时调用SafeClose函数,则可能会产生数据竞争(只管这种数据竞争个别有害的)。

优雅的敞开channel

上述SafeSend函数的毛病是,在select块中case关键字分枝上不能调用作为发送操作;另一个毛病是很多人包含我认为在下面的SafeSend和SafeClose函数中应用panic/recover` 和async 包是不优雅的。接下来,针对各种解状况介绍一些不应用包的纯channel解决方案。

(在上面的示例中,sync.WaitGroup 齐全用于示例,理论中可能并不应用)

多个接收者,一个发送者,发送者敞开channel示意‘没有值能够发送’

这是一个非常简单的状况,仅仅是让发送者在不想发送数据时候敞开channel。

package mainimport (    "time"    "math/rand"    "sync"    "log")func main() {    rand.Seed(time.Now().UnixNano())    log.SetFlags(0)    // ...    const Max = 100000    const NumReceivers = 100    wgReceivers := sync.WaitGroup{}    wgReceivers.Add(NumReceivers)    // ...    dataCh := make(chan int)    // the sender    go func() {        for {            if value := rand.Intn(Max); value == 0 {                // The only sender can close the                // channel at any time safely.                close(dataCh)                return            } else {                dataCh <- value            }        }    }()    // receivers    for i := 0; i < NumReceivers; i++ {        go func() {            defer wgReceivers.Done()            // Receive values until dataCh is            // closed and the value buffer queue            // of dataCh becomes empty.            for value := range dataCh {                log.Println(value)            }        }()    }    wgReceivers.Wait()}
一个接收者,多个发送者,惟一的接收者通过敞开额定的channel通道示意‘请进行发送值到channel’

这是一个比下面较简单的状况。咱们不能为阻止数据传输让接收者敞开数据channel,这样违反了channel敞开的准则,然而咱们能够通过敞开额定的信号channel去告诉发送者进行发送值。

package mainimport (    "time"    "math/rand"    "sync"    "log")func main() {    rand.Seed(time.Now().UnixNano())    log.SetFlags(0)    // ...    const Max = 100000    const NumSenders = 1000    wgReceivers := sync.WaitGroup{}    wgReceivers.Add(1)    // ...    dataCh := make(chan int)    stopCh := make(chan struct{})        // stopCh is an additional signal channel.        // Its sender is the receiver of channel        // dataCh, and its receivers are the        // senders of channel dataCh.    // senders    for i := 0; i < NumSenders; i++ {        go func() {            for {                // The try-receive operation is to try                // to exit the goroutine as early as                // possible. For this specified example,                // it is not essential.                select {                case <- stopCh:                    return                default:                }                // Even if stopCh is closed, the first                // branch in the second select may be                // still not selected for some loops if                // the send to dataCh is also unblocked.                // But this is acceptable for this                // example, so the first select block                // above can be omitted.                select {                case <- stopCh:                    return                case dataCh <- rand.Intn(Max):                }            }        }()    }    // the receiver    go func() {        defer wgReceivers.Done()        for value := range dataCh {            if value == Max-1 {                // The receiver of channel dataCh is                // also the sender of stopCh. It is                // safe to close the stop channel here.                close(stopCh)                return            }            log.Println(value)        }    }()    // ...    wgReceivers.Wait()}

正如正文中说的,信号channel的发送者是数据接收者channel。信号channel听从channel敞开准则,只能被他的发送者敞开。

在下面的示例中,dataCh不会被敞开。是的,Channel不是必须被敞开的。一个channel无论是否敞开,当没有一个协程援用的时候,最终就会被GC回收。所以这里优雅的敞开channel就是不敞开channel。

M个接收者,N个发送者,任何一个通过中间人敞开信号channel示意‘让咱们完结游戏吧’

这是最简单的状况。咱们不能让任何一个发送者和接收者敞开数据channel。咱们也不能让任何一个接收者敞开信号channel告诉所有的接收者和发送者完结游戏。其中任何一种形式都突破了敞开准则。然而,咱们能够引入一个两头角色去敞开信号channel。在上面例子中有一个技巧是如何应用try-send操作去告诉中间人敞开信号通道。

package mainimport (    "time"    "math/rand"    "sync"    "log"    "strconv")func main() {    rand.Seed(time.Now().UnixNano())    log.SetFlags(0)    // ...    const Max = 100000    const NumReceivers = 10    const NumSenders = 1000    wgReceivers := sync.WaitGroup{}    wgReceivers.Add(NumReceivers)    // ...    dataCh := make(chan int)    stopCh := make(chan struct{})        // stopCh is an additional signal channel.        // Its sender is the moderator goroutine shown        // below, and its receivers are all senders        // and receivers of dataCh.    toStop := make(chan string, 1)        // The channel toStop is used to notify the        // moderator to close the additional signal        // channel (stopCh). Its senders are any senders        // and receivers of dataCh, and its receiver is        // the moderator goroutine shown below.        // It must be a buffered channel.    var stoppedBy string    // moderator    go func() {        stoppedBy = <-toStop        close(stopCh)    }()    // senders    for i := 0; i < NumSenders; i++ {        go func(id string) {            for {                value := rand.Intn(Max)                if value == 0 {                    // Here, the try-send operation is                    // to notify the moderator to close                    // the additional signal channel.                    select {                    case toStop <- "sender#" + id:                    default:                    }                    return                }                // The try-receive operation here is to                // try to exit the sender goroutine as                // early as possible. Try-receive and                // try-send select blocks are specially                // optimized by the standard Go                // compiler, so they are very efficient.                select {                case <- stopCh:                    return                default:                }                // Even if stopCh is closed, the first                // branch in this select block might be                // still not selected for some loops                // (and for ever in theory) if the send                // to dataCh is also non-blocking. If                // this is unacceptable, then the above                // try-receive operation is essential.                select {                case <- stopCh:                    return                case dataCh <- value:                }            }        }(strconv.Itoa(i))    }    // receivers    for i := 0; i < NumReceivers; i++ {        go func(id string) {            defer wgReceivers.Done()            for {                // Same as the sender goroutine, the                // try-receive operation here is to                // try to exit the receiver goroutine                // as early as possible.                select {                case <- stopCh:                    return                default:                }                // Even if stopCh is closed, the first                // branch in this select block might be                // still not selected for some loops                // (and forever in theory) if the receive                // from dataCh is also non-blocking. If                // this is not acceptable, then the above                // try-receive operation is essential.                select {                case <- stopCh:                    return                case value := <-dataCh:                    if value == Max-1 {                        // Here, the same trick is                        // used to notify the moderator                        // to close the additional                        // signal channel.                        select {                        case toStop <- "receiver#" + id:                        default:                        }                        return                    }                    log.Println(value)                }            }        }(strconv.Itoa(i))    }    // ...    wgReceivers.Wait()    log.Println("stopped by", stoppedBy)}

在这个例子中,仍然守住了channel敞开的准则。

请留神,toStop的缓冲大小是1。这是为了防止当中间人筹备从toStop接管信号之前信号失落。

咱们也能够设置toStop的缓冲大小是发送者和接收者之和。那样咱们就不须要try-send的select块去告诉中间人。

...toStop := make(chan string, NumReceivers + NumSenders)...            value := rand.Intn(Max)            if value == 0 {                toStop <- "sender#" + id                return            }...                if value == Max-1 {                    toStop <- "receiver#" + id                    return                }...
"多个接收者,单个发送者"变体的情景:敞开申请是通过第三方

有时候,敞开信号是由第三方收回。在这种状况下,咱们能够用额定的信号去告诉发送者敞开channel。例如:

package mainimport (    "time"    "math/rand"    "sync"    "log")func main() {    rand.Seed(time.Now().UnixNano())    log.SetFlags(0)    // ...    const Max = 100000    const NumReceivers = 100    const NumThirdParties = 15    wgReceivers := sync.WaitGroup{}    wgReceivers.Add(NumReceivers)    // ...    dataCh := make(chan int)    closing := make(chan struct{}) // signal channel    closed := make(chan struct{})        // The stop function can be called    // multiple times safely.    stop := func() {        select {        case closing<-struct{}{}:            <-closed        case <-closed:        }    }        // some third-party goroutines    for i := 0; i < NumThirdParties; i++ {        go func() {            r := 1 + rand.Intn(3)            time.Sleep(time.Duration(r) * time.Second)            stop()        }()    }    // the sender    go func() {        defer func() {            close(closed)            close(dataCh)        }()        for {            select{            case <-closing: return            default:            }            select{            case <-closing: return            case dataCh <- rand.Intn(Max):            }        }    }()    // receivers    for i := 0; i < NumReceivers; i++ {        go func() {            defer wgReceivers.Done()            for value := range dataCh {                log.Println(value)            }        }()    }    wgReceivers.Wait()}
'多个发送者'状况的变体:敞开channel必须通知所有的接收者曾经不再发送数据

在下面N发送者的状况,为了坚守channel敞开准则,咱们防止敞开channel。然而,有时候,咱们必须敞开channel来通知所有接收者不再发送数据。在这种状况下,咱们能够通过引入两头channel,将N-sender情景转化为One-sender情景。两头channel只有一个发送者,所以咱们能够通过敞开这个channel来代替敞开原始数据channel。

package mainimport (    "time"    "math/rand"    "sync"    "log"    "strconv")func main() {    rand.Seed(time.Now().UnixNano())    log.SetFlags(0)    // ...    const Max = 1000000    const NumReceivers = 10    const NumSenders = 1000    const NumThirdParties = 15    wgReceivers := sync.WaitGroup{}    wgReceivers.Add(NumReceivers)    // ...    dataCh := make(chan int)     // will be closed    middleCh := make(chan int)   // will never be closed    closing := make(chan string) // signal channel    closed := make(chan struct{})    var stoppedBy string    // The stop function can be called    // multiple times safely.    stop := func(by string) {        select {        case closing <- by:            <-closed        case <-closed:        }    }        // the middle layer    go func() {        exit := func(v int, needSend bool) {            close(closed)            if needSend {                dataCh <- v            }            close(dataCh)        }        for {            select {            case stoppedBy = <-closing:                exit(0, false)                return            case v := <- middleCh:                select {                case stoppedBy = <-closing:                    exit(v, true)                    return                case dataCh <- v:                }            }        }    }()        // some third-party goroutines    for i := 0; i < NumThirdParties; i++ {        go func(id string) {            r := 1 + rand.Intn(3)            time.Sleep(time.Duration(r) * time.Second)            stop("3rd-party#" + id)        }(strconv.Itoa(i))    }    // senders    for i := 0; i < NumSenders; i++ {        go func(id string) {            for {                value := rand.Intn(Max)                if value == 0 {                    stop("sender#" + id)                    return                }                select {                case <- closed:                    return                default:                }                select {                case <- closed:                    return                case middleCh <- value:                }            }        }(strconv.Itoa(i))    }    // receivers    for range [NumReceivers]struct{}{} {        go func() {            defer wgReceivers.Done()            for value := range dataCh {                log.Println(value)            }        }()    }    // ...    wgReceivers.Wait()    log.Println("stopped by", stoppedBy)}
更多情景?

应该还会有更多下面情景的变体,然而下面展现了最一般和最罕用的状况。通过奇妙地应用channel(和其余并发编程技术),对于每种状况变动,都能够找到一个放弃通道敞开准则的解决方案。

论断

没有情景逼迫你突破channel敞开的准则,如果你遇到这种状况,请从新思考你的设计和重构你的代码。

浏览原文