반응형
Notice
Recent Posts
Recent Comments
Link
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
Tags
more
Archives
Today
Total
관리 메뉴

안드로이드 개발자 노트

[코루틴의 정석] 5장. async와 Deferred 본문

Kotlin/코루틴의 정석

[코루틴의 정석] 5장. async와 Deferred

어리둥절범고래 2024. 9. 1. 17:37
반응형

코루틴 라이브러리는 비동기 작업으로부터 결과를 수신해야 하는 경우를 위해 async 코루틴 빌더를 통해 결괏값을 수신받을 수 있도록 한다.

 

5.1. async 사용해 결괏값 수신하기

 

5.1.1. async 사용해 Deferred 만들기

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

async 함수는 CoroutineDispatcher를 설정할 수 있는 context 인자와 지연 시작할 수 있는 start 인자 그리고 코루틴에서 실행할 코드를 작성하는 block 람다식을 가진다.

 

launch와 async가 다른 점은 아래와 같다.

 

launch async
결괏값을 직접 반환할 수 없다. 결괏값을 직접 반환할 수 있다.
Job 객체를 반환한다. Deferred<T> 객체를 반환한다.

 

 

Deferred에 명시적으로 타입<T>를 설정하거나 async 블록의 반환값으로 반환할 결괏값을 설정하면 된다.

val networkDeferred: Deferred<String> = async(Dispatchers.IO) { 
    delay(1000) // 네트워크 요청
    return@async "Dummy Response" // 결괏값 반환
}

 

5.1.2. await를 사용한 결괏값 수신

 

Deferred 객체는 결괏값 수신의 대기를 위해 await 함수를 제공한다.

Deferred 객체의 await 함수는 코루틴이 실행 완료될 때까지 호출부의 코루틴을 일시 중단한다는 점에서 Job 객체의 join 함수와 유사하게 동작한다.

fun main() = runBlocking<Unit> {
  val networkDeferred: Deferred<String> = async(Dispatchers.IO) {
    delay(1000L) // 네트워크 요청
    return@async "Dummy Response" // 결과값 반환
  }
  val result = networkDeferred.await() // networkDeferred로부터 결과값이 반환될 때까지 runBlocking 일시 중단
  println(result) // Dummy Response 출력
}
/*
// 결과:
[DefaultDispatcher-worker-1 @coroutine#2] 토큰 업데이트 시작
[DefaultDispatcher-worker-3 @coroutine#3] 네트워크 요청
[DefaultDispatcher-worker-1 @coroutine#2] 토큰 업데이트 완료
*/

 

 

5.2. Deferred는 특수한 형태의 Job이다

 

Deferred<T>는 인터페이스로 정의되어 있으며, 이는 Job 인터페이스를 확장하고 있다.

즉 Deferred는 Job의 서브타입으로, 결괏값 수신을 위한 몇 가지 기능이 추가됐을 뿐 Job 객체의 일종이다.

public interface Deferred<out T> : Job {

    public suspend fun await(): T

    public val onAwait: SelectClause1<T>

    @ExperimentalCoroutinesApi
    public fun getCompleted(): T

    @ExperimentalCoroutinesApi
    public fun getCompletionExceptionOrNull(): Throwable?
}

 

Deferred 객체는 Job 객체의 모든 함수와 프로퍼티를 사용할 수 있으며, 상태 조회를 위한 isActive, isCancelled, isCompleted 프로퍼티들도 사용할 수 있다.

fun main() = runBlocking<Unit> {
  val networkDeferred: Deferred<String> = async(Dispatchers.IO) {
    delay(1000L) // 네트워크 요청
    "Dummy Response"
  }
  networkDeferred.join() // networkDeferred가 실행 완료될 때까지 대기
  printJobState(networkDeferred) // Job이 입력되어야 할 자리에 Deferred 입력
}

fun printJobState(job: Job) {
  println(
    "Job State\n" +
        "isActive >> ${job.isActive}\n" +
        "isCancelled >> ${job.isCancelled}\n" +
        "isCompleted >> ${job.isCompleted} "
  )
}

/*
// 결과:
Job State
isActive >> false
isCancelled >> false
isCompleted >> true
*/

 

 


