반응형
Notice
Recent Posts
Recent Comments
Link
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

안드로이드 개발자 노트

[코틀린 코루틴] 공유플로우와 상태플로우 본문

Kotlin/코틀린 코루틴

[코틀린 코루틴] 공유플로우와 상태플로우

어리둥절범고래 2024. 12. 14. 20:59
반응형

공유플로우

 

공유플로우를 통해 메세지를 보내면 대기하고 있는 모든 코루틴이 수신하게 된다.

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow =
        MutableSharedFlow<String>(replay = 0)
    // or MutableSharedFlow<String>()

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }
    launch {
        mutableSharedFlow.collect {
            println("#2 received $it")
        }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}
//#1 received Message1
//#2 received Message1
//#1 received Message2
//#2 received Message2

 

MutableSharedFlow는 replay 인자를 설정하여 마지막으로 전송한 값들이 정해진 수만큼 저장할 수 있다.

resetReplayCache를 사용하면 캐시를 초기화할 수 있다.

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(
        replay = 2,
    )
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")

    println(mutableSharedFlow.replayCache)
    // [Message2, Message3]

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
        // #1 received Message2
        // #1 received Message3
    }

    delay(100)
    mutableSharedFlow.resetReplayCache()
    println(mutableSharedFlow.replayCache) // []
}

 

SendChannel, ReceiveChannel, Channel과 같이 코틀린에서는 감지만 하는 인터페이스와 변경하는 인터페이스를 구분하는 것이 관행이다.

마찬가지로 MustableSharedFlow는 Flow를 상속하고 감지하는 목적의 SharedFlow와 값을 내보내는 목적의 FlowCollector를 모두 상속한다.

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    override suspend fun emit(value: T)
    fun tryEmit(value: T): Boolean
    val subscriptionCount: StateFlow<Int>
    fun resetReplayCache()
}

interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

fun interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

SharedFlow와 FlowCollector 인터페이스는 값을 내보내거나 수집하는 함수만 노출하기 위해 주로 사용된다.

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()
    val sharedFlow: SharedFlow<String> = mutableSharedFlow
    val collector: FlowCollector<String> = mutableSharedFlow

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }
    launch {
        sharedFlow.collect {
            println("#2 received $it")
        }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    collector.emit("Message2")
}

 

 

shareIn

 

shareIn 함수는 콜드 플로우를 공유 가능한 핫 플로우로 변환하기 위한 확장 함수이다.

이를 통해 플로우를 여러 소비자가 동시에 사용할 수 있도록 캐싱하거나 데이터를 재활용할 수 있다.

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }

    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        // replay = 0 (default)
    )

    delay(500)

    launch {
        sharedFlow.collect { println("#1 $it") }
    }

    delay(1000)

    launch {
        sharedFlow.collect { println("#2 $it") }
    }

    delay(1000)

    launch {
        sharedFlow.collect { println("#3 $it") }
    }
}
//(1초 후)
//#1 A
//(1초 후)
//#1 B
//#2 B
//(1초 후)
//#1 C
//#2 C
//#3 C

 

shareIn은 다음과 같은 인자를 받는다.

 

  • scope: shareIn으로 변환된 핫 플로우가 실행될 코루틴 범위를 지정하며, 이 범위가 종료되면 핫 플로우도 종료된다.
  • started: 공유 시작 시점과 종료 조건을 결정하는 전략을 설정한다.
  • replay: 가장 최근의 값들을 캐싱할 수 있는 갯수를 설정한다.
fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

두 번째 인자인 started는 다음과 같은 옵션을 지원하며, SharingStarted 인터페이스를 구현하여 커스텀화된 전략을 정의하는 것도 가능하다.

- SharingStarted.Eagerly: 즉시 값을 감지하기 시작하며, 시작하기 전에 값이 나오면 유실될 수 있다.

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")

    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
    )

    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    print("Done")
}
//(0.1초 후)
//Done

 

- SharingStarted.Lazily: 첫 구독자가 생길 때 감지하기 시작하며, 모든 값을 수신하는 것이 보장된다.

