一、概念

* 场景:List 是同步一次性返回多个值,Sequence 是同步(阻塞)分开返回多个值,Suspend 是异步返回单个值,Flow可以异步返回多个值。
*
冷流:只能在创建的时候定义生产数据的代码,无法在外部通过实例更新数据。创建和中间操作只是对上一步的包装并不会执行,所以是响应式编程(也叫声明式),因此可以调用挂起函数自身却不是suspend修饰,只有消费的时候才真正生产和操作数据,所以消费都是suspend修饰得在携程中调用。
* 一些名词:构建→上游(前面的流)→当前操作→下游(后面的流)→消费。
* 有序:元素遵循先进先出原则,是一个接一个操作完,而不是统一操作再进行下一步。
* 协作取消:消费的取消只能在可取消的挂起函数挂起的时候取消(即跟随协程取消)。
* 切换线程:默认生产和消费在同一协程上下文中,消费所在的协程决定了生产所在的线程。对于生产可以通过 flowOn
改变上流执行线程而不会影响下流。对于消费可通过 launchIn 改变流执行的线程(不会影响生产中的flowOn)。
*
背压:当数据的消费速度赶不上生产速度。默认情况下生产一个消费一个是同步交替进行的不存在背压,除非使用flowOn切换线程。由于消费操作是挂起函数(协程能轻松切换线程),挂起恢复的特性会阻塞数据生产的速度不会对下流产生背压。
* 防抖:新值的内容发生变化才会消费,相同会被忽略。
FlowChannelSharedFlow
类型冷流(数据只能在流内生产)热流(数据在流外生产然后传递给流)
数据产生消费时才会生产数据。不消费也会生产数据,旧值可以缓存在内存中。
关系
与订阅者是一对一关系(多个订阅者彼此之间独立,数据是完整重新发送)。

与订阅者是一对多关系(多个订阅者轮流接收,收到的不是同一个值)。

与订阅者是一对多关系(多个订阅者同时接收,收到的是同一个值)。

关闭流会自动关闭(停止订阅或者数据数据生产完)。构造创建的不会自动关闭,构建器创建的会跟随协程关闭。
构造创建的不会自动关闭,转换的启动模式配置为WhileSubscribed会超时关闭。
初始值无有
二、生产者 producer

用来生产数据,发送值到流中。

2.1 flow

创建Flow的基本方法,需要手动调用 emit 发射单个值,调用 emitAll 发射另一个Flow中的值。emit 不是线程安全的,不应该并发调用(可考虑
channelFlow),不应该在内部调用withContext切换线程(可考虑flowOn操作符)。

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() ->
Unit): Flow<T> val flow = flow { emit(5) for (i in 1..3) { emit(i) }
emitAll(flowOf(1, 2, 3)) }
2.2 flowOf

快速创建固定值集的Flow(类似 listOf() )。

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) { emit(element) }
} val flow = flowOf(1, 2, 3)
2.3 asFlow

遍历其它容器(Array、Range、Sequence、Iterable、Iterator)将元素发送到流中。也可以将普通函数或挂起函数返回的值发送到流中。

public fun Array.asFlow(): Flow<Int> = flow { forEach { value -> emit(value) }
}

public fun XXArray.asFlow(): Flow<Int> = flow { forEach { value -> emit(value)
} }

public fun XXRange.asFlow(): Flow<Int> = flow { forEach { value -> emit(value)
} }

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow { forEach { value ->
emit(value) } }

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow { forEach { value ->
emit(value) } }

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow { forEach { value ->
emit(value) } }

public fun <T> (() -> T).asFlow(): Flow<T> = flow { emit(invoke()) }

public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow { emit(invoke()) }
val flow = listOf(1, 2, 3).asFlow()
2.4 callbackFlow

将回调API改造成Flow。底层使用的sendChannel,默认容量64满了会挂起直到消费出空位,为了避免生产被挂起可以配置为CONFLATED或者
UNLIMITED。

