반응형
Notice
Recent Posts
Recent Comments
Link
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

안드로이드 개발자 노트

[코틀린 코루틴] 플로우의 실제 구현 본문

Kotlin/코틀린 코루틴

[코틀린 코루틴] 플로우의 실제 구현

어리둥절범고래 2024. 11. 29. 00:53
반응형

코틀린 코루틴의 플로우는 생각하는 것보다 간단한 개념이며, 중간 가능한 람다식에 몇 가지 요소를 추가한 거라고 생각하면 된다.

 

 

Flow 이해하기

 

간단한 람다식을 실제 플로우처럼 만들어보는 예제이다.

먼저, 간단한 람다식이다.

fun main() {
    val f: () -> Unit = {
        print("A")
        print("B")
        print("C")
    }
    f() // ABC
    f() // ABC
}

여기서 내부에 지연이 있는 람다식 suspend로 만들면 다음과 같다.

suspend fun main() {
    val f: suspend () -> Unit = {
        print("A")
        delay(1000)
        print("B")
        delay(1000)
        print("C")
    }
    f()
    f()
}
//A
//(1초 후)
//B
//(1초 후)
//C
//A
//(1초 후)
//B
//(1초 후)
//C

람다식은 순차적으로 호출되기 때문에, 이전 호출이 완료되기 전에 같은 람다식을 추가적으로 호출할 수 없다.

다음으로 파라미터를 받는 람다식으로 수정하고, 이 파라미터를 emit이라고 해보면 다음과 같다.

suspend fun main() {
    val f: suspend ((String) -> Unit) -> Unit = { emit ->
        emit("A")
        emit("B")
        emit("C")
    }
    f { print(it) } // ABC
    f { print(it) } // ABC
}

이때 emit은 중단함수가 되어야 한다.

emit이라는 중단 가능한 추상 메서드를 가진 FlowCollector 함수형 인터페이스를 정의하면 람다식으로 구현할 수 있으며, 다음과 같이 구현할 수 있다.

fun interface FlowCollector {
    suspend fun emit(value: String)
}

suspend fun main() {
    val f: suspend (FlowCollector) -> Unit = {
        it.emit("A")
        it.emit("B")
        it.emit("C")
    }
    f { print(it) } // ABC
    f { print(it) } // ABC
}

it에서 emit을 호출하는 것 또한 불편하므로, FlowCollector를 리시버로 만든다.

fun interface FlowCollector {
    suspend fun emit(value: String)
}

suspend fun main() {
    val f: suspend FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }
    f { print(it) } // ABC
    f { print(it) } // ABC
}

람다식을 전달하는 대신에, 인터페이스를 구현한 객체를 만드는 편이 구조적 설계, 확장성, 재사용성 면에서 더 낫다.

이때 인터페이스를 Flow라 하고, 해당 인터페이스의 정의는 객체 표현식으로 래핑하면 된다.

fun interface FlowCollector {
    suspend fun emit(value: String)
}

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

suspend fun main() {
    val builder: suspend FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }
    val flow: Flow = object : Flow {
        override suspend fun collect(
            collector: FlowCollector
        ) {
            collector.builder()
        }
    }
    flow.collect { print(it) } // ABC
    flow.collect { print(it) } // ABC
}

다음으로 플로우 생성을 간단하게 만들기 위한 flow 빌더를 정의하면 다음과 같다.

fun interface FlowCollector {
    suspend fun emit(value: String)
}

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

fun flow(
    builder: suspend FlowCollector.() -> Unit
) = object : Flow {
    override suspend fun collect(collector: FlowCollector) {
        collector.builder()
    }
}

suspend fun main() {
    val f: Flow = flow {
        emit("A")
        emit("B")
        emit("C")
    }
    f.collect { print(it) } // ABC
    f.collect { print(it) } // ABC
}

마지막으로 타입에 상관없이 값을 방출하고 모으기 위해 String을 제네릭 타입으로 바꾸면 다음과 같다.

fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(
    builder: suspend FlowCollector<T>.() -> Unit
) = object : Flow<T> {
    override suspend fun collect(
        collector: FlowCollector<T>
    ) {
        collector.builder()
    }
}

suspend fun main() {
    val f: Flow<String> = flow {
        emit("A")
        emit("B")
        emit("C")
    }
    f.collect { print(it) } // ABC
    f.collect { print(it) } // ABC
}

