안드로이드 개발자 노트
[코틀린 코루틴] 채널, 셀렉트 본문
채널(Channel)
코루틴 채널(coroutine channel)은 비동기 프로그래밍에서 데이터를 코루틴 간의 통신을 위해 사용되는 구조이다.
- 채널은 송신자와 수신자의 수에 제한이 없으나, 채널의 양쪽 끝에 각각 하나의 코루틴만 있는 경우가 일반적이다.
- 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있다.
- 채널은 두 개의 인터페이스를 구현한 하나의 인터페이스이다.
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
// ...
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
fun cancel(cause: CancellationException?)
// ...
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
- send(element: E): 원소를 보내는 함수이며, 채널의 용량이 다 찼을 때 중단된다.
- receive(): 원소를 받는 함수이며, 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단된다.
채널의 가장 간단한 예를 보면 다음과 같다.
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) { index ->
println("Producing next one")
delay(1000)
channel.send(index * 2)
}
channel.close()
}
launch {
for (element in channel) {
println(element)
}
// or
// channel.consumeEach { element ->
// println(element)
// }
}
}
close()를 호출하지 않으면, 채널은 종료되지 않기 때문에 소비자 코루틴이 데이터를 기다리면서 무한히 대기 상태에 머물게 된다.
produce 함수는 빌더로 시작된 코루틴이 끝난거나, 중단되거나, 취소되면 채널을 닫는다.
suspend fun main(): Unit = coroutineScope {
val channel = produce {
repeat(5) { index ->
println("Producing next one")
delay(1000)
send(index * 2)
}
}
for (element in channel) {
println(element)
}
}
채널 타입
설정한 용량 크기에 따라 채널을 네 가지 타입으로 구분할 수 있다.
- 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 채널
- 버퍼(Buffered): 특정 용량 크기로 설정된 채널
- 랑데뷰(Rendezvous): 용량이 0이거나 Channel.RENDEZVOUS인 채널로, 송신자와 수신자가 만날 때만 원소를 교환
- 융합(Conflated): 버퍼 크기가 1인 채널로, 새로운 원소가 이전 원소를 대채
무제한 버퍼 예제
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = Channel.UNLIMITED) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
// 0
// (1초 후)
// 2
// (1초 후)
// 4
// (1초 후)
// 6
// (1초 후)
// 8
// (1초 후)
특정 크기의 버퍼로 설정된 채널 예제
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = 3) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 2 * 0.1 = 0.8초 후)
// 0
// Sent
// (1초 후)
// 2
// Sent
// (1초 후)
// 4
// (1초 후)
// 6
// (1초 후)
// 8
// (1초 후)
기본 또는 랑데뷰 버퍼를 가진 채널 예제
suspend fun main(): Unit = coroutineScope {
val channel = produce {
// or produce(capacity = Channel.RENDEZVOUS) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 0
// Sent
// (1초 후)
// 2
// Sent
// (1초 후)
// 4
// Sent
// (1초 후)
// 6
// Sent
// (1초 후)
// 8
// Sent
// (1초 후)
융합 버퍼 용량을 가진 채널 예제
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = Channel.CONFLATED) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
//8
버퍼 오버플로일 때
버퍼가 꽉 찼을 때(버퍼 오버플로)의 행동을 정의할 수 있다.
- SUSPEND(디폴트 옵션): 버퍼가 가득 찼을 때, send 메서드가 중단된다.
- DROP_OLDSET: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거된다.
- DROP_LASTEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거된다.
Channel.CONFLATED는 채널의 버퍼 용량을 1로 설정하고 onBufferOverflow를 DROP_OLDEST로 설정한 것임을 알 수 있다.
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>(
capacity = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch {
repeat(5) { index ->
channel.send(index * 2)
delay(100)
println("Sent")
}
channel.close()
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
// 6
// (1초 후)
// 8
전달되지 않은 원소 핸들러
Channel에는 원소가 어떠한 이유로 처리되지 않았을 때를 설정할 수 있는 onUndeliveredElement 파라미터가 있으며, 주로 채널에서 보낸 자원을 닫을 때 사용한다.
// 채널
val channel = Channel<Resource>(
capacity,
onUndeliveredElement = { resource ->
resource.close()
}
)
// 생성자 코드
val resourceToSend = openResource()
channel.send(resourceToSend)
// 소비자 코드
val resourceReceived = channel.receive()
try {
// 수신한 자원으로 작업한다.
} finally {
resourceReceived.close()
}
팬 아웃(Fan-out)
여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있다.
fun CoroutineScope.produceNumbers() = produce {
repeat(10) {
delay(100)
send(it)
}
}
fun CoroutineScope.launchProcessor(
id: Int,
channel: ReceiveChannel<Int>
) = launch {
for (msg in channel) {
println("#$id received $msg")
}
}
suspend fun main(): Unit = coroutineScope {
val channel = produceNumbers()
repeat(3) { id ->
delay(10)
launchProcessor(id, channel)
}
}
//#0 received 0
//#1 received 1
//#2 received 2
//#0 received 3
//#1 received 4
//#2 received 5
//#0 received 6
//#1 received 7
//#2 received 8
//#0 received 9
채널은 원소를 기다리는 코루틴들을 큐(FIFO)로 가지고 있어서 코루틴이 순차적으로 원소를 받는다.
팬 인(Fan-in)
여러 개의 코루틴이 하나의 채널로 원소를 전송할 수도 있다.
suspend fun sendString(
channel: SendChannel<String>,
text: String,
time: Long
) {
while (true) {
delay(time)
channel.send(text)
}
}
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(50) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
// (0.2초 후)
// foo
// (0.2초 후)
// foo
// (0.2초 후)
// BAR!
// (0.2초 후)
// foo
// (0.2초 후)
// ...
다수의 채널을 하나의 채널로 합쳐야 할 경우, 여러 개의 채널을 합치는 fanIn 함수를 사용할 수 있다.
fun <T> CoroutineScope.fanIn(
vararg channels: ReceiveChannel<T>
): ReceiveChannel<T> = produce {
for (channel in channels) {
launch {
for (value in channel) {
send(value)
}
}
}
}
파이프라인
한 채널로부터 받은 원소를 다른 채널로 전송하는 것을 파이프라인이라고 한다.
fun CoroutineScope.numbers(): ReceiveChannel<Int> =
produce {
repeat(3) { num ->
send(num + 1)
}
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
produce {
for (num in numbers) {
send(num * num)
}
}
suspend fun main() = coroutineScope {
val numbers = numbers()
val squared = square(numbers)
for (num in squared) {
println(num)
}
}
// 1
// 4
// 9
위 내용을 바탕으로, 카페에서 바리스타가 음료를 제조하는 예시를 만들면 아래와 같다.
fun CoroutineScope.serveOrders(
orders: ReceiveChannel<Order>,
barista: Barista
) : ReceiveChannel<CoffeeResult> = produce {
for (order in orders) {
val coffee = prepareCoffee(order.type)
send(
CoffeeResult(
coffee = coffee,
customer = order.customer,
barista = barista
)
)
}
}
val coffeeResults = fanIn(
serveOrders(ordersChannel, Barista("Alex")),
serveOrders(ordersChannel, Barista("Bob")),
serveOrders(ordersChannel, Barista("Celine"))
)
셀렉트(Select)
코틀린 코루틴은 가장 먼저 완료되는 코루틴의 결과를 기다리거나, 여러 개의 채널 중 버퍼에 남은 공간이 있는 채널을 먼저 확인하여 데이터를 보내거나, 이용 가능한 원소가 있는 채널로부터 데이터를 받을 수 있는지 여부 확인할 수 있는 select 함수를 제공한다.
지연되는 값 선택하기
select 함수를 사용해 여러 개의 소스에 데이터를 요청한 뒤 가장 빠른 응답만 얻을 수 있다.
async와 select를 사용하면 코루틴끼리 경합하는 상황을 구현할 수 있지만, 스코프를 명시적으로 취소해야 한다.
suspend fun requestData1(): String {
delay(100_000)
return "Data1"
}
suspend fun requestData2(): String {
delay(1000)
return "Data2"
}
suspend fun askMultipleForData(): String = coroutineScope {
select<String> {
async { requestData1() }.onAwait { it }
async { requestData2() }.onAwait { it }
}.also { coroutineContext.cancelChildren() }
}
suspend fun main(): Unit = coroutineScope {
println(askMultipleForData())
}
명시적으로 취소해야하는 문제 때문에, 예제처럼 헬퍼 함수를 사용하거나 raceOf 함수(외부 라이브러리)를 사용하면 된다.
suspend fun askMultipleForData(): String = raceOf({
requestData1()
}, {
requestData2()
})
suspend fun main(): Unit = coroutineScope {
println(askMultipleForData())
}
채널에서 값 선택하기
select 함수는 여러 개의 채널로부터 결괏값을 얻을 수 있으며, 주요 함수는 다음과 같다.
- onReceive: 채널에서 값을 받고, 해당 값을 기반으로 람다식을 수행하여 결괏값을 반환한다.
- onReceiveCatching: 채널에서 값을 받거나 닫혔는지 확인하며, ChannelResult를 통해 값을 처리한다.
- onSend: 버퍼 공간이 있는 경우 채널에 값을 보내고, Unit을 반환한다.
예시를 보면 아래와 같다.
fun main(): Unit = runBlocking {
val c1 = Channel<Char>(capacity = 2)
val c2 = Channel<Char>(capacity = 2)
// Send values
launch {
for (c in 'A'..'H') {
delay(400)
select<Unit> {
c1.onSend(c) { println("Sent $c to 1") }
c2.onSend(c) { println("Sent $c to 2") }
}
}
}
// Receive values
launch {
while (true) {
delay(1000)
val c = select<String> {
c1.onReceive { "$it from 1" }
c2.onReceive { "$it from 2" }
}
println("Received $c")
}
}
}
//Sent A to 1
//Sent B to 1
//Received A from 1
//Sent C to 1
//Sent D to 2
//Received B from 1
//Sent E to 1
//Sent F to 2
//Received C from 1
//Sent G to 1
//Received E from 1
//Sent H to 1
//Received G from 1
//Received H from 1
//Received D from 2
//Received F from 2
'Kotlin > 코틀린 코루틴' 카테고리의 다른 글
[코틀린 코루틴] 핫 데이터 소스와 콜드 데이터 소스 (0) | 2024.11.24 |
---|---|
[코틀린 코루틴] 8장. 잡과 자식 코루틴 기다리기 (0) | 2024.03.30 |
[코틀린 코루틴] 7장. 코루틴 컨텍스트 (0) | 2024.03.24 |