public fun <T> callbackFlow(@BuilderInference block: suspend
ProducerScope<T>.() -> Unit): Flow<T>
send ()发送数据。
offer ()允许在协程外提交。
sendBlocking ()尝试用offer,失败则用runBlocking{ send() }阻塞式提交。
awaitClose ()Flow关闭时执行,用来释放资源(注销回调函数),未调用报错 IllegalStateException。 fun
showWithFlow(): Flow<Int> = callbackFlow { //1.创建Flow并发送值(实现Callback接口) val
callback = object: NetCallback { override fun onNextValue(value: Int) { try {
offer(num) } catch(t: Throwable) {...} } override fun onError(ecxeption:
Throwable) { cancel(CancellationException("API发生错误", exception)) } override fun
onCompleted() = close() } //2.注册回调(传参使用,并对API进行配置操作) getData(callback)
//3.取消协程并注销回调(用来释放API资源) awaitClose {...} }
2.5 emptyFlow

返回一个空的Flow。

public fun <T> emptyFlow(): Flow<T> = EmptyFlow val flow = emptyFlow<Int>()
2.6 channelFlow

支持缓冲通道,线程安全,允许不同的CorotineContext发送事件。生产和消费可以异步进行,不像flow默认生产一个消费一个是同步交替进行(使用缓存操作符就不会了)。

public fun <T> channelFlow(@BuilderInference block: suspend
ProducerScope<T>.() -> Unit): Flow<T> val flow = channelFlow<Int> { }
三、中间操作 Intermediaries

执行一些操作,不会立即执行,返回的还是Flow。

3.1 遍历

onEach
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

3.2 过滤出

filter
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) ->
Boolean): Flow<T>

保留符合条件的值。

filterNot
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) ->
Boolean): Flow<T>

保留不符合条件的值。

filterNotNull
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

保留不为null的值。

filterIsInstance
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter {
it is R } as Flow<R>

保留对应类型的值(类型填到泛型里面)。

3.3 线程切换

生产默认执行在消费处所在的协程指定的线程上,生产一个消费一个是同步的。指定线程后,生产和消费是异步进行。

flowOn
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> 

flowOn使上游执行在指定的线程上,不会影响下游。上游还有 flowOn 的时候,只影响他们之间的那些操作。下游默认还是执行在消费所在的线程。

3.4 背压 Backpressure

数据的消费速度赶不上生产速度。默认情况下生产一个消费一个是同步交替进行的不存在背压,emit和collect都是挂起函数会等待对方准备好,最终耗时是每个值生产和消耗的时间总和。buffer使生产和消费并发运行提高效率,设置缓冲区大小(从0开始算的),消费来不及就先把值缓存起来等待着。对于缓存区满了有三种策略:

* BufferOverflow.SUSPEND模式:默认模式,缓冲区满了就和默认情况一样挂起(等待消费),消费一个再生产一个。
* BufferOverflow.DROP_OLDEST模式:缓冲区满了就丢弃还没消费掉的旧值,存入新值。
* BufferOverflow.DROP_LATEST模式:缓冲区满了就丢弃后来生产的新值,直到有空位了再往里存又生产的值(不是丢弃掉的那些)。
buffer
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow:
BufferOverflow = BufferOverflow.SUSPEND): Flow<T> 

参数capacity是缓冲区大小,参数onBufferOverflow是对缓冲区满后新值的处理模式。

conflate
public fun <T> Flow<T>.conflate(): Flow<T>

会丢弃中间值,只消费首尾值。相当于 buffer(0 , BufferOverflow.DROP_OLDEST)。

3.5 转换

map 和 filter 底层就是使用的 transform。transform 能 emit 任意次数,flow 中每个值都会循环一遍 transform
中的 emit,而 map 只能对值转换。

transform
transformLatest
transformWhile
public inline fun <T, R> Flow<T>.transform(@BuilderInference crossinline
transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

可以 emit 任意次数,flow 中每个值都会循环一遍 transform 中的 emit。

public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend
FlowCollector<R>.(value: T) -> Unit): Flow<R>