suspend fun main(): Unit = coroutineScope {
    val flow1 = flowOf("A", "B", "C")
    val flow2 = flowOf("D")
        .onEach { delay(1000) }

    val sharedFlow = merge(flow1, flow2).shareIn(
        scope = this,
        started = SharingStarted.Lazily,
    )

    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it") }
    }
}
//(0.1초 후)
//#1 A
//#1 B
//#1 C
//(1초 후)
//#1 D
//#2 D

 

- SharingStarted.WhileSubscribed(timeoutMillis: Long): 첫 번재 구독자가 나올때 감지하기 시작, 마지막 구독자가 사라지면 플로

우도 멈춘다. 멈추고 난 후 timeoutMillis 이전에 새로운 구독자가 나오면 플로우가 다시 시작된다.

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C", "D")
        .onStart { println("Started") }
        .onCompletion { println("Finished") }
        .onEach { delay(1000) }

    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(),
    )

    delay(3000)
    launch {
        println("#1 ${sharedFlow.first()}")
    }
    launch {
        println("#2 ${sharedFlow.take(2).toList()}")
    }
    delay(3000)
    launch {
        println("#3 ${sharedFlow.first()}")
    }
}
//(3초 후)
//Started
//(1초 후)
//#1 A
//(1초 후)
//#2 [A, B]
//Finished
//(1초 후)
//Started
//(1초 후)
//#3 A
//Finished

 

 

 

상태플로우

 

상태플로우는 공유플로우의 개념을 확장시킨 것으로, 항상 최신 상태를 유지하며 value 프로퍼티로 접근 가능한 값을 가지고 있다.

interface StateFlow<out T> : SharedFlow<T> {
    val value: T
}

interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    override var value: T
    fun compareAndSet(expect: T, update: T): Boolean
}

생성자를 통해 초기값을 설정하며, value 프로퍼티로 값을 얻어오거나 설정할 수 있다.

suspend fun main() = coroutineScope {
    val state = MutableStateFlow("A")
    println(state.value) // A
    launch {
        state.collect { println("Value changed to $it") }
        // Value changed to A
    }

    delay(1000)
    state.value = "B" // Value changed to B

    delay(1000)
    launch {
        state.collect { println("and now it is $it") }
        // and now it is B
    }

    delay(1000)
    state.value = "C" // Value changed to C and now it is C
}

상태플로우는 데이터가 덮어 씌워지기 때문에, 관찰이 느릴 경우 상태의 중간 변화를 받을 수 없는 경우도 있다.

모든 이벤트를 다 받으려면 공유플로우를 사용해야 한다.

suspend fun main(): Unit = coroutineScope {
    val state = MutableStateFlow('X')

    launch {
        for (c in 'A'..'E') {
            state.value = c
        }
    }

    state.collect {
        println(it)
    }
}
//E

 

 

stateIn

 

stateIn은 Flow<T>를 StateFlow<T>로 변환하는 함수이며 두 가지 형태가 있다.

suspend fun <T> Flow<T>.stateIn(
    scope: CoroutineScope
): StateFlow<T>

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

StateFlow는 항상 값을 가져야 한다.

중단 함수 형태의 stateIn을 사용할 때는, 값을 명시하지 않았다면 첫 번째 값이 계산될 때까지 기다려야 한다.

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }
    val stateFlow: StateFlow<String> = flow.stateIn(this)

    println("Listening")
    println(stateFlow.value)
    stateFlow.collect {
        println("Received $it")
    }
}
//(1초 후)
//Produced A
//Listening
//A
//Received A
//(1초 후)
//Produced B
//Received B
//(1초 후)
//Produced C
//Received C

stateIn의 두 번째 형태는 초기 값과 started 모드를 지정해야 한다.

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }

    val stateFlow: StateFlow<String> = flow.stateIn(
        scope = this,
        started = SharingStarted.Lazily,
        initialValue = "Empty"
    )

    println(stateFlow.value)

    delay(2000)
    stateFlow.collect {
        println("Received $it")
    }
}
//Empty
//(2초 후)
//Received Empty
//(1초 후)
//Produced A
//Received A
//(1초 후)
//Produced B
//Received B
반응형