[TOC]
## 共享的可变状态与并发
协程可用多线程调度器(比如默认的 [Dispatchers.Default](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html))并发执行。这样就可以提出所有常见的并发问题。主要的问题是同步访问**共享的可变状态**。协程领域对这个问题的一些解决方案类似于多线程领域中的解决方案,但其它解决方案则是独一无二的。
### 问题
我们启动一百个协程,它们都做一千次相同的操作。我们同时会测量它们的完成时间以便进一步的比较:
```kotlin
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
```
我们从一个非常简单的动作开始:使用多线程的 [Dispatchers.Default](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html) 来递增一个共享的可变变量。
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[这里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt)获取完整代码。
这段代码最后打印出什么结果?它不太可能打印出“Counter = 100000”,因为一百个协程在多个线程中同时递增计数器但没有做并发处理。
### volatile 无济于事
有一种常见的误解:volatile 可以解决并发问题。让我们尝试一下:
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
@Volatile // 在 Kotlin 中 `volatile` 是一个注解
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[这里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt)获取完整代码。
这段代码运行速度更慢了,但我们最后仍然没有得到“Counter = 100000”这个结果,因为 volatile 变量保证可线性化(这是“原子”的技术术语)读取和写入变量,但在大量动作(在我们的示例中即“递增”操作)发生时并不提供原子性。
### 线程安全的数据结构
一种对线程、协程都有效的常规解决方法,就是使用线程安全(也称为同步的、可线性化、原子)的数据结构,它为需要在共享状态上执行的相应操作提供所有必需的同步处理。在简单的计数器场景中,我们可以使用具有`incrementAndGet` 原子操作的 `AtomicInteger` 类:
```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[这里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt)获取完整代码。
这是针对此类特定问题的最快解决方案。它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作。然而,它并不容易被扩展来应对复杂状态、或一些没有现成的线程安全实现的复杂操作。
### 以细粒度限制线程
_限制线程_ 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。它通常应用于 UI 程序中:所有 UI 状态都局限于单个事件分发线程或应用主线程中。这在协程中很容易实现,通过使用一个单线程上下文:
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 将每次自增限制在单线程上下文中
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[这里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt)获取完整代码。
这段代码运行非常缓慢,因为它进行了 _细粒度_ 的线程限制。每个增量操作都得使用[withContext(counterContext)] 块从多线程 [Dispatchers.Default](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html) 上下文切换到单线程上下文。
### 以粗粒度限制线程
在实践中,线程限制是在大段代码中执行的,例如:状态更新类业务逻辑中大部分都是限于单线程中。下面的示例演示了这种情况,在单线程上下文中运行每个协程。
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 将一切都限制在单线程上下文中
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[这里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt)获取完整代码。
```
Completed 100000 actions in xxx ms
Counter = 100000
```
这段代码运行更快而且打印出了正确的结果。
### 互斥
该问题的互斥解决方案:使用永远不会同时执行的 _关键代码块_来保护共享状态的所有修改。在阻塞的世界中,你通常会为此目的使用 `synchronized` 或者 `ReentrantLock`。在协程中的替代品叫做 [Mutex](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html) 。它具有 [lock](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html)和 [unlock](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html)方法,可以隔离关键的部分。关键的区别在于 `Mutex.lock()` 是一个挂起函数,它不会阻塞线程。
还有 [withLock](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html) 扩展函数,可以方便的替代常用的`mutex.lock(); try { …… } finally { mutex.unlock() }` 模式:
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 用锁保护每次自增
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[这里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt)获取完整代码。
```
Completed 100000 actions in xxx ms
Counter = 100000
```
此示例中锁是细粒度的,因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。
### Actors
一个 [actor](https://en.wikipedia.org/wiki/Actor_model) 是由协程、被限制并封装到该协程中的状态以及一个与其它协程通信的 _通道_ 组合而成的一个实体。一个简单的 actor 可以简单的写成一个函数,但是一个拥有复杂状态的 actor 更适合由类来表示。
有一个 [actor](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html) 协程构建器,它可以方便地将 actor 的邮箱通道组合到其作用域中(用来接收消息)、组合发送 channel 与结果集对象,这样对 actor 的单个引用就可以作为其句柄持有。
使用 actor 的第一步是定义一个 actor 要处理的消息类。Kotlin 的[密封类](https://kotlinlang.org/docs/reference/sealed-classes.html)很适合这种场景。我们使用 `IncCounter` 消息(用来递增计数器)和 `GetCounter` 消息(用来获取值)来定义 `CounterMsg` 密封类。后者需要发送回复。[CompletableDeferred](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html) 通信原语表示未来可知(可传达)的单个值,这里被用于此目的。
```kotlin
// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求
```
接下来我们定义一个函数,使用 [actor](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html) 协程构建器来启动一个 actor:
```kotlin
// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 状态
for (msg in channel) { // 即将到来消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
```
main 函数代码很简单:
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同个动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求
// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 状态
for (msg in channel) { // 即将到来消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
//sampleStart
fun main() = runBlocking<Unit> {
val counter = counterActor() // 创建该 actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// 发送一条消息以用来从一个 actor 中获取计数值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // 关闭该actor
}
//sampleEnd
```
> 可以在[这里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-07.kt)获取完整代码。
```
Completed 100000 actions in xxx ms
Counter = 100000
```
actor 本身执行时所处上下文(就正确性而言)无关紧要。一个 actor 是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor 可以修改自己的私有状态,但只能通过消息互相影响(避免任何锁定)。
actor 在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。
> 注意,[actor](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html) 协程构建器是一个双重的 [produce](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html) 协程构建器。一个 actor 与它接收消息的通道相关联,而一个 producer 与它发送元素的通道相关联。
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html
[withLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
- 前言
- Kotlin简介
- IntelliJ IDEA技巧总结
- idea设置类注释和方法注释模板
- 像Android Studion一样创建工程
- Gradle
- Gradle入门
- Gradle进阶
- 使用Gradle创建一个Kotlin工程
- 环境搭建
- Androidstudio平台搭建
- Eclipse的Kotlin环境配置
- 使用IntelliJ IDEA
- Kotlin学习路线
- Kotlin官方中文版文档教程
- 概述
- kotlin用于服务器端开发
- kotlin用于Android开发
- kotlin用于JavaScript开发
- kotlin用于原生开发
- Kotlin 用于数据科学
- 协程
- 多平台
- 新特性
- 1.1的新特性
- 1.2的新特性
- 1.3的新特性
- 开始
- 基本语法
- 习惯用法
- 编码规范
- 基础
- 基本类型
- 包与导入
- 控制流
- 返回与跳转
- 类与对象
- 类与继承
- 属性与字段
- 接口
- 可见性修饰符
- 扩展
- 数据类
- 密封类
- 泛型
- 嵌套类
- 枚举类
- 对象
- 类型别名
- 内嵌类
- 委托
- 委托属性
- 函数与Lambda表达式
- 函数
- Lambda表达式
- 内联函数
- 集合
- 集合概述
- 构造集合
- 迭代器
- 区间与数列
- 序列
- 操作概述
- 转换
- 过滤
- 加减操作符
- 分组
- 取集合的一部分
- 取单个元素
- 排序
- 聚合操作
- 集合写操作
- List相关操作
- Set相关操作
- Map相关操作
- 多平台程序设计
- 平台相关声明
- 以Gradle创建
- 更多语言结构
- 解构声明
- 类型检测与转换
- This表达式
- 相等性
- 操作符重载
- 空安全
- 异常
- 注解
- 反射
- 作用域函数
- 类型安全的构造器
- Opt-in Requirements
- 核心库
- 标准库
- kotlin.test
- 参考
- 关键字与操作符
- 语法
- 编码风格约定
- Java互操作
- Kotlin中调用Java
- Java中调用Kotlin
- JavaScript
- 动态类型
- kotlin中调用JavaScript
- JavaScript中调用kotlin
- JavaScript模块
- JavaScript反射
- JavaScript DCE
- 原生
- 并发
- 不可变性
- kotlin库
- 平台库
- 与C语言互操作
- 与Object-C及Swift互操作
- CocoaPods集成
- Gradle插件
- 调试
- FAQ
- 协程
- 协程指南
- 基础
- 取消与超时
- 组合挂起函数
- 协程上下文与调度器
- 异步流
- 通道
- 异常处理与监督
- 共享的可变状态与并发
- Select表达式(实验性)
- 工具
- 编写kotlin代码文档
- 使用Kapt
- 使用Gradle
- 使用Maven
- 使用Ant
- Kotlin与OSGI
- 编译器插件
- 编码规范
- 演进
- kotlin语言演进
- 不同组件的稳定性
- kotlin1.3的兼容性指南
- 常见问题
- FAQ
- 与Java比较
- 与Scala比较(官方已删除)
- Google开发者官网简介
- Kotlin and Android
- Get Started with Kotlin on Android
- Kotlin on Android FAQ
- Android KTX
- Resources to Learn Kotlin
- Kotlin样品
- Kotlin零基础到进阶
- 第一阶段兴趣入门
- kotlin简介和学习方法
- 数据类型和类型系统
- 入门
- 分类
- val和var
- 二进制基础
- 基础
- 基本语法
- 包
- 示例
- 编码规范
- 代码注释
- 异常
- 根类型“Any”
- Any? 可空类型
- 可空性的实现原理
- kotlin.Unit类型
- kotlin.Nothing类型
- 基本数据类型
- 数值类型
- 布尔类型
- 字符型
- 位运算符
- 变量和常量
- 语法和运算符
- 关键字
- 硬关键字
- 软关键字
- 修饰符关键字
- 特殊标识符
- 操作符和特殊符号
- 算术运算符
- 赋值运算符
- 比较运算符
- 逻辑运算符
- this关键字
- super关键字
- 操作符重载
- 一元操作符
- 二元操作符
- 字符串
- 字符串介绍和属性
- 字符串常见方法操作
- 字符串模板
- 数组
- 数组介绍创建及遍历
- 数组常见方法和属性
- 数组变化以及下标越界问题
- 原生数组类型
- 区间
- 正向区间
- 逆向区间
- 步长
- 类型检测与类型转换
- is、!is、as、as-运算符
- 空安全
- 可空类型变量
- 安全调用符
- 非空断言
- Elvis操作符
- 可空性深入
- 可空性和Java
- 函数
- 函数式编程概述
- OOP和FOP
- 函数式编程基本特性
- 组合与范畴
- 在Kotlin中使用函数式编程
- 函数入门
- 函数作用域
- 函数加强
- 命名参数
- 默认参数
- 可变参数
- 表达式函数体
- 顶层、嵌套、中缀函数
- 尾递归函数优化
- 函数重载
- 控制流
- if表达式
- when表达式
- for循环
- while循环
- 循环中的 Break 与 continue
- return返回
- 标签处返回
- 集合
- list集合
- list集合介绍和操作
- list常见方法和属性
- list集合变化和下标越界
- set集合
- set集合介绍和常见操作
- set集合常见方法和属性
- set集合变换和下标越界
- map集合
- map集合介绍和常见操作
- map集合常见方法和属性
- map集合变换
- 集合的函数式API
- map函数
- filter函数
- “ all ”“ any ”“ count ”和“ find ”:对集合应用判断式
- 别样的求和方式:sumBy、sum、fold、reduce
- 根据人的性别进行分组:groupBy
- 扁平化——处理嵌套集合:flatMap、flatten
- 惰性集合操作:序列
- 区间、数组、集合之间转换
- 面向对象
- 面向对象-封装
- 类的创建及属性方法访问
- 类属性和字段
- 构造器
- 嵌套类(内部类)
- 枚举类
- 枚举类遍历&枚举常量常用属性
- 数据类
- 密封类
- 印章类(密封类)
- 面向对象-继承
- 类的继承
- 面向对象-多态
- 抽象类
- 接口
- 接口和抽象类的区别
- 面向对象-深入
- 扩展
- 扩展:为别的类添加方法、属性
- Android中的扩展应用
- 优化Snackbar
- 用扩展函数封装Utils
- 解决烦人的findViewById
- 扩展不是万能的
- 调度方式对扩展函数的影响
- 被滥用的扩展函数
- 委托
- 委托类
- 委托属性
- Kotlin5大内置委托
- Kotlin-Object关键字
- 单例模式
- 匿名类对象
- 伴生对象
- 作用域函数
- let函数
- run函数
- with函数
- apply函数
- also函数
- 标准库函数
- takeIf 与 takeUnless
- 第二阶段重点深入
- Lambda编程
- Lambda成员引用高阶函数
- 高阶函数
- 内联函数
- 泛型
- 泛型的分类
- 泛型约束
- 子类和子类型
- 协变与逆变
- 泛型擦除与实化类型
- 泛型类型参数
- 泛型的背后:类型擦除
- Java为什么无法声明一个泛型数组
- 向后兼容的罪
- 类型擦除的矛盾
- 使用内联函数获取泛型
- 打破泛型不变
- 一个支持协变的List
- 一个支持逆变的Comparator
- 协变和逆变
- 第三阶段难点突破
- 注解和反射
- 声明并应用注解
- DSL
- 协程
- 协程简介
- 协程的基本操作
- 协程取消
- 管道
- 慕课霍丙乾协程笔记
- Kotlin与Java互操作
- 在Kotlin中调用Java
- 在Java中调用Kotlin
- Kotlin与Java中的操作对比
- 第四阶段专题练习
- 朱凯Kotlin知识点总结
- Kotlin 基础
- Kotlin 的变量、函数和类型
- Kotlin 里那些「不是那么写的」
- Kotlin 里那些「更方便的」
- Kotlin 进阶
- Kotlin 的泛型
- Kotlin 的高阶函数、匿名函数和 Lambda 表达式
- Kotlin协程
- 初识
- 进阶
- 深入
- Kotlin 扩展
- 会写「18.dp」只是个入门——Kotlin 的扩展函数和扩展属性(Extension Functions / Properties)
- Kotlin实战-开发Android