只对最后一个值消费。

public fun <T, R> Flow<T>.transformWhile(@BuilderInference transform: suspend
FlowCollector<R>.(value: T) -> Boolean): Flow<R>

为true继续消费,为false后续值都丢弃。

map
mapLatest
mapNotNull
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T)
-> R): Flow<R>

对值进行转换后再emit。

public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend
(value: T) -> R): Flow<R>

只对最后一个值转换,会丢弃其它值。

public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform:
suspend (value: T) -> R?): Flow<R>

转换会丢弃null值。
val flow = (1..3).asFlow() //map只能转换元素 flow.map { "[$it-a]" }.collect {
print(it) } //打印:[1-a][2-a][3-a] //mapNotNull不处理null值 flow{ emit(1) emit(2)
emit(null) emit(3) }.mapNotNull { it?.plus(1) }.collect { print("$it,") }
//打印:2,3,4 //flow中每个值都会循环一遍transform中的emit flow.transform { emit("[$it-a]")
emit("[$it-b]") }.collect { print(it) } //打印:[1-a][1-b][2-a][2-b][3-a][3-b]
//transform实现filter和map flow.transform { emit(it + 1) }.transform { if (it % 2
== 0) emit(it) }.collect { print("$it,") } //打印:2,4 flow.transformLatest {
delay(1000) //此处不延迟的话会每个值都处理而不是只处理最后一个 emit(it*10) }.collect { print("$it,") }
//打印:20 flow.transformWhile { emit(it) it != 3 }.collect { print("$it,") }
//打印:1,2
3.6 截取

take
takeWhile
public fun <T> Flow<T>.take(count: Int): Flow<T> 

获取几个值,丢弃剩下的。

public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>

满足条件就获取值,只要碰到不满足条件的值就丢弃剩下全部值(即便剩下的里面有满足条件的),第一个值就不满足会全部丢弃。

drop
dropWhile
public fun <T> Flow<T>.drop(count: Int): Flow<T>

丢弃几个值,获取剩下的。

public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>

满足条件就丢弃值,只要碰到不满足条件的值就获取剩下全部值(即便剩下的里面有满足条件的),第一个值就不满足会全部获取。
val flow = (1..5).asFlow() flow.take(3).collect { print(",$it") } //打印:1,2,3
flow.takeWhile { it > 3 }.collect { print(",$it") } //打印:无内容
flow.drop(3).collect { print(",$it") } //打印:4,5 flow.dropWhile { it == 3
}.collect { print(",$it") } //打印:1,2,3,4,5
3.7 合并

merge
public fun <T> merge(vararg flows: Flow<T>): Flow<T>

将两个Flow中的值连接起来。

zip
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1,
T2) -> R): Flow<R>

将两个Flow中同索引的值根据条件合并成一个值(两个值是并行各自计算出来后合并的),长度短的Flow执行完就结束,长度长的Folw多的值会被舍弃。使用场景:两件无关的事情同时做再一起显示,例如当前天气和未来7天天气没有先后依赖关系却要同时显示,先后请求效率低一些可以同时做。当然3个请求同时并发处理也是可以的。

combine
public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a:
T1, b: T2) -> R): Flow<R>

将两个Flow中同索引的值根据条件合并成一个值,短的Flow最后一个值重复跟长的Flow剩下的值合并。
val flow1 = flowOf(1, 2, 3, 4, 5) val flow2 = flowOf('a', 'b', 'c')
merge(flow1, flow2).collect { print(",$it") } //打印:1,2,3,4,5,a,b,c merge(flow2,
flow1).collect { print(",$it") } //打印:a,b,c,1,2,3,4,5 flow1.zip(flow2) { a, b
-> "[$a$b]" }.collect { print(it) } //打印:[1a][2b][3c] flow2.zip(flow1) { a, b
-> "[$a$b]" }.collect { print(it) } //打印:[a1][b2][c3] flow1.combine(flow2) { a,
b -> "[$a$b]" }.collect { print(it) } //打印:[1a][2b][3c][4c][5c]
flow2.combine(flow1) { a, b -> "[$a$b]" }.collect { print(it) }
//打印:[a1][b2][c3][c4][c5] fun one(): Flow<String> = flow { emit(a) } fun two():
Flow<String> = flow { emit(b) } fun three(): Flow<String> = flow { emit(c) }
one() .zip(two()) {a, b -> user.name = a user.age = b user }.zip(three()) {
user, c -> user.gender = c user }.collect { user -> //... }
3.8 展平多维值

flattenConcat
flattenMerge
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>

将Flow中的多维值都展平然后全部连接起来。

public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int =
DEFAULT_CONCURRENCY): Flow<T>

