乐趣区

关于kotlin:Kotlin协程Channel浅析

论断后行

Kotlin 协程中的 Channel 用于解决多个数据组合的流,随用随取,时刻筹备着,就像自来水一样,关上开关就有水了。

Channel 应用示例

fun main() = runBlocking {logX("开始")
    val channel = Channel<Int> { }
    launch {(1..3).forEach{channel.send(it)
            logX("发送数据: $it")
        }
        // 敞开 channel,节俭资源
        channel.close()}
    launch {for (i in channel){logX("接收数据: $i")
        }
    }

    logX("完结")
}

示例代码 应用 Channel 创立了一组 int 类型的数据流,通过 send 发送数据,并通过 for 循环取出 channel 中的数据,最初 channel 是一种协程资源,应用完结后应该及时调用 close 办法敞开,免得节约不必要的资源。

Channel 的源码

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {RENDEZVOUS -> {}
        CONFLATED -> {}
        UNLIMITED -> {}
        else -> {}}

能够看到 Channel 的构造函数蕴含了三个参数,别离是 capacity、onBufferOverflow、onUndeliveredElement.

首先看 capacity,这个参数代表了管道的容量,默认参数是 RENDEZVOUS,取值是 0,还有其余一些值:

  • UNLIMITED: Int = Int.MAX_VALUE,没有限量
  • CONFLATED: 容量为 1,新的笼罩旧的值
  • BUFFERED: 增加缓冲容量,默认值是 64,能够通过批改 VM 参数:kotlinx.coroutines.channels.defaultBuffer,进行批改

接下来看 onBufferOverflow, 顾名思义就是管道容量满了,怎么办?默认是挂起,也就是 suspend,一共有三种别离是:
SUSPNED、DROP_OLDEST 以及 DROP_LATEST

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
  • SUSPEND,当管道的容量满了当前,如果发送方还要持续发送,咱们就会挂起以后的 send() 办法。因为它是一个挂起函数,所以咱们能够以非阻塞的形式,将发送方的执行流程挂起,等管道中有了闲暇地位当前再复原,有点像生产者 - 消费者模型
  • DROP_OLDEST,顾名思义,就是抛弃最旧的那条数据,而后发送新的数据,有点像 LRU 算法。
  • DROP_LATEST,抛弃最新的那条数据。这里要留神,这个动作的含意是抛弃以后正筹备发送的那条数据,而管道中的内容将维持不变。

最初一个参数是 onUndeliveredElement,从名字看像是没有投递胜利的回调,也的确如此,当管道中某些数据没有胜利接管时,这个就会被调用。

综合这个参数应用一下

fun main() = runBlocking {println("开始")
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {println("onUndeliveredElement = $it")
    }

    launch {(1..3).forEach{channel.send(it)
            println("发送数据: $it")
        }
        // 敞开 channel,节俭资源
        channel.close()}
    launch {for (i in channel){println("接收数据: $i")
        }
    }

    println("完结")
}

输入后果如下:开始
完结
发送数据: 1
发送数据: 2
发送数据: 3
接收数据: 2
接收数据: 3

平安的从 Channel 中取数据

先看一个例子

val channel: ReceiveChannel<Int> = produce {(1..100).forEach{send(it)
            println("发送:$it")
        }
    }

while (!channel.isClosedForReceive){val i = channel.receive();
    println("接管:$i")
}
    
输入报错信息:Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

能够看到应用 isClosedForReceive 判断是否敞开再应用 receive 办法接收数据,仍然会报错,所以不举荐应用这种形式。

举荐应用下面 for 循环的形式取数据,还有 kotlin 举荐的 consumeEach 形式,看一下示例代码

val channel: ReceiveChannel<Int> = produce {(1..100).forEach{send(it)
            println("发送:$it")
        }
    }
channel.consumeEach {println("接管:$it")
}

所以,当咱们想要获取 Channel 当中的数据时,咱们尽量应用 for 循环,或者是 channel.consumeEach {},不要间接调用 channel.receive()。

“热的数据流”从何而来?

先看一下代码

    println("开始")
    val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {println("onUndeliveredElement = $it")
    }

    launch {(1..3).forEach{channel.send(it)
            println("发送数据: $it")
        }
    }
    println("完结")
}
输入:开始
完结
发送数据: 1
发送数据: 2
发送数据: 3

能够看到上述代码中并没有 取 channel 中的数据,然而发送的代码失常执行了,这种“不论有没有接管方,发送方都会工作”的模式,就是咱们将其认定为“热”的起因。

举个例子,就像去海底捞吃火锅一样,你不须要被动要求服务员加水,服务员看到你的杯子中水少了,会主动给你增加,你只管拿起水杯喝水就行了。

总的来说,不论接管方是否存在,Channel 的发送方肯定会工作。

Channel 能力的起源

通过源码能够看到 Channel 只是一个接口,它的能力来源于 SendChannel 和 ReceiveChannel,一个发送管道,一个接管管道,相当于做了一个组合。

这也是一种良好的设计思维,“对读取凋谢,对写入关闭”的开闭准则。

欢送关注公众号:君伟说,后盾回复“技术交换”,邀你进群,一起提高

退出移动版