5.3. 복수의 코루틴으로부터 결괏값 수신하기

 

5.3.1. await를 사용해 복수의 코루틴으로부터 결괏값 수신하기

 

여러 비동기 작업으로부터 결괏값을 반환받아 병합해야 하는 경우가 자주 있다.

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val networkDeferred1: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response1"
    }
    val networkResponse1 = networkDeferred1.await()
    val networkDeferred2: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response2"
    }
    val networkResponse2 = networkDeferred2.await()
    print("${getElapsedTime(startTime)} ,$networkResponse1/$networkResponse2")
    /*
    // 결과:
    지난 시간: 2023ms ,Dummy Response1/Dummy Response2
    */
}

서버의 호출에 2초의 시간이 걸리는 이유는 await를 호출하면 결괏값이 반환될 때까지 호출부의 코루틴이 일시 중단되기 때문이다.

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val networkDeferred1: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response1"
    }
    val networkDeferred2: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response2"
    }
    val networkResponse1 = networkDeferred1.await()
    val networkResponse2 = networkDeferred2.await()
    print("${getElapsedTime(startTime)} ,$networkResponse1/$networkResponse2")
    /*
    // 결과:
    지난 시간: 1016ms ,Dummy Response1/Dummy Response2
    */
}

이 코드에서는 networkDeferred1.await()가 호출되기 전에 networkDeferred2 코루틴이 실행되므로 networkDeferred1 코루틴과 networkDeferred2 코루틴이 동시에 실행된다.

 


5.3.2. awaitAll을 사용한 결괏값 수신

 

코루틴 라이브러리는 복수의 Deferred 객체로부터 결괏값을 수신하기 위한 awaitAll 함수를 제공한다.

public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T>

 

awaitAll 함수는 가변 인자로 Deferred 타입의 객체를 받아 인자로 모든 Deferred 코루틴으로부터 결과가 수신될 때까지 호출부의 코루틴을 일시중단한 후 결과가 모두 수신되면 결괏값들을 List로 만들어 반환하고 호출부의 코루틴을 재개한다.

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val networkDeferred1: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response1"
    }
    val networkDeferred2: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response2"
    }
    val response = awaitAll(networkDeferred1, networkDeferred2)
    print("${getElapsedTime(startTime)} ,${response.joinToString()}")
    /*
    // 결과:
    지난 시간: 1016ms ,Dummy Response1, Dummy Response2
    */
}


5.3.3. 컬렉션에 대해 awaitAll 사용하기

 

코루틴 라이브러리는 awaitAll 함수를 Collection 인터페이스에 대한 확장 함수로도 제공한다.

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val networkDeferred1: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response1"
    }
    val networkDeferred2: Deferred<String> = async(Dispatchers.IO) {
        delay(1000)
        return@async "Dummy Response2"
    }

    val results = listOf(networkDeferred1, networkDeferred2).awaitAll()
    print("${getElapsedTime(startTime)} ,${results.joinToString()}")
    /*
    // 결과:
    지난 시간: 1016ms ,Dummy Response1, Dummy Response2
    */
}

 


5.4. withContext


5.4.1. withContext로 async-await 대체하기

 

withContext 함수가 호출되면 함수의 인자로 설정된 CoroutineContext 객체를 사용해 block 람다식을 실행하고, 완료되면 그 결과를 반환한다.

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

async 함수를 호출한 후 연속적으로 await 함수를 호출해 결괏값 수신을 대기하는 코드는 withContext 험수로 대체될 수 있다.

fun main() = runBlocking<Unit> {
  val networkDeferred: Deferred<String> = async(Dispatchers.IO) {
    delay(1000L) // 네트워크 요청
    return@async "Dummy Response" // 문자열 반환
  }
  val result = networkDeferred.await() // networkDeferred로부터 결과값이 반환될 때까지 대기
  println(result)
}
/*
// 결과:
Dummy Response
*/
fun main() = runBlocking<Unit> {
  val result: String = withContext(Dispatchers.IO) {
    delay(1000L) // 네트워크 요청
    return@withContext "Dummy Response" // 문자열 반환
  }
  println(result)
}
/*
// 결과:
Dummy Response
*/

 


