《Kotlin 极简教程》第9章 轻量级线程:协程(2)

华夏商周秦汉唐宋元明清中华民国 · · 4833 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

挂起函数的组合执行

本节我们介绍挂起函数组合的各种方法。

按默认顺序执行

假设我们有两个在别处定义的挂起函数:

    suspend fun doJob1(): Int {
        println("Doing Job1 ...")
        delay(1000L) // 此处模拟我们的工作代码
        println("Job1 Done")
        return 10
    }

    suspend fun doJob2(): Int {
        println("Doing Job2 ...")
        delay(1000L) // 此处模拟我们的工作代码
        println("Job2 Done")
        return 20
    }

如果需要依次调用它们, 我们只需要使用正常的顺序调用, 因为协同中的代码 (就像在常规代码中一样) 是默认的顺序执行。下面的示例通过测量执行两个挂起函数所需的总时间来演示:

    fun testSequential() = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = doJob1()
            val two = doJob2()
            println("最终结果: ${one + two}")
        }
        println("Completed in $time ms")
    }

执行上面的代码,我们将得到输出:

Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
最终结果: 30
Completed in 2019 ms

可以看出,我们的代码是跟普通的代码一样顺序执行下去。

从硬件发展来看,从最初的单核单CPU,到单核多CPU,多核多CPU,似乎已经到了极限了,但是单核CPU性能却还在不断提升。如果将程序分为IO密集型应用和CPU密集型应用,二者的发展历程大致如下:

IO密集型应用: 多进程->多线程->事件驱动->协程

CPU密集型应用:多进程-->多线程

如果说多进程对于多CPU,多线程对应多核CPU,那么事件驱动和协程则是在充分挖掘不断提高性能的单核CPU的潜力。

常见的有性能瓶颈的API (例如网络 IO、文件 IO、CPU 或 GPU 密集型任务等),要求调用者阻塞(blocking)直到它们完成才能进行下一步。后来,我们又使用异步回调的方式来实现非阻塞,但是异步回调代码写起来并不简单。

协程提供了一种避免阻塞线程并用更简单、更可控的操作替代线程阻塞的方法:协程挂起。

协程主要是让原来要使用“异步+回调方式”写出来的复杂代码, 简化成可以用看似同步的方式写出来(对线程的操作进一步抽象)。这样我们就可以按串行的思维模型去组织原本分散在不同上下文中的代码逻辑,而不需要去处理复杂的状态同步问题。

协程最早的描述是由Melvin Conway于1958年给出:“subroutines who act as the master program”(与主程序行为类似的子例程)。此后他又在博士论文中给出了如下定义:

  • 数据在后续调用中始终保持( The values of data local to a coroutine persist between successive calls 协程的局部)

  • 当控制流程离开时,协程的执行被挂起,此后控制流程再次进入这个协程时,这个协程只应从上次离开挂起的地方继续 (The execution of a coroutine is suspended as control leaves it, only to carry on where it left off when control re-enters the coroutine at some later stage)。

协程的实现要维护一组局部状态,在重新进入协程前,保证这些状态不被改变,从而能顺利定位到之前的位置。

协程可以用来解决很多问题,比如nodejs的嵌套回调,Erlang以及Golang的并发模型实现等。

协程与异步回调

协程与线程

直接先说区别,Coroutine是编译器级的,Process和Thread是操作系统级的。

Coroutine通常是由编译器来实现的机制。Process和Thread看起来也在语言层次,但是内生原理却是操作系统先有这个东西,然后通过一定的API暴露给用户使用,两者在这里有不同。

协程就是用户空间下的线程。用协程来做的东西,用线程或进程通常也是一样可以做的,但往往多了许多加锁和通信的操作。

线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。

协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程.。

线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。

协程的好处

与多线程、多进程等并发模型不同,协程依靠user-space调度,而线程、进程则是依靠kernel来进行调度。线程、进程间切换都需要从用户态进入内核态,而协程的切换完全是在用户态完成,且不像线程进行抢占式调度,协程是非抢占式的调度。

通常多个运行在同一调度器中的协程运行在一个线程内,这也消除掉了多线程同步等带来的编程复杂性。同一时刻同一调度器中的协程只有一个会处于运行状态。

我们使用协程,程序只在用户空间内切换上下文,不再陷入内核来做线程切换,这样可以避免大量的用户空间和内核空间之间的数据拷贝,降低了CPU的消耗,从而大大减缓高并发场景时CPU瓶颈的窘境。

另外,使用协程,我们不再需要像异步编程时写那么一堆callback函数,代码结构不再支离破碎,整个代码逻辑上看上去和同步代码没什么区别,简单,易理解,优雅。

