안드로이드 개발자 노트
[코틀린 코루틴] 공유플로우와 상태플로우 본문
공유플로우
공유플로우를 통해 메세지를 보내면 대기하고 있는 모든 코루틴이 수신하게 된다.
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 0)
// or MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
//#1 received Message1
//#2 received Message1
//#1 received Message2
//#2 received Message2
MutableSharedFlow는 replay 인자를 설정하여 마지막으로 전송한 값들이 정해진 수만큼 저장할 수 있다.
resetReplayCache를 사용하면 캐시를 초기화할 수 있다.
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>(
replay = 2,
)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
mutableSharedFlow.emit("Message3")
println(mutableSharedFlow.replayCache)
// [Message2, Message3]
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
// #1 received Message2
// #1 received Message3
}
delay(100)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache) // []
}
SendChannel, ReceiveChannel, Channel과 같이 코틀린에서는 감지만 하는 인터페이스와 변경하는 인터페이스를 구분하는 것이 관행이다.
마찬가지로 MustableSharedFlow는 Flow를 상속하고 감지하는 목적의 SharedFlow와 값을 내보내는 목적의 FlowCollector를 모두 상속한다.
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
override suspend fun emit(value: T)
fun tryEmit(value: T): Boolean
val subscriptionCount: StateFlow<Int>
fun resetReplayCache()
}
interface SharedFlow<out T> : Flow<T> {
val replayCache: List<T>
override suspend fun collect(collector: FlowCollector<T>): Nothing
}
fun interface FlowCollector<in T> {
suspend fun emit(value: T)
}
SharedFlow와 FlowCollector 인터페이스는 값을 내보내거나 수집하는 함수만 노출하기 위해 주로 사용된다.
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String> = mutableSharedFlow
val collector: FlowCollector<String> = mutableSharedFlow
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
sharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
collector.emit("Message2")
}
shareIn
shareIn 함수는 콜드 플로우를 공유 가능한 핫 플로우로 변환하기 위한 확장 함수이다.
이를 통해 플로우를 여러 소비자가 동시에 사용할 수 있도록 캐싱하거나 데이터를 재활용할 수 있다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach { delay(1000) }
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#3 $it") }
}
}
//(1초 후)
//#1 A
//(1초 후)
//#1 B
//#2 B
//(1초 후)
//#1 C
//#2 C
//#3 C
shareIn은 다음과 같은 인자를 받는다.
- scope: shareIn으로 변환된 핫 플로우가 실행될 코루틴 범위를 지정하며, 이 범위가 종료되면 핫 플로우도 종료된다.
- started: 공유 시작 시점과 종료 조건을 결정하는 전략을 설정한다.
- replay: 가장 최근의 값들을 캐싱할 수 있는 갯수를 설정한다.
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
두 번째 인자인 started는 다음과 같은 옵션을 지원하며, SharingStarted 인터페이스를 구현하여 커스텀화된 전략을 정의하는 것도 가능하다.
- SharingStarted.Eagerly: 즉시 값을 감지하기 시작하며, 시작하기 전에 값이 나오면 유실될 수 있다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it") }
}
print("Done")
}
//(0.1초 후)
//Done
- SharingStarted.Lazily: 첫 구독자가 생길 때 감지하기 시작하며, 모든 값을 수신하는 것이 보장된다.
suspend fun main(): Unit = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D")
.onEach { delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
}
//(0.1초 후)
//#1 A
//#1 B
//#1 C
//(1초 후)
//#1 D
//#2 D
- SharingStarted.WhileSubscribed(timeoutMillis: Long): 첫 번재 구독자가 나올때 감지하기 시작, 마지막 구독자가 사라지면 플로
우도 멈춘다. 멈추고 난 후 timeoutMillis 이전에 새로운 구독자가 나오면 플로우가 다시 시작된다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart { println("Started") }
.onCompletion { println("Finished") }
.onEach { delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch {
println("#1 ${sharedFlow.first()}")
}
launch {
println("#2 ${sharedFlow.take(2).toList()}")
}
delay(3000)
launch {
println("#3 ${sharedFlow.first()}")
}
}
//(3초 후)
//Started
//(1초 후)
//#1 A
//(1초 후)
//#2 [A, B]
//Finished
//(1초 후)
//Started
//(1초 후)
//#3 A
//Finished
상태플로우
상태플로우는 공유플로우의 개념을 확장시킨 것으로, 항상 최신 상태를 유지하며 value 프로퍼티로 접근 가능한 값을 가지고 있다.
interface StateFlow<out T> : SharedFlow<T> {
val value: T
}
interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
override var value: T
fun compareAndSet(expect: T, update: T): Boolean
}
생성자를 통해 초기값을 설정하며, value 프로퍼티로 값을 얻어오거나 설정할 수 있다.
suspend fun main() = coroutineScope {
val state = MutableStateFlow("A")
println(state.value) // A
launch {
state.collect { println("Value changed to $it") }
// Value changed to A
}
delay(1000)
state.value = "B" // Value changed to B
delay(1000)
launch {
state.collect { println("and now it is $it") }
// and now it is B
}
delay(1000)
state.value = "C" // Value changed to C and now it is C
}
상태플로우는 데이터가 덮어 씌워지기 때문에, 관찰이 느릴 경우 상태의 중간 변화를 받을 수 없는 경우도 있다.
모든 이벤트를 다 받으려면 공유플로우를 사용해야 한다.
suspend fun main(): Unit = coroutineScope {
val state = MutableStateFlow('X')
launch {
for (c in 'A'..'E') {
state.value = c
}
}
state.collect {
println(it)
}
}
//E
stateIn
stateIn은 Flow<T>를 StateFlow<T>로 변환하는 함수이며 두 가지 형태가 있다.
suspend fun <T> Flow<T>.stateIn(
scope: CoroutineScope
): StateFlow<T>
fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
StateFlow는 항상 값을 가져야 한다.
중단 함수 형태의 stateIn을 사용할 때는, 값을 명시하지 않았다면 첫 번째 값이 계산될 때까지 기다려야 한다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B", "C")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect {
println("Received $it")
}
}
//(1초 후)
//Produced A
//Listening
//A
//Received A
//(1초 후)
//Produced B
//Received B
//(1초 후)
//Produced C
//Received C
stateIn의 두 번째 형태는 초기 값과 started 모드를 지정해야 한다.
suspend fun main(): Unit = coroutineScope {
val flow = flowOf("A", "B")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty"
)
println(stateFlow.value)
delay(2000)
stateFlow.collect {
println("Received $it")
}
}
//Empty
//(2초 후)
//Received Empty
//(1초 후)
//Produced A
//Received A
//(1초 후)
//Produced B
//Received B
'Kotlin > 코틀린 코루틴' 카테고리의 다른 글
[코틀린 코루틴] 플로우 테스트하기 (0) | 2024.12.15 |
---|---|
[코틀린 코루틴] 플로우 처리 (2) | 2024.12.09 |
[코틀린 코루틴] 플로우 생명주기 함수 (0) | 2024.12.05 |