5.4.2. withContext의 동작 방식

 

async-await 쌍은 새로운 코루틴을 생성해 작업을 처리하지만 withContext 함수는 실행 중이던 코루틴을 그대로 유지시킨 채로 코루틴의 실행 환경만 변경해 작업을 처리한다.

fun main() = runBlocking<Unit> {
  println("[${Thread.currentThread().name}] runBlocking 블록 실행")
  withContext(Dispatchers.IO) {
    println("[${Thread.currentThread().name}] withContext 블록 실행")
  }
}
/*
// 결과:
[main @coroutine#1] runBlocking 블록 실행
[DefaultDispatcher-worker-1 @coroutine#1] withContext 블록 실행
*/

 

CoroutineDispatcher 객체는 변경되었지만 코루틴은 coroutin#1 으로 같은 것을 볼 수 있다.

withContext 함수가 호출되면 실행 중인 코루틴의 실행 환경이 withContext 함수의 context 인자 값으로 변경돼 실행되며, 이를 컨텍스트 스위칭(Context Switching)이라고 부른다. 만약 context 인자로 CoroutineDispatcher 객체가 넘어온다면 코루틴은 해당 CoroutineDispatcher 객체를 사용해 다시 실행된다. 따라서 앞의 코드에서 withContext(Dispatchers.IO)가 호출되면 해당 코루틴은 다시 Dispatchers.IO의 작업 대기열로 이동한 후 Dispatchers.IO가 사용할 수 있는 스레드 중 하나로 보내져 실행된다.

 

fun main() = runBlocking<Unit> {
  println("[${Thread.currentThread().name}] runBlocking 블록 실행")
  async(Dispatchers.IO) {
    println("[${Thread.currentThread().name}] async 블록 실행")
  }.await()
}
/*
// 결과:
[main @coroutine#1] runBlocking 블록 실행
[DefaultDispatcher-worker-1 @coroutine#2] async 블록 실행
*/

코드의 실행 결과를 보면 async 블록을 실행하는 코루틴은 coroutine#2인 것을 볼 수 있다.

 



5.4.3. withContext 사용 시 주의점

 

withContext 함수는 새로운 코루틴을 만들지 않기 때문에 하나의 코루틴에서 withContext 함수가 여러 번 호출되면 순차적으로 실행된다.

fun main() = runBlocking<Unit> {
  val startTime = System.currentTimeMillis()
  val helloString = withContext(Dispatchers.IO) {
    delay(1000L)
    return@withContext "Hello"
  }

  val worldString = withContext(Dispatchers.IO) {
    delay(1000L)
    return@withContext "World"
  }

  println("[${getElapsedTime(startTime)}] ${helloString} ${worldString}")
}
/*
// 결과:
[지난 시간: 2018ms] Hello World
*/

withContext 함수가 새로운 코루틴을 만들지 않는다는 것을 명심하고 사용하자.

 

이러한 점을 이용해 withContext 함수를 사용해 코루틴의 스레드를 전환할 수도 있다.

private val myDispatcher1 = newSingleThreadContext("MyThread1")
private val myDispatcher2 = newSingleThreadContext("MyThread2")

fun main() = runBlocking<Unit> {
  println("[${Thread.currentThread().name}] 코루틴 실행")
  withContext(myDispatcher1) {
    println("[${Thread.currentThread().name}] 코루틴 실행")
    withContext(myDispatcher2) {
      println("[${Thread.currentThread().name}] 코루틴 실행")
    }
    println("[${Thread.currentThread().name}] 코루틴 실행")
  }
  println("[${Thread.currentThread().name}] 코루틴 실행")
}
/*
// 결과:
[main @coroutine#1] 코루틴 실행
[MyThread1 @coroutine#1] 코루틴 실행
[MyThread2 @coroutine#1] 코루틴 실행
[MyThread1 @coroutine#1] 코루틴 실행
[main @coroutine#1] 코루틴 실행
*/

 

 

 

경쟁 상태(Race Condition)

 

경쟁 상태는 두 개 이상의 프로세스나 스레드(또는 코루틴)가 동시에 같은 자원에 접근하려고 할 때 발생하는 문제이다.

