企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] `SharedFlow` 和 `StateFlow` 是 Kotlin Coroutines 库提供的两种流类型,它们都用于管理和处理流数据,但在使用场景和特性上有所不同。 ## flowOn切换线程 ### 协程在IO线程 ```kotlin private fun testFlowOn() { lifecycleScope.launch(Dispatchers.IO) { flow { Log.d("TAGTAG", "emit ${Thread.currentThread().name}") emit(1) }.onStart { Log.d("TAGTAG", "onStart ${Thread.currentThread().name}") }.flowOn(Dispatchers.Main).onEach { Log.d("TAGTAG", "onEach ${Thread.currentThread().name}") }.flowOn(Dispatchers.IO).onCompletion { Log.d("TAGTAG", "onCompletion ${Thread.currentThread().name}") }.collect { Log.d("TAGTAG", "collect ${Thread.currentThread().name}") } } } ``` 输出: ```log 14:44:31.038 D onStart main 14:44:31.038 D emit main 14:44:31.039 D onEach DefaultDispatcher-worker-1 14:44:31.039 D collect DefaultDispatcher-worker-1 14:44:31.044 D onCompletion DefaultDispatcher-worker-1 ``` ### 协程在主线程 ```kotlin private fun testFlowOn() { lifecycleScope.launch(Dispatchers.Main) { flow { Log.d("TAGTAG", "emit ${Thread.currentThread().name}") emit(1) }.onStart { Log.d("TAGTAG", "onStart ${Thread.currentThread().name}") }.flowOn(Dispatchers.Main).onEach { Log.d("TAGTAG", "onEach ${Thread.currentThread().name}") }.flowOn(Dispatchers.IO).onCompletion { Log.d("TAGTAG", "onCompletion ${Thread.currentThread().name}") }.collect { Log.d("TAGTAG", "collect ${Thread.currentThread().name}") } } } ``` 输出结果 ```log 14:46:27.449 D onStart main 14:46:27.449 D emit main 14:46:27.451 D onEach DefaultDispatcher-worker-1 14:46:27.453 D collect main 14:46:27.454 D onCompletion main ``` ### 总结 1. `flowOn`操作符对上游范围有效, 范围是指两个`flowOn`之间,如果只有一个`flowOn`,则上游全部有效; 2. 最后一个`flowOn`后的操作所在线程与当前整个`flow`所在的线程池相同,即 `collect`和`onCompletion`的执行线程只与所启动的协程所在的线程有关; ## Flow、StateFlow、SharedFlow ### 热流与冷流 **热流**和**冷流**是关于数据流的两个基本概念,它们描述了数据流何时开始以及如何传递事件的方式。 - **热流**是一种主动的数据流。它在创建时就开始发射事件,无论是否有观察者订阅。即使没有观察者,热流也会持续产生事件。当观察者订阅时,它只是加入了已经运行的数据流,开始接收当前已经产生的事件。 - **冷流**是一种被动的数据流。它在有观察者订阅时才开始发射事件。每个观察者都会获得相同的事件序列,而不会受到其他观察者的影响。 `SharedFlow` 和 `StateFlow`都是热流。即没有观察者,数据会持续更新,与`LiveData`类似。 其中`MutableSharedFlow`与`MutableStateFlow`是它们的可变类型。 ### `StateFlow` #### 特性 * **持久状态**:`StateFlow` 总是持有最新的状态值,并且当新的观察者(collector)开始收集时,它会立即收到当前的状态。 * **可变性**:`StateFlow` 是可变的,可以通过 `.value` 属性进行读取和更新。 * **广播给所有观察者**:每次更新值时,`StateFlow` 会广播最新的状态给所有当前的观察者。 * **`Replay` 缓存**:`StateFlow` 总是将最新的状态重播给新观察者,不需要设置重播缓存大小。 #### 使用场景 * **UI 状态管理**:适用于需要持续更新和持有最新状态值的场景,例如在 ViewModel 中管理 UI 状态。 * **单一来源的状态更新**:适用于只有一个数据源负责更新状态的场景。 ```kotlin private val _state = MutableStateFlow("Initial State") val state: StateFlow<String> get() = _state fun updateState(newState: String) { _state.value = newState } // 观察者 viewModel.state.collect { newState -> // 更新UI } ``` #### MutableStateFlow `MutableStateFlow`的构造函数有一个默认参数,即初始状态值。以下是`MutableStateFlow`构造函数: ```kotlin public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL) ``` 构造函数中的`value`参数表示`MutableStateFlow`的初始状态值。在创建`MutableStateFlow`时,需要提供这个初始状态值。 ### `SharedFlow` #### 特性 * **多播**:`SharedFlow` 支持多播,可以有多个同时活跃的观察者。 * **无初始值**:与 `StateFlow` 不同,`SharedFlow` 不持有初始值。 * **重播缓存**:可以配置重播缓存的大小,以便新观察者能够接收到一定数量的最近发射的值。 * **完全控制**:相比于 `StateFlow`,`SharedFlow` 提供了更多的控制,例如重播缓冲区大小、过期策略等。 #### 使用场景 * **事件流**:适用于需要发送一次性事件的场景,例如导航事件、错误消息等。 * **广播消息**:适用于需要广播消息给多个观察者的场景,且这些消息不需要持久化状态。 ```kotlin private val _events = MutableSharedFlow<String>(replay = 2) val events: SharedFlow<String> get() = _events fun sendEvent(event: String) { viewModelScope.launch { _events.emit(event) } } // 观察者 viewModel.events.collect { event -> // 处理事件 } ``` #### MutableSharedFlow **MutableSharedFlow**是一种可变的、用于创建共享流的类。下面是MutableSharedFlow的一些主要构造函数参数及其默认值: ```kotlin public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : MutableSharedFlow<T> { /*...*/ } ``` 1. `replay`: 表示在订阅时从流中回放的元素数量。默认值为 0,表示不回放任何元素。如果设置为正整数 n,则在订阅时将向新订阅者回放最近的 n 个元素。 2. `extraBufferCapacity`: 表示额外的缓冲容量,用于存储订阅者尚未消耗的元素。默认值为 0,表示不使用额外的缓冲容量。设置为正整数 m 时,会在内部使用一个带有额外 m 容量的缓冲区。 3. `onBufferOverflow`: 表示在缓冲区溢出时的处理策略。默认值为 BufferOverflow.SUSPEND,表示当缓冲区溢出时暂停发射,等待订阅者消费。其他选项还包括 BufferOverflow.DROP_OLDEST 和 BufferOverflow.DROP_LATEST,它们分别表示在缓冲区溢出时丢弃最老的元素或最新的元素。 ### 如何选择 1. **需要持久状态且最新值可供新观察者立即获取**: * 使用 `StateFlow`。它适用于需要始终持有最新状态的场景,如 UI 状态管理。 2. **需要广播事件且事件不需要持久化**: * 使用 `SharedFlow`。它适用于一次性事件流或广播消息的场景。 3. **多个观察者同时接收同样的数据**: * `SharedFlow` 提供了多播能力,并且可以配置重播缓存以满足需求。 ### 总结 * **`StateFlow`** 是一种特别适用于状态管理的流,它始终持有最新的状态值,适合单一数据源更新的场景。 * **`SharedFlow`** 提供了更灵活的事件处理能力,支持多播和重播缓存,适合处理一次性事件或广播消息。