안드로이드 개발자 노트
[코틀린 코루틴] 플로우 처리 본문
플로우는 값이 흐르기 때문에 여러 가지 방법으로 변형하는 것도 가능하다.
플로우 생성과 최종 연산 사이의 연산들을 플로우 처리(flow processing)라고 한다.
이 포스팅에서 예제는 다이어그램을 사용하여 원소가 어떻게 바뀌는지 보여준다.
수평선은 시간, 선 위의 원소는 각 시간마다 플로우에서 내보낸 값들이다.
위쪽 선은 연산이 실행되기 전의 플로우를 나타내며, 아래쪽 선은 연산이 끝난 뒤의 플로우를 나타낸다.
map
map은 새로운 플로우를 생성한 뒤 변형된 원소들을 하나씩 내보낸다.
inline fun <T, R> Flow<T>.map(
transform: suspend (value: T) -> R
): Flow<R> = flow {
collect { value ->
emit(transform(value))
}
}
suspend fun main() {
flowOf(1, 2, 3) // [1, 2, 3]
.map { it * it } // [1, 4, 9]
.collect { print(it) } // 149
}
filter
filter 함수는 Flow에서 조건(predicate)에 맞는 값만 걸러내어 방출하는 중간 연산자이다.
제공된 조건을 만족하지 않는 값은 무시되고, 다음 단계로 전달되지 않는다.
inline fun <T> Flow<T>.filter(
predicate: suspend (T) -> Boolean
): Flow<T> = flow {
collect { value ->
if (predicate(value)) {
emit(value)
}
}
}
suspend fun main() {
(1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter { it <= 5 } // [1, 2, 3, 4, 5]
.filter { isEven(it) } // [2, 4]
.collect { print(it) } // 24
}
fun isEven(num: Int): Boolean = num % 2 == 0
take와 drop
특정 수의 원소만 통과시키기 위한 take 함수이다.
suspend fun main() {
('A'..'Z').asFlow()
.take(5) // [A, B, C, D, E]
.collect { print(it) } // ABCDE
}
drop 함수를 사용하면 특정 수의 원소를 무시할 수 있다.
suspend fun main() {
('A'..'Z').asFlow()
.drop(20) // [U, V, W, X, Y, Z]
.collect { print(it) } // UVWXYZ
}
컬렉션 처리는 어떻게 작동할까?
다음은 플로우 처리 과정 및 map과 flowOf를 간단하게 구현한 예제이다.
suspend fun main() {
flowOf('a', 'b')
.map { it.uppercase() }
.collect { print(it) } // AB
}
fun <T, R> Flow<T>.map(
transform: suspend (value: T) -> R
): Flow<R> = flow {
collect { value ->
emit(transform(value))
}
}
fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
flowOf와 map함수를 인라인으로 구현하면 다음과 같다.
suspend fun main() {
flow map@{ // 1
flow flowOf@{ // 2
for (element in arrayOf('a', 'b')) { // 3
this@flowOf.emit(element) // 4
}
}.collect { value -> // 5
this@map.emit(value.uppercase()) // 6
}
}.collect { // 7
print(it) // 8
}
}
- 1에서 플로우를 시작하고 7에서 원소들은 모은다.
- 7에서 모으기 시작할 때 @map 람다식을 수행한다.
- 이 람다식은 2에서 또 다른 빌더를 호출하고 5에서 원소들을 모은다.
- 이 원소들을 5에서 원소들을 모을때 2에서 시작하는 @flowOf 람다식(3)을 수행한다.
- 이 람다식(3)은 'a'와 'b'를 가진 배열을 탐색한다.
- 첫 번째 값인 'a'를 4에서 내보내며, 5의 람다식이 호출된다.
- 5의 람다식은 값을 'A'로 변경하며 @map 플로우로 내보낸다.
- 이후 7의 람다식이 호출되어 값을 출력하고 7의 람다식이 종료된다.
- 다시 6의 람다식이 재개된다.
- 6의 람다식이 종료되어 4의 @flowOf가 재개된다.
- 3의 탐색이 다시 시작되어 4에서 'b'를 내보낸다.
- 5에서 람다식이 호출되어 'B'로 값을 변형한 뒤 6에서 @map 플로우로 값을 내보낸다.
- 값은 7에서 모이며 8에서출력된다.
- 7의 람다식이 종료되므로 6의 람다식이 재개된다.
- 이 람다식은 종료되었기 때문에 4의 @flowOf 람다식이 재개된다.
- 4도 종료되었기 때문에 5의 collect에서 @map이 재개된다.
- 더 이상 남은 것이 없기 때문에 @map의 마지막 부분에 도달한다.
- 7의 collect에서 다시 시작하면 main 함수의 끝에 도달한다.
merge, zip, combine
n개의 플로우를 하나의 플로우로 합치는 것도 가능하다.
가장 간단한 방법은 n개의 플로우에서 생성된 원소들을 하나로 합치는 것으로, merge를 사용해 이런 과정을 수행할 수 있다.
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
print(together.toList())
// [1, 0.1, 0.2, 0.3, 2, 3]
// or [1, 0.1, 0.2, 0.3, 2, 3]
// or [0.1, 1, 2, 3, 0.2, 0.3]
// or any other combination
}
merge를 사용하면 한 플로우의 원소가 다른 플로우를 기다리지 않는다.
첫 번째 플로우의 원소 생성이 지연된다고 해서 두 번째 플로우의 원소 생성이 중단되지는 않는다.
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
.onEach { delay(1000) }
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
together.collect { println(it) }
}
//0.1
//0.2
//0.3
//(1초 후)
//1
//(1초 후)
//2
//(1초 후)
//3
zip 함수는 두 플로우로부터 쌍을 만든다.
원소가 쌍을 이루는 방법을 정하는 함수도 필요하며, 각 원소는 한 쌍의 일부가 되므로 쌍이 될 원소를 기다린다.
한 쪽 플로우가 완료되면 남은 원소는 쌍을 이루지 못하고 유실된다.
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach { delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}" }
.collect { println(it) }
}
//(1초 후)
//A_1
//(1초 후)
//B_2
//(1초 후)
//C_3
//(4 유실됨)
마지막으로 combine 함수는 zip처럼 원소들로 쌍을 형성하며, 모든 새로운 원소가 전임자를 대체하게 된다.
combine은 zip과 다르게 두 플로우 모두 닫힐 때까지 원소를 내보낸다.
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach { delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
.collect { println(it) }
}
//B_1
//C_1
//C_2
//C_3
//C_4
뷰가 감지 가능한 원소 두 가지 중 하나라도 변경될 때 반응해야 하는 경우 combine을 주로 사용한다.
예를 들어 알림 뱃지가 현재 사용 상태와 알림 모두에 영향을 받는다면, 두 가지 모두를 감시하고 변경된 것을 합쳐 뷰를 갱신할 수 있다.
updateStateFlow
.combine(notificationsFlow) { userState, notifications ->
updateNotificationBadge(userState, notifications)
}
.collect()
fold와 scan
fold 함수는 Flow의 모든 원소를 순차적으로 처리하여 하나의 최종 결과값을 생성하는 최종 연산자이며, Flow의 각 원소를 초기값과 함께 주어진 연산 함수에 적용해 누적값을 계산한다.
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.fold(0) { acc, i -> acc + i }
println(res) // 10
val res2 = list.fold(1) { acc, i -> acc * i }
println(res2) // 24
}
//(4초 후)
//10
scan은 누적되는 과정의 모든 값을 생성하는 중간 연산이다.
suspend fun main() {
flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
.scan(0) { acc, v -> acc + v }
.collect { println(it) }
}
//0
//(1초 후)
//1
//(1초 후)
//3
//(1초 후)
//6
//(1초 후)
//10
flatMapConcat, flatMapMerge, flatMapLastest
컬렉션의 경우 flatMap은 평탄화된 컬렉션을 반환합니다.
플로우에서 flatMap은 비동기 작업을 수행하며 데이터를 순차적 또는 병렬적으로 스트림을 방출한다.
flatMapConcat 함수는 생성된 플로우를 하나씩 처리한다.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapConcat { flowFrom(it) }
.collect { println(it) }
}
//(1초 후)
//1_A
//(1초 후)
//2_A
//(1초 후)
//3_A
//(1초 후)
//1_B
//(1초 후)
//2_B
//(1초 후)
//3_B
//(1초 후)
//1_C
//(1초 후)
//2_C
//(1초 후)
//3_C
flatMapMerge는 만들어진 플로우를 동시에 처리한다.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge { flowFrom(it) }
.collect { println(it) }
}
//(1초 후)
//1_A
//1_B
//1_C
//(1초 후)
//2_B
//2_C
//2_A
//(1초 후)
//3_C
//3_A
//3_B
concurrency 인자를 사용해 동시에 처리할 수 있는 플로우 수를 설정할 수 있으며, 설정하지 않은 기본값은 16이다.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge(concurrency = 2) { flowFrom(it) }
.collect { println(it) }
}
//(1초 뒤)
//1_B
//1_A
//(1초 뒤)
//2_B
//2_A
//(1초 뒤)
//3_A
//3_B
//(1초 뒤)
//1_C
//(1초 뒤)
//2_C
//(1초 뒤)
//3_C
async 함수를 사용해 flatMapMerge와 같은 처리를 할 수 있다.
suspend getOffers(
categories: List<Category>
): List<Offer> = coroutineScope {
categories
.map { async { api.requestOffers(it) }
.flatMap { it.await() }
}
그러나 flatMapMerge로 구현했을 때 두 가지 이점이 있다.
- flatMapMerge는 concurrency 파라미터를 통해 몇 개의 하위 플로우를 동시에 실행할지 지정할 수 있다.
(이 기능은 병렬 작업의 수를 제한하거나, 시스템 자원을 효율적으로 사용할 때 유용하다) - flatMapMerge는 각 하위 작업이 완료되기를 기다리지 않고, 데이터가 생성되는 즉시 상위 플로우로 방출한다.
(이는 하위 플로우의 실행 순서와 무관하게 데이터를 소비자가 더 빠르게 사용할 수 있게 한다)
suspend getOffers(
categories: List<Category>
): Flow<Offer> = categories
.asFlow()
.flatMapMerge(concurrency = 20) {
flow { emit(api.requestOffers(it)) }
}
마지막으로 flatMapLatest 함수는 새로운 값이 방출되면 이전에 실행 중이던 하위 플로우를 취소하고, 가장 최신값에 대해 새로운 하위 플로우를 실행한다.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapLatest { flowFrom(it) }
.collect { println(it) }
}
//(1초 후)
//1_C
//(1초 후)
//2_C
//(1초 후)
//3_C
시작 플로우를 생성할 때 지연이 발생하면 다음과 같이 실행된다.
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem} " }
suspend fun main() {
flowOf("A", "B", "C")
.onEach { delay(1200) }
.flatMapLatest { flowFrom(it) }
.collect { println(it) }
}
//(2.2초 후)
//1_A
//(1.2초 후)
//1_B
//(1.2초 후)
//1_C
//(1초 후)
//2_C
//(1초 후)
//3_C
재시도(retry)
예외가 발생하면 플로우의 각 단계는 순차적으로 종료되고 비활성화된다.
따라서 예외 이후에 메시지를 방출할 수는 없지만, 각 단계는 이전 단계에 대한 참조를 가지고 있어 플로우를 다시 시작할 수 있다.
'Kotlin > 코틀린 코루틴' 카테고리의 다른 글
[코틀린 코루틴] 공유플로우와 상태플로우 (0) | 2024.12.14 |
---|---|
[코틀린 코루틴] 플로우 생명주기 함수 (0) | 2024.12.05 |
[코틀린 코루틴] 플로우 만들기 (0) | 2024.12.01 |