将Flow中的多维值都展平然后全部连接起来,可以设置并发数。
val flow1 = flowOf(1, 2, 3, 4, 5) val flow2 = flowOf('a', 'b', 'c', 'd', 'e',
'f') val flow3 = flowOf(flow1, flow2) flow3.flattenMerge(2).collect {
print(",$it") } //打印:1,2,3,4,5,a,b,c,d,e,f flow3.flattenConcat().collect {
print(",$it") } //打印:1,2,3,4,5,a,b,c,d,e,f
3.9 组合操作

flatMapContact
flatMapMerge
flatMapLatest
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) ->
Flow<R>): Flow<R> = map(transform).flattenConcat()

流中流,(同步处理数据?)严格按照顺序执行(外流值执行完内流操作,才轮到下一个外流值)。使用场景:前一个值依赖于另一个值,例如获取用户信息依赖于token授权,先发送一个请求获取token再发送一个请求获取信息。

public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenMerge(concurrency)

流中流,并发处理数据不保证顺序(delay(耗时?)短的会先执行)。

public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline
transform: suspend (value: T) -> Flow<R>): Flow<R> = transformLatest {
emitAll(transform(it)) }

流中流,外流再次发送新值的时候,内流没操作完就会被取消,然后开始处理新的一轮。
fun getToken(): Flow<String> = Flow { emit(token) } //获取token fun
getUserInfo(): Flow<String> = Flow { emit(info) } //获取用户信息 getToken()
.flatMapContact { token -> getUserInfo(token) }.flowOn(Dispatches.IO) .collect
{ userInfo -> println(userInfo) } flowOf(300, 200, 100) .flatMapMerge { flow {
delay(it.toLong()) emit("a$it") emit("b$it") } }.collect { println(it) }
//打印:a100,b100,a200,b200,a300,b300 flow { emit(1) delay(150) emit(2) delay(50)
emit(3) }.flatMapLatest { flow { delay(100) emit("$it") } }.collect {
println(it) } //打印:1,3
3.10 状态回调

注意:链式调用中出现多个onStart { action }时,后出现的 action 会先执行,因为后续 onStart 构建的下游流包在了上游
onStart 的外面,并且 action 会在收集上游流数据之前执行。而这个结论却不能沿用到onCompletion { action },虽然
onCompletion 构建的下游流也包裹在上游 onCompletion 外面,但是 action 总是在收集上游流之后执行。

onStart
onCompletion

onEmpty

public fun <T> Flow<T>.onStart(action: suspend FlowCollector<T>.() -> Unit):
Flow<T> 

在数据生产之前调用。(可以用来做耗时操作之前的操作,例如UI展示进度条)

public fun <T> Flow<T>.onCompletion(action: suspend FlowCollector<T>.(cause:
Throwable?) -> Unit): Flow<T>

在数据消费完后(参数case = null)或者出现异常时(case !=
null)调用。只能判断有没有发生异常,不能捕获异常。(可以用来做耗时操作结束后的操作,例如UI隐藏进度条)

public fun <T> Flow<T>.onEmpty(action: suspend FlowCollector<T>.() -> Unit):
Flow<T>

在flow完成却未发送任何值时调用。(可以用来发送默认值)

3.11 重试