我们使用协程,我们可以很简单地实现一个可以随时中断随时恢复的函数。

首先复习一下多线程。我们都知道线程——Thread。每一个线程都代表一个执行序列。
当我们在程序中创建多线程的时候,看起来,同一时刻多个线程是同时执行的,不过实质上多个线程是并发的,因为只有一个CPU,所以实质上同一个时刻只有一个线程在执行。
在一个时间片内执行哪个线程是不确定的,我们可以控制线程的优先级,不过真正的线程调度由CPU的调度决定。
(2)协程
那什么是协程呢?协程跟线程都代表一个执行序列。不同的是,协程把线程中不确定的地方尽可能的去掉,执行序列间的切换不再由CPU隐藏的进行,而是由程序显式的进行。
所以,使用协程实现并发,需要多个协程彼此协作。
二、resume和yeild的协作。
resume和yeild的协作是Lua协程的核心。这边用一幅图描述一下,有一个大体的印象。对照下面的coroutine库的详细解释和最后的代码,应该可以搞清楚协程的概念了。
注:这是在非首次resume协程的情况下,resume和yield的互相调用的情况。如果是首次resume协程,那么resume的参数会直接传递给协程函数。



三、coroutine库详解
(1)coroutine.create (f)
传一个函数参数,用来创建协程。返回一个“thread”对象。
(2)coroutine.isyieldable ()
如果正在运行的协程可以让出,则返回真。值得注意的是,只有主协程(线程)和C函数中是无法让出的。
(3)coroutine.resume (co [, val1, ···])
这是一个非常重要的函数。用来启动或再次启动一个协程,使其由挂起状态变成运行状态。
可以这么说,resume函数相当于在执行协程中的方法。参数Val1...是执行协程co时传递给协程的方法。
首次执行协程co时,参数Val1...会传递给协程co的函数;
再次执行协程co时,参数Val1...会作为给协程co中上一次yeild的返回值。
不知道这句话大家理解了没,这是协程的核心。如果没理解也不用急,继续往下看,稍后我会详细解释。
resume函数返回什么呢?有3种情况:
1)、如果协程co的函数执行完毕,协程正常终止,resume返回 true和函数的返回值。
2)、如果协程co的函数执行过程中,协程让出了(调用了yeild()方法),那么resume返回true和协程中调用yeild传入的参数。
3)、如果协程co的函数执行过程中发生错误,resume返回false与错误消息。
可以看到resume无论如何都不会导致程序崩溃。它是在保护模式下执行的。
(4)coroutine.running ()
用来判断当前执行的协程是不是主线程,如果是,就返回true。
(5)coroutine.status (co)
返回一个字符串,表示协程的状态。有4种状态:
1)、running。如果在协程的函数中调用status,传入协程自身的句柄,那么执行到这里的时候才会返回running状态。
2)、suspended。如果协程还未结束,即自身调用了yeild或还没开始运行,那么就是suspended状态。
3)、normal。如果协程Aresume协程B时,协程A处于的状态为normal。在协程B的执行过程中,协程A就一直处于normal状态。因为它这时候既不是挂起状态、也不是运行状态。
4)、dead。如果一个协程发生错误结束,或正常终止。那么就处于dead状态。如果这时候对它调用resume,将返回false和错误消息。
(6)coroutine.wrap (f)
wrap()也是用来创建协程的。只不过这个协程的句柄是隐藏的。跟create()的区别在于:
1)、wrap()返回的是一个函数,每次调用这个函数相当于调用coroutine.resume()。
2)、调用这个函数相当于在执行resume()函数。
3)、调用这个函数时传入的参数,就相当于在调用resume时传入的除协程的句柄外的其他参数。
4)、调用这个函数时,跟resume不同的是,它并不是在保护模式下执行的,若执行崩溃会直接向外抛出。
(7)coroutine.yield (···)
使正在执行的函数挂起。
传递给yeild的参数会作为resume的额外返回值。
同时,如果对该协程不是第一次执行resume,resume函数传入的参数将会作为yield的返回值。

Kotlin 1.1 版本中 引入了多项新的语言特性,其中最值得注意的就是协程(Coroutines)。

尽管依然被认为处于试验性阶段,但是Kotlin 1.1最关键的新特性之一就是协程(coroutine),这个特性可以通过使用三个高层级的构造(construct)来实现:async、await和yield。举例来说:我们可以采用async和await来处理异步操作:

// 在后台线程池中运行代码
fun asyncOverlay() = async(CommonPool) {
// 开启两个异步操作
val original = asyncLoadImage("original")
val overlay = asyncLoadImage("overlay")
// 然后,将overlay应用到这两个结果上
applyOverlay(original.await(), overlay.await())
}
// 在UI上下文中启动新的协程
launch(UI) {
// 等待异步overlay完成
val image = asyncOverlay().await()
// 然后在UI上显示
showImage(image)
}
注意,这里使用了launch,它会启动一个协程。实际上,await只能在一个协程中使用或者在使用关键字suspend声明的函数中使用,这样的话,能够让编译器生成相应的代码,从而在协程中运行函数:

suspend fun workload(n: Int): Int {
delay(1000)
return n
}
在上例中,delay推迟了协程,并不会阻塞它关联的线程。

协程还能以懒加载的方式借助yield来生成序列:

// 推断出的类型为Sequence
val fibonacci = buildSequence {
yield(1) // 第一个Fibonacci数字
var cur = 1
var next = 1
while (true) {
yield(next) // 下一个Fibonacci数字
val tmp = cur + next
cur = next
next = tmp
}
}
println(fibonacci.take(10).joinToString())
因为还是试验性的,协程目前只是选择性使用状态(opt-in),它们的API在未来的释放版本中可能会有所变更。

Kotlin 1.1添加的其他重要的新特性包括:

类型别名,允许用户为某种类型定义其他的名称。
::操作符能够获取特定对象某个方法的成员引用。
数据类可以进行扩展。
在lambdas中支持Destructuring。
在对JavaScript的支持方面,1.1版本的主要目标是让对JavaScript的支持能够达到与JVM对等的程度。这意味着,所有的语言特性都可以在这两个目标平台中使用,不过反射除外,目前JavaScript还没有这样的特性。尤其是:

Kotlin标准库中很大一部分都可以用在JavaScript上。
生成的代码对JavaScript工具更加友好,比如压缩器(minifier),优化器(optimizer)等等。
用于Kotlin类声明的external修饰符目前在JavaScript中已经实现了。
我们可以使用在线的REPL来尝试Kotlin,也可以按照多种不同的方式来进行安装。

一些 API 启动长时间运行的操作(例如网络 IO、文件 IO、CPU 或 GPU 密集型任务等),并要求调用者阻塞直到它们完成。协程提供了一种避免阻塞线程并用更廉价、更可控的操作替代线程阻塞的方法:协程挂起。

在 Kotlin 1.1 中协程是实验性的。详见下文

协程通过将复杂性放入库来简化异步编程。程序的逻辑可以在协程中顺序地表达,而底层库会为我们解决其异步性。该库可以将用户代码的相关部分包装为回调、订阅相关事件、在不同线程(甚至不同机器!)上调度执行,而代码则保持如同顺序执行一样简单。

许多在其他语言中可用的异步机制可以使用 Kotlin 协程实现为库。这包括源于 C# 和 ECMAScript 的 async/await、源于 Go 的 管道 和 select 以及源于 C# 和 Python 生成器/yield。关于提供这些结构的库请参见其下文描述。

阻塞 vs 挂起

基本上,协程计算可以被挂起而无需阻塞线程。线程阻塞的代价通常是昂贵的,尤其在高负载时,因为只有相对少量线程实际可用,因此阻塞其中一个会导致一些重要的任务被延迟。

另一方面,协程挂起几乎是无代价的。不需要上下文切换或者 OS 的任何其他干预。最重要的是,挂起可以在很大程度上由用户库控制:作为库的作者,我们可以决定挂起时发生什么并根据需求优化/记日志/截获。

另一个区别是,协程不能在随机的指令中挂起,而只能在所谓的挂起点挂起,这会调用特别标记的函数。

挂起函数

当我们调用标记有特殊修饰符 suspend 的函数时,会发生挂起:

suspend fun doSomething(foo: Foo): Bar {
……
}
Kotlin
这样的函数称为挂起函数,因为调用它们可能挂起协程(如果相关调用的结果已经可用,库可以决定继续进行而不挂起)。挂起函数能够以与普通函数相同的方式获取参数和返回值,但它们只能从协程和其他挂起函数中调用。事实上,要启动协程,必须至少有一个挂起函数,它通常是匿名的(即它是一个挂起 lambda 表达式)。让我们来看一个例子,一个简化的 async() 函数(源自 kotlinx.coroutines 库):

fun <T> async(block: suspend () -> T)
Kotlin
这里的 async() 是一个普通函数(不是挂起函数),但是它的 block 参数具有一个带 suspend 修饰符的函数类型: suspend () -> T。所以,当我们将一个 lambda 表达式传给 async() 时,它会是挂起 lambda 表达式,于是我们可以从中调用挂起函数:

async {
doSomething(foo)
……
}
Kotlin
继续该类比,await() 可以是一个挂起函数(因此也可以在一个 async {} 块中调用),该函数挂起一个协程,直到一些计算完成并返回其结果:

async {
……
val result = computation.await()
……
}
Kotlin
更多关于 async/await 函数实际在 kotlinx.coroutines 中如何工作的信息可以在这里找到。

请注意,挂起函数 await() 和 doSomething() 不能在像 main() 这样的普通函数中调用:

fun main(args: Array<String>) {
doSomething() // 错误:挂起函数从非协程上下文调用
}
Kotlin
还要注意的是,挂起函数可以是虚拟的,当覆盖它们时,必须指定 suspend 修饰符:

interface Base {
suspend fun foo()
}

class Derived: Base {
override suspend fun foo() { …… }
}
Kotlin
@RestrictsSuspension 注解

扩展函数(和 lambda 表达式)也可以标记为 suspend,就像普通的一样。这允许创建 DSL 及其他用户可扩展的 API。在某些情况下,库作者需要阻止用户添加新方式来挂起协程。

为了实现这一点,可以使用 @RestrictsSuspension 注解。当接收者类/接口 R 用它标注时,所有挂起扩展都需要委托给 R 的成员或其它委托给它的扩展。由于扩展不能无限相互委托(程序不会终止),这保证所有挂起都通过调用 R 的成员发生,库的作者就可以完全控制了。

这在少数情况是需要的,当每次挂起在库中以特殊方式处理时。例如,当通过 buildSequence() 函数实现下文所述的生成器时,我们需要确保在协程中的任何挂起调用最终调用 yield() 或 yieldAll() 而不是任何其他函数。这就是为什么 SequenceBuilder 用 @RestrictsSuspension 注解:

@RestrictsSuspension
public abstract class SequenceBuilder<in T> {
……
}
Kotlin
参见其 Github 上 的源代码。

协程的内部机制

我们不是在这里给出一个关于协程如何工作的完整解释,然而粗略地认识发生了什么是相当重要的。

协程完全通过编译技术实现(不需要来自 VM 或 OS 端的支持),挂起通过代码来生效。基本上,每个挂起函数(优化可能适用,但我们不在这里讨论)都转换为状态机,其中的状态对应于挂起调用。刚好在挂起前,下一状态与相关局部变量等一起存储在编译器生成的类的字段中。在恢复该协程时,恢复局部变量并且状态机从刚好挂起之后的状态进行。

挂起的协程可以作为保持其挂起状态与局部变量的对象来存储和传递。这种对象的类型是 Continuation,而这里描述的整个代码转换对应于经典的延续性传递风格(Continuation-passing style)。因此,挂起函数有一个 Continuation 类型的额外参数作为高级选项。

