안드로이드 개발자 노트
[코틀린 코루틴] 플로우의 실제 구현 본문
코틀린 코루틴의 플로우는 생각하는 것보다 간단한 개념이며, 중간 가능한 람다식에 몇 가지 요소를 추가한 거라고 생각하면 된다.
Flow 이해하기
간단한 람다식을 실제 플로우처럼 만들어보는 예제이다.
먼저, 간단한 람다식이다.
fun main() {
val f: () -> Unit = {
print("A")
print("B")
print("C")
}
f() // ABC
f() // ABC
}
여기서 내부에 지연이 있는 람다식 suspend로 만들면 다음과 같다.
suspend fun main() {
val f: suspend () -> Unit = {
print("A")
delay(1000)
print("B")
delay(1000)
print("C")
}
f()
f()
}
//A
//(1초 후)
//B
//(1초 후)
//C
//A
//(1초 후)
//B
//(1초 후)
//C
람다식은 순차적으로 호출되기 때문에, 이전 호출이 완료되기 전에 같은 람다식을 추가적으로 호출할 수 없다.
다음으로 파라미터를 받는 람다식으로 수정하고, 이 파라미터를 emit이라고 해보면 다음과 같다.
suspend fun main() {
val f: suspend ((String) -> Unit) -> Unit = { emit ->
emit("A")
emit("B")
emit("C")
}
f { print(it) } // ABC
f { print(it) } // ABC
}
이때 emit은 중단함수가 되어야 한다.
emit이라는 중단 가능한 추상 메서드를 가진 FlowCollector 함수형 인터페이스를 정의하면 람다식으로 구현할 수 있으며, 다음과 같이 구현할 수 있다.
fun interface FlowCollector {
suspend fun emit(value: String)
}
suspend fun main() {
val f: suspend (FlowCollector) -> Unit = {
it.emit("A")
it.emit("B")
it.emit("C")
}
f { print(it) } // ABC
f { print(it) } // ABC
}
it에서 emit을 호출하는 것 또한 불편하므로, FlowCollector를 리시버로 만든다.
fun interface FlowCollector {
suspend fun emit(value: String)
}
suspend fun main() {
val f: suspend FlowCollector.() -> Unit = {
emit("A")
emit("B")
emit("C")
}
f { print(it) } // ABC
f { print(it) } // ABC
}
람다식을 전달하는 대신에, 인터페이스를 구현한 객체를 만드는 편이 구조적 설계, 확장성, 재사용성 면에서 더 낫다.
이때 인터페이스를 Flow라 하고, 해당 인터페이스의 정의는 객체 표현식으로 래핑하면 된다.
fun interface FlowCollector {
suspend fun emit(value: String)
}
interface Flow {
suspend fun collect(collector: FlowCollector)
}
suspend fun main() {
val builder: suspend FlowCollector.() -> Unit = {
emit("A")
emit("B")
emit("C")
}
val flow: Flow = object : Flow {
override suspend fun collect(
collector: FlowCollector
) {
collector.builder()
}
}
flow.collect { print(it) } // ABC
flow.collect { print(it) } // ABC
}
다음으로 플로우 생성을 간단하게 만들기 위한 flow 빌더를 정의하면 다음과 같다.
fun interface FlowCollector {
suspend fun emit(value: String)
}
interface Flow {
suspend fun collect(collector: FlowCollector)
}
fun flow(
builder: suspend FlowCollector.() -> Unit
) = object : Flow {
override suspend fun collect(collector: FlowCollector) {
collector.builder()
}
}
suspend fun main() {
val f: Flow = flow {
emit("A")
emit("B")
emit("C")
}
f.collect { print(it) } // ABC
f.collect { print(it) } // ABC
}
마지막으로 타입에 상관없이 값을 방출하고 모으기 위해 String을 제네릭 타입으로 바꾸면 다음과 같다.
fun interface FlowCollector<T> {
suspend fun emit(value: T)
}
interface Flow<T> {
suspend fun collect(collector: FlowCollector<T>)
}
fun <T> flow(
builder: suspend FlowCollector<T>.() -> Unit
) = object : Flow<T> {
override suspend fun collect(
collector: FlowCollector<T>
) {
collector.builder()
}
}
suspend fun main() {
val f: Flow<String> = flow {
emit("A")
emit("B")
emit("C")
}
f.collect { print(it) } // ABC
f.collect { print(it) } // ABC
}
위에서 플로우를 구현한 방식은 실제 Flow, FlowCollector, flow와 거의 동일하며, 다른 빌더 내부에서도 flow를 사용한다.
Flow 처리 방식
플로우의 각 원소를 반환하는 map 함수는 새로운 플로우를 만들기 때문에 flow 빌더를 사용하며, 플로우의 각 방출된 값(emit)을 다른 값으로 변환한다.
fun <T, R> Flow<T>.map(
transformation: suspend (T) -> R
): Flow<R> = flow {
collect {
emit(transformation(it))
}
}
suspend fun main() {
flowOf("A", "B", "C")
.map {
delay(1000)
it.lowercase()
}
.collect { println(it) }
}
// (1초 후)
// a
// (1초 후)
// b
// (1초 후)
// c
동기로 작동하는 Flow
플로우는 중단 함수처럼 동작하며, 플로우가 완료될 때까지 collect 호출은 중단됩니다.
또한, 플로우의 각 처리 단계는 동기적으로 실행되며, onEach 함수는 모든 원소가 처리된 후가 아니라, 각 원소 사이에서 연산을 수행한다.
suspend fun main() {
flowOf("A", "B", "C")
.onEach { delay(1000) }
.collect { println(it) }
}
플로우와 공유 상태
플로우를 구현할 때 변수의 동기화 시점을 정확히 이해하는 것이 중요하다.
변수나 상태가 여러 코루틴에서 동시에 접근 및 수정될 가능성이 있을 때 동기화가 필요하며, 이를 간과하면 경합 상태나 데이터 불일치 문제가 발생할 수 있다.
플로우 내부에 변경 가능한 변수를 정의하면 플로우가 수집될 때마다 새로운 변수를 생성하기 때문에, 변수의 값이 다른 코루틴과 공유되지 않으므로 동기화가 필요 없다.
fun Flow<*>.counter() = flow<Int> {
var counter = 0
collect {
counter++
List(100) { Random.nextLong() }.shuffled().sorted()
emit(counter)
}
}
suspend fun main(): Unit = coroutineScope {
val f1 = List(1000) { "$it" }.asFlow()
val f2 = List(1000) { "$it" }.asFlow()
.counter()
launch { println(f1.counter().last()) } // 1000
launch { println(f1.counter().last()) } // 1000
launch { println(f2.last()) } // 1000
launch { println(f2.last()) } // 1000
// 정상적으로 출력됨.
}
외부 변수는 모든 코루틴이 공유할 수 있으므로 동기화가 필수이다.
fun Flow<*>.counter(): Flow<Int> {
var counter = 0
return this.map {
counter++
// to make it busy for a while
List(100) { Random.nextLong() }.shuffled().sorted()
counter
}
}
suspend fun main(): Unit = coroutineScope {
val f1 = List(1_000) { "$it" }.asFlow()
val f2 = List(1_000) { "$it" }.asFlow()
.counter()
launch { println(f1.counter().last()) } // 1000
launch { println(f1.counter().last()) } // 1000
launch { println(f2.last()) } // less than 2000
launch { println(f2.last()) } // less than 2000
}
위 예제와 같은 경우 변수가 플로우 컬렉션이 아니라 플로우에 종속되며 두 개의 코루틴이 병렬로 원소를 세게 되어 f2.last()는 1000이 아니라 2000을 반환하게 된다.
마지막으로 플로우에서 사용하는 변수가 함수 외부, 클래스의 스코프, 최상위 레벨에서 정의되어 있으면 마찬가지로 동기화가 필요하다.
var counter = 0
fun Flow<*>.counter(): Flow<Int> = this.map {
counter++
// to make it busy for a while
List(100) { Random.nextLong() }.shuffled().sorted()
counter
}
suspend fun main(): Unit = coroutineScope {
val f1 = List(1_000) { "$it" }.asFlow()
val f2 = List(1_000) { "$it" }.asFlow()
.counter()
launch { println(f1.counter().last()) } // less than 4000
launch { println(f1.counter().last()) } // less than 4000
launch { println(f2.last()) } // less than 4000
launch { println(f2.last()) } // less than 4000
}
'Kotlin > 코틀린 코루틴' 카테고리의 다른 글
[코틀린 코루틴] 플로우 만들기 (0) | 2024.12.01 |
---|---|
[코틀린 코루틴] 플로우란 무엇인가? (0) | 2024.11.24 |
[코틀린 코루틴] 핫 데이터 소스와 콜드 데이터 소스 (0) | 2024.11.24 |