retry当上游发生异常时可以重新执行几次。retryWhen 的简化版。
retryWhen有条件的进行重试 ,lambda 中有两个参数: 一个是 异常原因,一个是当前重试的 index (从0开始)。lambda 的返回值 为
Boolean ,true则继续重试 ,false 则结束重试。 (1..3).asFlow().onEach{ if(it == 3) throw
Exception("发生异常") delay(100) println("生产:$it") }.retry(2){ it.message = "有异常"
}.catch{ ex -> println("捕获异常:${ex.massage}") //用it试试 }.collect{
println("消费:$it") }
3.12 规约(发送每步计算的结果)

scan
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation:
suspend (accumulator: R, value: T) -> R): Flow<R>

会把初始值和每一步的操作结果发送出去。
(1..3).asFlow() .onEach { delay(200) } .scan(1) { acc, v -> acc + v } .collect
{ print("$it,") } //打印:1,2,4,7
3.13 去重

distinctUntilChanged
distinctUntilChangedBy
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

过滤用,下面一个的简化版本。连续两个值一样,则跳过发送。

public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

去重操作符,判断连续的两个值是否重复,可以选择是否丢弃重复值。

3.14 防抖

debounce
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>

用来确保前后两次值之间存在指定的时间间隔,大于等于间隔时间会发送前一个值,小于间隔时间只保留最后一个,肯定会发送最后一个值。使用场景:例如搜索框等用户停顿一段时间再去显示搜索建议,而不是每个字都搜索(网络延迟低于输入速度得到的数据也无意义)。

sample
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>

周期性在指定时间后采样一个值。使用场景:例如数据量很大只需要取少量的时候,像同一时间存在大量的弹幕,只取其中一条显示不可能全部显示。
flow { emit(1) emit(2) delay(600) emit(3) delay(100) emit(4) delay(100)
emit(5) }.debounce(500) //只发送 2、5 flow { while(true) { emit("弹幕") }
}.sample(1000) //每一秒只取一个值 .flowOn(Dispatches.IO) //由于是死循环需要放在子线程
3.15 Android生命周期

生命周期低于目标状态会取消上游,不影响下游,需要注意调用顺序。

flowWithLifecycle 
public fun <T> Flow<T>.flowWithLifecycle(
    lifecycle: Lifecycle,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED
): Flow<T> = callbackFlow {
    lifecycle.repeatOnLifecycle(minActiveState) {
        this@flowWithLifecycle.collect {
            send(it)
        }
    }
    close()
}

四、消费者 Consumer

最终操作,会触发流的执行,返回的是结果。

4.1 收集

collect
public suspend fun collect(collector: FlowCollector<T>)

收集Flow中的值。

collectIndexed
public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action:
suspend (index: Int, value: T) -> Unit): Unit

collectLatest
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) ->
Unit) = mapLatest(action).buffer(0).collect()

当新值到来时,前一个值自己内部没处理完就开始处理新的。
flow { var count = 0 while (true) { emit(count) delay(1000) count++ }
}.collectLatest { println("start $it") delay(2000) println("end $it") }
//打印:start 0,start 1,start 2,start 3.....
4.2 线程切换

launchIn
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() }

在指定的其它协程作用域中消费,而不是当前协程中。指定该协程作用域的上下文也就相当于切换了线程,记得接着调用join()。
flowOf(1,2,3) .launchIn(CoroutineScope(Dispatchers.IO)) //相当于切换了线程 .join()
4.3 规约(累计到单个值)

reduce

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator:
S, value: T) -> S): S

前一个值和后一个值进行指定运算,再拿结果跟第三个值运算,以此类推得到最终值。(1和2计算后,结果和3运算,结果再和4运算...返回最终结果)。

fold
public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline
operation: suspend (acc: R, value: T) -> R): R

带初始值,同上(初始值和1计算后,结果和2计算,结果再和3运算...返回最终结果)。