위에서 플로우를 구현한 방식은 실제 Flow, FlowCollector, flow와 거의 동일하며, 다른 빌더 내부에서도 flow를 사용한다.

 

 

Flow 처리 방식

 

플로우의 각 원소를 반환하는 map 함수는 새로운 플로우를 만들기 때문에 flow 빌더를 사용하며, 플로우의 각 방출된 값(emit)을 다른 값으로 변환한다.

fun <T, R> Flow<T>.map(
    transformation: suspend (T) -> R
): Flow<R> = flow {
    collect {
        emit(transformation(it))
    }
}

suspend fun main() {
    flowOf("A", "B", "C")
        .map {
            delay(1000)
            it.lowercase()
        }
        .collect { println(it) }
}
// (1초 후)
// a
// (1초 후)
// b
// (1초 후)
// c

 

동기로 작동하는 Flow

 

플로우는 중단 함수처럼 동작하며, 플로우가 완료될 때까지 collect 호출은 중단됩니다.
또한, 플로우의 각 처리 단계는 동기적으로 실행되며, onEach 함수는 모든 원소가 처리된 후가 아니라, 각 원소 사이에서 연산을 수행한다.

suspend fun main() {
    flowOf("A", "B", "C")
        .onEach { delay(1000) }
        .collect { println(it) }
}

 

플로우와 공유 상태

 

플로우를 구현할 때 변수의 동기화 시점을 정확히 이해하는 것이 중요하다.

변수나 상태가 여러 코루틴에서 동시에 접근 및 수정될 가능성이 있을 때 동기화가 필요하며, 이를 간과하면 경합 상태나 데이터 불일치 문제가 발생할 수 있다.

 

플로우 내부에 변경 가능한 변수를 정의하면 플로우가 수집될 때마다 새로운 변수를 생성하기 때문에, 변수의 값이 다른 코루틴과 공유되지 않으므로 동기화가 필요 없다.

fun Flow<*>.counter() = flow<Int> {
    var counter = 0
    collect {
        counter++
        List(100) { Random.nextLong() }.shuffled().sorted()
        emit(counter)
    }
}

suspend fun main(): Unit = coroutineScope {
    val f1 = List(1000) { "$it" }.asFlow()
    val f2 = List(1000) { "$it" }.asFlow()
        .counter()
    
    launch { println(f1.counter().last()) } // 1000
    launch { println(f1.counter().last()) } // 1000
    launch { println(f2.last()) } // 1000
    launch { println(f2.last()) } // 1000
    // 정상적으로 출력됨.
}

 

외부 변수는 모든 코루틴이 공유할 수 있으므로 동기화가 필수이다.

fun Flow<*>.counter(): Flow<Int> {
    var counter = 0
    return this.map {
        counter++
        // to make it busy for a while
        List(100) { Random.nextLong() }.shuffled().sorted()
        counter
    }
}

suspend fun main(): Unit = coroutineScope {
    val f1 = List(1_000) { "$it" }.asFlow()
    val f2 = List(1_000) { "$it" }.asFlow()
        .counter()
    
    launch { println(f1.counter().last()) } // 1000
    launch { println(f1.counter().last()) } // 1000
    launch { println(f2.last()) } // less than 2000
    launch { println(f2.last()) } // less than 2000
}

 

위 예제와 같은 경우 변수가 플로우 컬렉션이 아니라 플로우에 종속되며 두 개의 코루틴이 병렬로 원소를 세게 되어 f2.last()는 1000이 아니라 2000을 반환하게 된다.

 

마지막으로 플로우에서 사용하는 변수가 함수 외부, 클래스의 스코프, 최상위 레벨에서 정의되어 있으면 마찬가지로 동기화가 필요하다.

var counter = 0

fun Flow<*>.counter(): Flow<Int> = this.map {
    counter++
    // to make it busy for a while
    List(100) { Random.nextLong() }.shuffled().sorted()
    counter
}

suspend fun main(): Unit = coroutineScope {
    val f1 = List(1_000) { "$it" }.asFlow()
    val f2 = List(1_000) { "$it" }.asFlow()
        .counter()
    
    launch { println(f1.counter().last()) } // less than 4000
    launch { println(f1.counter().last()) } // less than 4000
    launch { println(f2.last()) } // less than 4000
    launch { println(f2.last()) } // less than 4000
}
반응형