企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 管道 ### 管道简介 当通过async()启动一个协程时,会返回一个defferd类型的对象,defferd相当于是输出一个具体的值,此时也可以通过管道(Channel)的方式来输出具体的值。管道提供了一种传输的价值流。 管道在概念上与BlockingQueue(阻塞队列)非常类似,但是区别在于管道不是一个阻塞put操作而是一个暂停发送操作,不是一个阻塞take操作而是一个暂停接收操作。阻塞队列会阻塞线程,而管道则不会阻塞线程。管道中有两个方法,分别是send()和receive(),这两个方法分别用于发送数据和接收数据。接下来我们通过一个案例来演示管道的send()和receive()方法,具体代码如下所示。 ``` import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.delay import kotlinx.coroutines.experimental.launch import kotlinx.coroutines.experimental.runBlocking fun main(args: Array<String>): Unit = runBlocking { val channel = Channel<Int>() launch { (1..4).forEach { channel.send(it * 10) delay(1000L) } } repeat(4) { val result = channel.receive() print("result=${result}\t") } } ``` 运行结果: ``` result=10 result=20 result=30 result=40 ``` 上述代码中,Channel表示管道,在forEach循环中每间隔一秒会执行一次channel中的send()方法,来发送协程中的数据信息,接着通过repeat()方法来循环执行接收数据的代码,该方法中的参数4表示接收的信息数量,在repeat()方法中通过channel中的receive()方法来接收发送的数据信息并打印出来。 ### 管道的关闭 管道与阻塞队列比较类似。但是管道与阻塞队列的区别是,第一,队列是阻塞的,管道是非阻塞的;第二,管道可以通过close()方法进行关闭,当没有更多数据需要添加到管道中时,管道就可以进行关闭。在管道的接收端,通常使用for循环接收管道发送的数据,从概念上来讲,结束就像发送了一个特殊的密码令牌给该频道,一旦接收到这个关闭标记,迭代就会停止,之后所有发送的数据不会被接收。接下来我们通过一个案例来演示关闭管道后数据的接收情况,具体代码如下所示。 ``` import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.delay import kotlinx.coroutines.experimental.launch import kotlinx.coroutines.experimental.runBlocking fun main(args: Array<String>): Unit = runBlocking { val channel = Channel<Int>() launch { (1..3).forEach { channel.send(it * 10) println("发送端的关闭状态=${channel.isClosedForSend}") delay(1000L) } //关闭管道 channel.close() println("管道关闭后发送端的关闭状态=${channel.isClosedForSend}") println("管道关闭后接收端的关闭状态=${channel.isClosedForReceive}") } repeat(10) { val result = channel.receive() println("result=$result 接收端的关闭状态=${channel. isClosedForReceive}") } } ``` 运行结果: ``` 发送端的关闭状态=false result=10接收端的关闭状态=false 发送端的关闭状态=false result=20接收端的关闭状态=false 发送端的关闭状态=false result=30接收端的关闭状态=false 管道关闭后发送端的关闭状态=true 管道关闭后接收端的关闭状态=true Exception in thread"main"kotlinx.coroutines.experimental.channels. ClosedReceiveChannelException:Channel was closed ``` 根据上述代码的运行结果可知,管道分为发送端关闭状态和接收端关闭状态,当管道被关闭之前,发送端的关闭状态都为false,接收端的关闭状态也为false,等接收完所有元素之后管道关闭,此时发送端的关闭状态与接收端的关闭状态才为true。管道被关闭之后就无法接收其他数据,否则,程序就会报错。由于上述代码中repeat()方法中传递的参数为10,也就是当接收完管道发送过来的3个数据之后,在接收端还在循环进行接收数据,此时程序运行结果就报错了。 ### 生产者与消费者 协程产生一系列元素的模式比较普遍,这通常是在并发代码中发现的“生产者—消费者”模式的一部分,可以将生产者抽象为一个以通道为参数的函数,但是生产者的结果必须是从函数中返回的。 接下来,通过管道来生成一个生产者和消费者模式,在管道中可以通过produce来生成一个协程,在协程中通过管道来发送一些数据。接下来我们通过一个案例来演示通过管道来生成生产者与消费者,具体代码如下所示。 ``` import kotlinx.coroutines.experimental.channels.consumeEach import kotlinx.coroutines.experimental.channels.produce import kotlinx.coroutines.experimental.delay import kotlinx.coroutines.experimental.runBlocking //生产者 fun produceSquares() = produce<Int> { (1..5).forEach { send(it * 10) delay(1000L) } } //消费者 suspend fun consumeSquares() { val squares = produceSquares() //接收生产者发送的信息 squares.consumeEach { //类似于for循环 print("it=$it \t") } } fun main(args: Array<String>): Unit = runBlocking { consumeSquares() } ``` 运行结果: ``` it=10 it=20 it=30 it=40 it=50 ``` 上述代码中,通过produce生成了一个协程。在这个协程中通过管道中的send()方法将信息发送出去,这个协程的返回值是一个函数produceSquares(),这个函数是一个生产者。接着在consumeSquares()方法中获取生产者发送的信息,并通过扩展函数consumeEach()可以替代for循环。 管道的发送顺序和接收顺序是一致的,管道主要用于线程间通信和父子进程间的通信。如果以后遇到需要描述生产者和消费者的模式时,可以通过管道来进行演示。 ### 管道缓存区 当默认创建一个管道时,这个管道是没有缓冲区的,发送端和接收端彼此间相遇时才可以进行发送和接收的操作,也就是说当发送端发送时,首先调用send()方法,发送完信息之后就必须要接收;接收端接收完之后才可以发送下一条信息。如果发送完信息之后还没有被接收,此时程序就暂时停在这个地方,并不属于阻塞,属于挂起,等到后续接收完信息之后才会继续发送信息。接收端接收时也是一样的,如果接收时没有数据发送过来,此时程序就暂时停止,直到有信息发送过来才会进行接收,在这里的等待也属于挂起而不是阻塞。为了解决这个暂时停止的问题,可以在程序中创建管道的缓冲区。 在创建缓冲通道时,可以设置缓冲区的大小,其中Channel()函数中传递的capacity参数是来指定缓冲区大小的,缓冲区允许发送者在挂起之前发送多个元素,类似于BlockingQueue指定的容量,当缓冲区没有满时,无论发送端发送的信息有没有被接收都可以一直向缓冲区存放发送的元素,直到缓冲区被存放满时,程序才会挂起,挂起之后等待后续接收这些信息,接收完之后才会继续进行这样的操作。接下来我们通过一个案例来演示管道的缓冲区,具体代码如下所示。 ``` import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.delay import kotlinx.coroutines.experimental.launch import kotlinx.coroutines.experimental.runBlocking fun main(args: Array<String>): Unit = runBlocking { val channel = Channel<Int>(3) //创建缓冲通道 val sender = launch(coroutineContext) { //启动协程 repeat(10) { println("sending $it") //打印发送的每个元素 channel.send(it) //发送元素,当缓冲区已满时将暂停发送 } } delay(1000L) sender.cancel() //取消协程sender println("") } ``` 运行结果: ``` sending 0 sending 1 sending 2 sending 3 ``` 上述代码中,通过Channel创建了一个缓冲区,这个缓冲区的大小设置为3,也就是可以向缓冲区中存放3个元素。当发送完前3个元素之后,这3个元素已经缓存在管道中,等着接收端进行接收。当发送第4个元素时,发现这个缓冲区已经没有空间了,这个发送此时就变为挂起状态。由于第15行代码中的cancel()方法的返回值是布尔类型的,而main()函数的返回值是Unit,也就是没有返回值,因此需要在调用cancel()方法的下方随意添加一行没有返回值的代码,不然程序会报错。 ## 本章小结 本章主要介绍了Kotlin中的协程,详细介绍了协程的概念、协程的取消以及管道。通过对本章的学习,读者可以掌握Kotlin程序中协程的使用方法。要求读者必须掌握本章内容,便于后续开发Kotlin程序。 【思考题】 1. 请思考线程与协程的效率对比。 2. 请思考协程是如何取消的。