runningFold区别于 fold ,就是返回一个新流,将每步的结果发射出去。
runningReduce区别于 reduce ,就是返回一个新流,将每步的结果发射出去。 val flow = (1..5).asFlow()
println(flow.reduce({ a, b -> a + b })) //打印:15 println(flow.fold(2, { a, b ->
a + b })) //打印:17
4.4 转换到其它容器

toCollection
toList
toSet将结果转到到集合。
将结果转换为List。
将结果转换为Set。
asLiveData
public fun <T> Flow<T>.asLiveData(
    context: CoroutineContext =
EmptyCoroutineContext,//区块所执行的协程上下文,默认+Main.immediate
    timeoutInMs: Long = DEFAULT_TIMEOUT//没有观察者后,多少毫秒后取消区块,默认5s。
): LiveData<T>

将结果转到LiveData。

asStateFlow转为StateFlow。
asShareFlow转为SharedFlow。
4.5 取最值

last

lastOrNull

first

firstOrNull

single

singleOrNull
获取最后一个值,为null会抛异常 NoSuchElementException。
获取最后一个值,可以为null。
获取第一个值,如果为空会抛异常 NoSuchElementException。
获取第一个元值,可以为null。
接收流发送的第一个值 ,区别于first,如果为空或者发了不止一个值,则都会报错。
接收流发送的第一个值 ,可以为努力了,发出多值的话除第一个,后面均被置为null。
count返回流发送值的个数。类似list.size,注:sharedFlow无效(无意义)
五、流的取消

5.1 协作取消

Flow在协程中消费,因此协程的取消会一并取消Flow。Flow{ }形式的创建对 emit() 操作附加了 ensureActive()
以检测协程的取消,但出于性能原因其它构建形式和操作不会自行检查以响应,因此需要手动调用 cancellable() 检查。

cancellable( )public fun <T> Flow<T>.cancellable(): Flow<T> val flow = flow {
repeat(5) { emit(it) } } withTimeout(2500) { flow.collect { println(it) } }
//该构建方式会对发射值附加取消检测 flow { repeat(5) { emit(it) } }.collect { if (it == 3)
cancel() } //其它构建方式需要手动检查 (1..5).asFlow().cancellable().collect { println(it)
if (it == 3) cancel() }
六、异常处理

在flow构建器或处理值的时候可能发生异常,异常会向下流动关闭每个处理步骤,如果没有调用catch,未捕获的异常会立即取消该flow,在消费时重新抛出。可以在flow内部使用
try-finally 语句,也可以在flow外部使用 try-catch 语句。在Android开发中 catch()
可以用来展示错误信息或者显示默认数据(例如空列表)。

catch
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) ->
Unit): Flow<T>

只能捕获上游的异常,上游还有catch那就是它俩捕获之间的,捕获后下游不会再捕获得到。可以在代码块中使用 throw 再次抛出、可以使用 emit
转换为值发射、或其他处理。
flow { emit(1) emit(2) throw NullPointerException() }.map { it +1 }.catch {
println("catch:$it") }.collect { println("collect:$it") } //打印: //collect:2
//collect:3 //catch:java.lang.NullPointerException
七、Flow和Suspend区别

FlowSuspend
作用连续性的异步数据流(多个值)一次性的异步任务(单个值)
场景实时数据频繁刷新(点赞数、)无需更新数据(文章内容)
八、一些Android示例
fun updateNews() { flow{} .onStart { showProgressBar() } .onCompletion {
hideProgressBar() } .onEach { view.show(it) } .catch { view.handleError(it) }
.launchIn(viewModelScope) }
待整理

回调

onSubionSharedFlow 专属操作符 (StateFlow是SharedFlow 的一种特殊实现)。
变换

receiveAsFlow
consumeAsFlow将Channel 转换为Flow ,可以有多个观察者,但不是多播,可能会轮流收到值。
将Channel 转换为Flow ,但不能多个观察者(会crash)!
withIndex将结果包装成IndexedValue类型。
produceIn转换为ReceiveChannel , 不常用。
组合

conbineTransform顾名思义 combine+ transform

技术
今日推荐
PPT
阅读数 121
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:766591547
关注微信