반응형
Notice
Recent Posts
Recent Comments
Link
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
Tags
more
Archives
Today
Total
관리 메뉴

안드로이드 개발자 노트

[코틀린 코루틴] 채널, 셀렉트 본문

Kotlin/코틀린 코루틴

[코틀린 코루틴] 채널, 셀렉트

어리둥절범고래 2024. 11. 18. 11:52
반응형

채널(Channel)

 

코루틴 채널(coroutine channel)은 비동기 프로그래밍에서 데이터를 코루틴 간의 통신을 위해 사용되는 구조이다.

 

 

  • 채널은 송신자와 수신자의 수에 제한이 없으나, 채널의 양쪽 끝에 각각 하나의 코루틴만 있는 경우가 일반적이다.
  • 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있다.
  • 채널은 두 개의 인터페이스를 구현한 하나의 인터페이스이다.
interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
    // ...
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
    fun cancel(cause: CancellationException?)
    // ...
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
  • send(element: E): 원소를 보내는 함수이며, 채널의 용량이 다 찼을 때 중단된다.
  • receive(): 원소를 받는 함수이며, 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단된다.

채널의 가장 간단한 예를 보면 다음과 같다.

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            channel.send(index * 2)
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            println(element)
        }
        // or
        // channel.consumeEach { element ->
        //     println(element)
        // }
    }
}

close()를 호출하지 않으면, 채널은 종료되지 않기 때문에 소비자 코루틴이 데이터를 기다리면서 무한히 대기 상태에 머물게 된다.

produce 함수는 빌더로 시작된 코루틴이 끝난거나, 중단되거나, 취소되면 채널을 닫는다.

suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            send(index * 2)
        }
    }

    for (element in channel) {
        println(element)
    }
}

 

 

채널 타입

 

설정한 용량 크기에 따라 채널을 네 가지 타입으로 구분할 수 있다.

 

  • 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 채널
  • 버퍼(Buffered): 특정 용량 크기로 설정된 채널
  • 랑데뷰(Rendezvous): 용량이 0이거나 Channel.RENDEZVOUS인 채널로, 송신자와 수신자가 만날 때만 원소를 교환
  • 융합(Conflated): 버퍼 크기가 1인 채널로, 새로운 원소가 이전 원소를 대채

 

무제한 버퍼 예제

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.UNLIMITED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
// 0
// (1초 후)
// 2
// (1초 후)
// 4
// (1초 후)
// 6
// (1초 후)
// 8
// (1초 후)

 

특정 크기의 버퍼로 설정된 채널 예제

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = 3) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 2 * 0.1 = 0.8초 후)
// 0
// Sent
// (1초 후)
// 2
// Sent
// (1초 후)
// 4
// (1초 후)
// 6
// (1초 후)
// 8
// (1초 후)

 

기본 또는 랑데뷰 버퍼를 가진 채널 예제

suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        // or produce(capacity = Channel.RENDEZVOUS) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 0
// Sent
// (1초 후)
// 2
// Sent
// (1초 후)
// 4
// Sent
// (1초 후)
// 6
// Sent
// (1초 후)
// 8
// Sent
// (1초 후)


융합 버퍼 용량을 가진 채널 예제

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.CONFLATED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
//8

 

 

버퍼 오버플로일 때

 

버퍼가 꽉 찼을 때(버퍼 오버플로)의 행동을 정의할 수 있다.

 

  • SUSPEND(디폴트 옵션): 버퍼가 가득 찼을 때, send 메서드가 중단된다.
  • DROP_OLDSET: 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거된다.
  • DROP_LASTEST: 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거된다.

Channel.CONFLATED는 채널의 버퍼 용량을 1로 설정하고 onBufferOverflow를 DROP_OLDEST로 설정한 것임을 알 수 있다.

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    launch {
        repeat(5) { index ->
            channel.send(index * 2)
            delay(100)
            println("Sent")
        }
        channel.close()
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (0.1초 후)
// Sent
// (1 - 4 * 0.1 = 0.6초 후)
// 6
// (1초 후)
// 8

 

 

전달되지 않은 원소 핸들러

 

Channel에는 원소가 어떠한 이유로 처리되지 않았을 때를 설정할 수 있는 onUndeliveredElement 파라미터가 있으며, 주로 채널에서 보낸 자원을 닫을 때 사용한다.

// 채널
val channel = Channel<Resource>(
    capacity,
    onUndeliveredElement = { resource ->
        resource.close()
    }
)

// 생성자 코드
val resourceToSend = openResource()
channel.send(resourceToSend)

// 소비자 코드
val resourceReceived = channel.receive()
try {
    // 수신한 자원으로 작업한다.
} finally {
    resourceReceived.close()
}

 

 

팬 아웃(Fan-out)

 

여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있다.

fun CoroutineScope.produceNumbers() = produce {
    repeat(10) {
        delay(100)
        send(it)
    }
}

fun CoroutineScope.launchProcessor(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    for (msg in channel) {
        println("#$id received $msg")
    }
}

suspend fun main(): Unit = coroutineScope {
    val channel = produceNumbers()
    repeat(3) { id ->
        delay(10)
        launchProcessor(id, channel)
    }
}
//#0 received 0
//#1 received 1
//#2 received 2
//#0 received 3
//#1 received 4
//#2 received 5
//#0 received 6
//#1 received 7
//#2 received 8
//#0 received 9

채널은 원소를 기다리는 코루틴들을 큐(FIFO)로 가지고 있어서 코루틴이 순차적으로 원소를 받는다.

 

 

팬 인(Fan-in)

 

여러 개의 코루틴이 하나의 채널로 원소를 전송할 수도 있다.

suspend fun sendString(
    channel: SendChannel<String>,
    text: String,
    time: Long
) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(50) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}
// (0.2초 후)
// foo
// (0.2초 후)
// foo
// (0.2초 후)
// BAR!
// (0.2초 후)
// foo
// (0.2초 후)
// ...

 