fun main() =
    runBlocking {
        val sharedList = mutableListOf<Int>() // 공유 자원

        // async를 사용한 비동기 작업에서 동기화 없이 공유 자원에 접근
        val time =
            measureTimeMillis {
                val jobs =
                    List(1_000) { i ->
                        async(Dispatchers.Default) {
                            sharedList.add(i) // 동기화 없이 공유 자원에 접근
                        }
                    }
                jobs.awaitAll() // 모든 작업 대기
            }

        println("async로 수행한 시간: $time ms")
        println("리스트의 크기: ${sharedList.size}")
    }

/**
 * 결과
 * async로 수행한 시간: 27 ms
 * 리스트의 크기: 982 // 1000보다 작은 값이 나옴
 */

여러 개의 코루틴이 공유 자원에 접근 시 경쟁 상태가 발생해 결과가 비결정적이거나 예상치 못한 동작을 하게 될 수 있다.

공유 자원에 접근하는 경우 동기화를 해줘야 한다.

fun main() =
    runBlocking {
        val sharedList = mutableListOf<Int>() // 공유 자원

        // async를 사용한 비동기 작업에서 동기화 없이 공유 자원에 접근
        val time =
            measureTimeMillis {
                val jobs =
                    List(1_000) { i ->
                        async(Dispatchers.Default) {
                            synchronized(sharedList) {
                                sharedList.add(i)
                            }
                        }
                    }
                jobs.awaitAll() // 모든 작업 대기
            }

        println("async로 수행한 시간: $time ms")
        println("리스트의 크기: ${sharedList.size}")
    }

/**
 * 결과
 * async로 수행한 시간: 27 ms
 * 리스트의 크기: 1000
 */

withContext 함수는 새로운 코루틴을 만들지 않기 때문에 하나의 코루틴에서 withContext 함수가 여러 번 호출되면 순차적으로 실행되기 때문에 공유 자원 접근에 경쟁 상태가 발생하지 않아 sharedList의 크기가 1000인 것을 확인할 수 있다.

 

fun main() =
    runBlocking {
        val sharedList = mutableListOf<Int>() // 공유 자원

        // async를 사용한 비동기 작업에서 동기화 없이 공유 자원에 접근
        val time =
            measureTimeMillis {
                    List(1_000) { i ->
                        withContext(Dispatchers.Default) {
                            sharedList.add(i)
                        }
                    }
            }

        println("async로 수행한 시간: $time ms")
        println("리스트의 크기: ${sharedList.size}")
    }

/**
 * 결과
 * async로 수행한 시간: 33 ms
 * 리스트의 크기: 1000
 */

 


요약

 

1. async 함수를 사용해 코루틴을 실행하면 코루틴의 결과를 감싸는 Deferred 객체를 반환받는다.  
2. Deferred는 Job의 서브타입으로 Job 객체에 결괏값을 감싸는 기능이 추가된 객체이다.  
3. Deferred 객체에 대해 await 함수를 호출하면 결괏값을 반환받을 수 있다. await 함수를 호출한 코루틴은 Deferred 객체가 결괏값을 반환할 때까지 일시 중단 후 대기한다  
4. awaitAll 함수를 사용해 복수의 Deferred 코루틴이 결괏값을 반환할 때까지 대기할 수 있다.  
5. awaitAll 함수는 컬렉션에 대한 확장 함수로도 제공된다.  
6. withContext 함수를 사용해 async-await 쌍을 대체할 수 있다.  
7. withContext 함수는 코루틴을 새로 생성하지 않는다. 코루틴의 실행 환경을  담는 CoroutineContext만 변경해 코루틴을 실행하므로 이를 활용해 코루틴이 실행되는 스레드를 변경할 수 있다.  
8. withContext 함수는 코루틴을 새로 생성하지 않으므로 병렬로 실행돼야 하는 복수의 작업을 withContext로 감싸 실행하면 순차적으로 실행된다. 이럴 때는 withContext 대신 async를 사용해 작업이 병렬로 실행될 수 있도록 해야 한다.  
9. withContext로 인해 실행 환경이 변경돼 실행되는 코루틴은 withContext의 작업을 모두 실행하면 다시 이전의 실행 환경으로 돌아온다.  

반응형