안드로이드 개발자 노트
[코틀린 코루틴] 플로우 만들기 본문
원시 값을 가지는 플로우
플로우가 어떤 값을 가져야 하는지 정의하는 flowOf 함수는 listOf함수처럼 사용할 수 있다.
suspend fun main() {
flowOf(1, 2, 3, 4, 5)
.collect { print(it) } // 12345
}
값이 없는 플로우가 필요한 경우도 있다.
suspend fun main() {
emptyFlow<Int>()
.collect { print(it) } // (nothing)
}
컨버터
asFlow 함수를 사용해서 Iterable, Iterator, Sequence를 Flow로 바꿀 수 있다.
suspend fun main() {
listOf(1, 2, 3, 4, 5)
// or setOf(1, 2, 3, 4, 5)
// or sequenceOf(1, 2, 3, 4, 5)
.asFlow()
.collect { print(it) } // 12345
}
함수를 플로우로 바꾸기
중단 함수를 플로우로 변환하는 것이 가능하며, 중단 함수의 결과가 플로우의 값이 된다.
suspend fun main() {
val function = suspend {
// 중단 함수를 람다식으로 만든 것이다.
delay(1000)
"UserName"
}
function.asFlow()
.collect { println(it) }
}
// (1초 후)
// UserName
일반 함수를 변경하려면 함수 참조값이 필요하다.
suspend fun getUserName(): String {
delay(1000)
return "UserName"
}
suspend fun main() {
::getUserName
.asFlow()
.collect { println(it) }
}
// (1초 후)
// UserName
플로우와 리액티브 스트림
리액티브 스트림(Reator, RxJava)을 활용하고 있다면 코드를 크게 바꾸지 않고 플로우를 적용할 수 있다.
suspend fun main() = coroutineScope {
Flux.range(1, 5).asFlow()
.collect { print(it) } // 12345
Flowable.range(1, 5).asFlow()
.collect { print(it) } // 12345
Observable.range(1, 5).asFlow()
.collect { print(it) } // 12345
}
Flux, Flowable, Observable은 kotlinx-coroutines-reactive 라이브러리의 asFlow 함수를 사용해 Flow로 변환 가능한 Publisher 인터페이스를 구현하고 있다.
역으로 변환하려면 kotlinx-coroutines-rx3(또는 rx2) 라이브러리를 사용하면 Flow를 Flowable이나 Observable로 변환이 가능하다.
suspend fun main() = coroutineScope {
val flow = flowOf(1, 2, 3, 4, 5)
flow.asFlux()
.doOnNext { print(it) } // 12345
.subscribe()
flow.asFlowable()
.subscribe { print(it) } // 12345
flow.asObservable()
.subscribe { print(it) } // 12345
}
플로우 빌더
빌더는 flow 함수를 먼저 호출하고, 람다식 내부에서 emit 함수를 사용해 다음 값을 방출한다.
fun makeFlow(): Flow<Int> = flow {
repeat(3) { num ->
delay(1000)
emit(num)
}
}
suspend fun main() {
makeFlow()
.collect { println(it) }
}
// (1초 후)
// 0
// (1초 후)
// 1
// (1초 후)
// 2
플로우 빌더 이해하기
플로우 빌더는 내부적으로 collect 메서드 내부에서 block 함수를 호출하는 Flow 인터페이스를 구현한다.
fun <T> flow(
block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = object : Flow<T>() {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
interface Flow<out T> {
abstract suspend fun collect(collector: FlowCollector<T>): Unit
}
fun interface FlowCollector<in T> {
suspend fun emit(value: T)
}
빌더의 원리를 이해한 뒤 다음 코드가 어떻게 작동하는지 분석하면 다음과 같다.
fun main() = runBlocking {
flow { // 1
emit("A")
emit("B")
emit("C")
}.collect { value -> // 2
println(value)
}
}
// A
// B
// C
- flow 빌더를 호출하면 객체를 만든다.
- collect를 호출하면 collector 인터페이스의 block 함수를 호출하게 된다.
- 이 예제의 block 함수는 1에서 정의된 람다식이다.
- 리시버는 2에서 정의된 람다식인 collect이다.
- 람다식으로 정의된 함수형 인터페이스에서는, 람다식의 본체가 해당 인터페이스의 메서드 구현으로 사용된다(여기서는 emit).
- 그러므로 emit 함수의 본체는 println(value)가 된다.
- 따라서 collect를 호출하면 1에서 정의된 람다식을 실행하기 시작하고, emit을 호출했을 때 2에서 정의된 람다식을 호출한다.
채널 플로우(channelFlow)
Flow는 콜드 데이터 스트림이므로 필요할 때만 값을 생성한다.
채널 플로우(Channel Flow)는 코틀린의 flow와 채널(Channel)의 특성을 결합한 개념으로, 동시성과 비동기 데이터 스트림 처리를 효과적으로 결합하기 위해 사용된다.
이를 통해 플로우 내부에서 여러 생산자와 소비자를 동적으로 활용할 수 있다.
예를 들어 특정 사용자를 찾는 상황에서, 사용자가 첫 번째 페이지에서 있다면 더 많은 페이지를 요청하지 않아도 된다.
아래 예제에서 flow 빌더를 사용해 다음 원소를 생성한다.
다음 페이지는 필요할 때만 지연 요청한다.
data class User(val name: String)
interface UserApi {
suspend fun takePage(pageNumber: Int): List<User>
}
class FakeUserApi : UserApi {
private val users = List(20) { User("User$it") }
private val pageSize: Int = 3
override suspend fun takePage(
pageNumber: Int
): List<User> {
delay(1000) // suspending
return users
.drop(pageSize * pageNumber)
.take(pageSize)
}
}
fun allUsersFlow(api: UserApi): Flow<User> = flow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // suspending
emitAll(users.asFlow())
} while (users.isNotEmpty())
}
suspend fun main() {
val api = FakeUserApi()
val users = allUsersFlow(api)
val user = users
.first {
println("Checking $it")
delay(1000) // suspending
it.name == "User3"
}
println(user)
}
//Fetching page 0
//(1초 후)
//Checking User(name=User0)
//(1초 후)
//Checking User(name=User1)
//(1초 후)
//Checking User(name=User2)
//(1초 후)
//Fetching page 1
//(1초 후)
//Checking User(name=User3)
//(1초 후)
//User(name=User3)
반면 원소를 처리하고 있을 때 미리 페이지를 받아올 수도 있다.
네트워크 호출을 더 빈번하게 하는 단점이 있지만 결과를 더 빠르게 얻어올 수 있다.
이렇게 하려면 데이터를 생성하고 소비하는 과정이 별개로 진행되어야 하며, 이는 채널과 같은 핫 스트림 데이터의 특징이다.
채널 플로우 빌더는 한 번 시작하면 리시버를 기다릴 필요 없이 분리된 코루틴에서 값을 생성한다는 점이 채널과 비슷하다고 할 수 있다.
data class User(val name: String)
interface UserApi {
suspend fun takePage(pageNumber: Int): List<User>?
}
class FakeUserApi : UserApi {
private val users = List(20) { User("User$it") }
private val pageSize: Int = 3
override suspend fun takePage(
pageNumber: Int
): List<User>? {
delay(1000)
return users
.drop(pageSize * pageNumber)
.take(pageSize)
}
}
fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // suspending
users?.forEach { send(it) }
} while (!users.isNullOrEmpty())
}
suspend fun main() {
val api = FakeUserApi()
val users = allUsersFlow(api)
val user = users
.first {
println("Checking $it")
delay(1000)
it.name == "User3"
}
println(user)
}
//Fetching page 0
//(1초 후)
//Checking User(name=User0)
//Fetching page 1
//(1초 후)
//Checking User(name=User1)
//Fetching page 2
//(1초 후)
//Checking User(name=User2)
//Fetching page 3
//(1초 후)
//Checking User(name=User3)
//Fetching page 4
//(1초 후)
//User(name=User3)
이처럼 여러 개의 값을 독립적으로 계산해야 할 때 channelFlow를 주로 사용한다.
콜백 플로우(callbackFlow)
사용자의 클릭이나 활동 변화를 감지해야 하는 이벤트 플로우가 필요할 경우, channelFlow를 사용해도 되지만 이 경우에는 callbackFlow를 사용하는 것이 좋다.
- 콜백 플로우(Callback Flow)는 콜백 기반 비동기 API를 플로우(Flow)로 변환하여 사용하기 위한 코틀린 플로우의 기능이다.
- 콜백을 사용하는 기존의 비동기 작업 패턴을 플로우와 같은 선언적 방식으로 처리할 수 있도록 돕는다.
다음은 콜백을 래핑하는데 유용한 함수들이다.
- awaitClose { _ }: 채널이 닫힐 때까지 중단되는 함수, 채널이 닫힌 다음에 인자로 들어온 함수가 실행된다.
- trySendBlocking(value): send와 비슷하지만 중단하는 대신 블로킹하여 중단 함수가 아닌 함수에서도 사용할 수 있다.
- close(): 채널을 닫는다.
- cancel(throwable): 채널을 종료하고 플로우에 예외를 던진다.
다음은 callbackFlow 예제이다.
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: T) {
trySendBlocking(value)
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() = channel.close()
}
api.register(callback)
awaitClose { api.unregister(callback) }
}
'Kotlin > 코틀린 코루틴' 카테고리의 다른 글
[코틀린 코루틴] 플로우 생명주기 함수 (0) | 2024.12.05 |
---|---|
[코틀린 코루틴] 플로우의 실제 구현 (0) | 2024.11.29 |
[코틀린 코루틴] 플로우란 무엇인가? (0) | 2024.11.24 |