关于协程工作原理的更多细节可以在这个设计文档中找到。在其他语言(如 C# 或者 ECMAScript 2016)中的 async/await 的类似描述与此相关,虽然它们实现的语言功能可能不像 Kotlin 协程这样通用。

协程的实验性状态

协程的设计是实验性的,这意味着它可能在即将发布的版本中更改。当在 Kotlin 1.1 中编译协程时,默认情况下会报一个警告:“协程”功能是实验性的。要移出该警告,你需要指定 opt-in 标志。

由于其实验性状态,标准库中协程相关的 API 放在 kotlin.coroutines.experimental 包下。当设计完成并且实验性状态解除时,最终的 API 会移动到 kotlin.coroutines,并且实验包会被保留(可能在一个单独的构件中)以实现向后兼容。

重要注意事项:我们建议库作者遵循相同惯例:给暴露基于协程 API 的包添加“experimental”后缀(如 com.example.experimental),以使你的库保持二进制兼容。当最终 API 发布时,请按照下列步骤操作:

将所有 API 复制到 com.example(没有 experimental 后缀),
保持实验包的向后兼容性。
这将最小化你的用户的迁移问题。

标准 API

协程有三个主要组成部分:

语言支持(即如上所述的挂起功能),
Kotlin 标准库中的底层核心 API,
可以直接在用户代码中使用的高级 API。
底层 API:kotlin.coroutines

底层 API 相对较小,并且除了创建更高级的库之外,不应该使用它。 它由两个主要包组成:

kotlin.coroutines.experimental 带有主要类型与下述原语
createCoroutine()
startCoroutine()
suspendCoroutine()
kotlin.coroutines.experimental.intrinsics 带有甚至更底层的内在函数如 suspendCoroutineOrReturn
关于这些 API 用法的更多细节可以在这里找到。

kotlin.coroutines 中的生成器 API

kotlin.coroutines.experimental 中仅有的“应用程序级”函数是

buildSequence()
buildIterator()
这些包含在 kotlin-stdlib 中因为他们与序列相关。这些函数(我们可以仅限于这里的 buildSequence())实现了 生成器 ,即提供一种廉价构建惰性序列的方法:

kotlin import kotlin.coroutines.experimental.* fun main(args: Array<String>) { //sampleStart val fibonacciSeq = buildSequence { var a = 0 var b = 1 yield(1) while (true) { yield(a + b) val tmp = a + b a = b b = tmp } } //sampleEnd // 输出前五个斐波纳契数字 println(fibonacciSeq.take(8).toList()) }

这通过创建一个协程生成一个惰性的、潜在无限的斐波那契数列,该协程通过调用 yield() 函数来产生连续的斐波纳契数。当在这样的序列的迭代器上迭代每一步,都会执行生成下一个数的协程的另一部分。因此,我们可以从该序列中取出任何有限的数字列表,例如 fibonacciSeq.take(8).toList() 结果是 [1, 1, 2, 3, 5, 8, 13, 21]。协程足够廉价使这很实用。

为了演示这样一个序列的真正惰性,让我们在调用 buildSequence() 内部输出一些调试信息:

kotlin import kotlin.coroutines.experimental.* fun main(args: Array<String>) { //sampleStart val lazySeq = buildSequence { print("START ") for (i in 1..5) { yield(i) print("STEP ") } print("END") } // 输出序列的前三个元素 lazySeq.take(3).forEach { print("$it ") } //sampleEnd }

运行上面的代码看,是不是我们输出前三个元素的数字与生成循环的 STEP 有交叉。这意味着计算确实是惰性的。要输出 1,我们只执行到第一个 yield(i),并且过程中会输出 START。然后,输出 2,我们需要继续下一个 yield(i),并会输出 STEP。3 也一样。永远不会输出再下一个 STEP(以及END),因为我们再也没有请求序列的后续元素。

为了一次产生值的集合(或序列),可以使用 yieldAll() 函数:

kotlin import kotlin.coroutines.experimental.* fun main(args: Array<String>) { //sampleStart val lazySeq = buildSequence { yield(0) yieldAll(1..10) } lazySeq.forEach { print("$it ") } //sampleEnd }

buildIterator() 的工作方式类似于 buildSequence(),但返回一个惰性迭代器。

可以通过为 SequenceBuilder 类写挂起扩展(带有上文描述的 @RestrictsSuspension 注解)来为 buildSequence() 添加自定义生产逻辑(custom yielding logic):

kotlin import kotlin.coroutines.experimental.* //sampleStart suspend fun SequenceBuilder<Int>.yieldIfOdd(x: Int) { if (x % 2 != 0) yield(x) } val lazySeq = buildSequence { for (i in 1..10) yieldIfOdd(i) } //sampleEnd fun main(args: Array<String>) { lazySeq.forEach { print("$it ") } }

其他高级 API:kotlinx.coroutines

只有与协程相关的核心 API 可以从 Kotlin 标准库获得。这主要包括所有基于协程的库可能使用的核心原语和接口。

大多数基于协程的应用程序级API都作为单独的库发布:kotlinx.coroutines。这个库覆盖了

使用 kotlinx-coroutines-core 的平台无关异步编程
此模块包括支持 select 和其他便利原语的类似 Go 的管道
这个库的综合指南在这里。
基于 JDK 8 中的 CompletableFuture 的 API:kotlinx-coroutines-jdk8
基于 JDK 7 及更高版本 API 的非阻塞 IO(NIO):kotlinx-coroutines-nio
支持 Swing (kotlinx-coroutines-swing) 和 JavaFx (kotlinx-coroutines-javafx)
支持 RxJava:kotlinx-coroutines-rx
这些库既作为使通用任务易用的便利的 API,也作为如何构建基于协程的库的端到端示例。

无限Fibonacci数列:

Think about lazy, infinite streams of values, like the Fibonacci sequence:
// inferred type is Sequence<Int>
val fibonacci = buildSequence {
yield(1)
var cur = 1
var next = 1
while (true) {
yield(next)
val tmp = cur + next
cur = next
next = tmp
}
}


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:华夏商周秦汉唐宋元明清中华民国

查看原文:《Kotlin 极简教程》第9章 轻量级线程:协程(2)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

4833 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传