다수의 채널을 하나의 채널로 합쳐야 할 경우, 여러 개의 채널을 합치는 fanIn 함수를 사용할 수 있다.

fun <T> CoroutineScope.fanIn(
    vararg channels: ReceiveChannel<T>
): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (value in channel) {
                send(value)
            }
        }
    }
}

 

 

파이프라인

 

한 채널로부터 받은 원소를 다른 채널로 전송하는 것을 파이프라인이라고 한다.

fun CoroutineScope.numbers(): ReceiveChannel<Int> =
    produce {
        repeat(3) { num ->
            send(num + 1)
        }
    }

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
    produce {
        for (num in numbers) {
            send(num * num)
        }
    }

suspend fun main() = coroutineScope {
    val numbers = numbers()
    val squared = square(numbers)
    for (num in squared) {
        println(num)
    }
}
// 1
// 4
// 9

 

위 내용을 바탕으로, 카페에서 바리스타가 음료를 제조하는 예시를 만들면 아래와 같다.

fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    barista: Barista
) : ReceiveChannel<CoffeeResult> = produce { 
    for (order in orders) {
        val coffee = prepareCoffee(order.type)
        send(
            CoffeeResult(
                coffee = coffee,
                customer = order.customer,
                barista = barista
            )
        )
    }
}

val coffeeResults = fanIn(
    serveOrders(ordersChannel, Barista("Alex")),
    serveOrders(ordersChannel, Barista("Bob")),
    serveOrders(ordersChannel, Barista("Celine"))
)

 

 

 

 

셀렉트(Select)

 

코틀린 코루틴은 가장 먼저 완료되는 코루틴의 결과를 기다리거나, 여러 개의 채널 중 버퍼에 남은 공간이 있는 채널을 먼저 확인하여 데이터를 보내거나, 이용 가능한 원소가 있는 채널로부터 데이터를 받을 수 있는지 여부 확인할 수 있는 select 함수를 제공한다.

 

지연되는 값 선택하기

 

select 함수를 사용해 여러 개의 소스에 데이터를 요청한 뒤 가장 빠른 응답만 얻을 수 있다.

async와 select를 사용하면 코루틴끼리 경합하는 상황을 구현할 수 있지만, 스코프를 명시적으로 취소해야 한다.

suspend fun requestData1(): String {
    delay(100_000)
    return "Data1"
}

suspend fun requestData2(): String {
    delay(1000)
    return "Data2"
}

suspend fun askMultipleForData(): String = coroutineScope {
    select<String> {
        async { requestData1() }.onAwait { it }
        async { requestData2() }.onAwait { it }
    }.also { coroutineContext.cancelChildren() }
}

suspend fun main(): Unit = coroutineScope {
    println(askMultipleForData())
}

 

명시적으로 취소해야하는 문제 때문에, 예제처럼 헬퍼 함수를 사용하거나 raceOf 함수(외부 라이브러리)를 사용하면 된다.

suspend fun askMultipleForData(): String = raceOf({
    requestData1()
}, {
    requestData2()
})

suspend fun main(): Unit = coroutineScope { 
    println(askMultipleForData())
}

 

 

채널에서 값 선택하기

 

select 함수는 여러 개의 채널로부터 결괏값을 얻을 수 있으며, 주요 함수는 다음과 같다.

 

  • onReceive: 채널에서 값을 받고, 해당 값을 기반으로 람다식을 수행하여 결괏값을 반환한다.
  • onReceiveCatching: 채널에서 값을 받거나 닫혔는지 확인하며, ChannelResult를 통해 값을 처리한다.
  • onSend: 버퍼 공간이 있는 경우 채널에 값을 보내고, Unit을 반환한다.

예시를 보면 아래와 같다.

fun main(): Unit = runBlocking {
    val c1 = Channel<Char>(capacity = 2)
    val c2 = Channel<Char>(capacity = 2)
    
    // Send values
    launch {
        for (c in 'A'..'H') {
            delay(400)
            select<Unit> {
                c1.onSend(c) { println("Sent $c to 1") }
                c2.onSend(c) { println("Sent $c to 2") }
            }
        }
    }
    
    // Receive values
    launch {
        while (true) {
            delay(1000)
            val c = select<String> {
                c1.onReceive { "$it from 1" }
                c2.onReceive { "$it from 2" }
            }
            println("Received $c")
        }
    }
}
//Sent A to 1
//Sent B to 1
//Received A from 1
//Sent C to 1
//Sent D to 2
//Received B from 1
//Sent E to 1
//Sent F to 2
//Received C from 1
//Sent G to 1
//Received E from 1
//Sent H to 1
//Received G from 1
//Received H from 1
//Received D from 2
//Received F from 2

 

반응형