안드로이드 개발자 노트
[코틀린 완벽 가이드] 13장 : 동시성 본문
1. 코루틴
1. 코루틴과 일시 중단 함수
코루틴 라이브러리의 기본 요소는 일시 중단 함수(suspend function) 이다.
이 함수는 원하는 지점에서 함수의 실행을 중단하거나 다시 실행을 계속 진행하게 할 수 있다.
suspend fun foo() {
println("Task started")
delay(100)
println("Task finished")
}
delay() 함수는 일시 중단 함수다. 이 함수는 현재 스레드를 블럭시키지 않고 자신을 호출한 함수를 일시 중단시키며 스레드를 다른 작업을 수행할 수 있게 풀어준다. 일시 중단 함수가 아닌 일반 함수에서는 사용할 수 없다.
동시성 코드의 동작을 제어할 때 공통적인 생명 주기와 문맥이 정해진 몇몇 작업이 정의된 구체적인 영역 안에서만 동시성 함수를 호출한다. 코루틴을 실행할 때 사용하는 여러 가지 함수를 코루틴 빌더라고 한다. 코루틴 빌더는 CoroutineScope 인스턴스의 확장 함수로 쓰인다. 그 중에 기본적인 것으로 GlobalScope 객체가 있다.
GlobalScope 객체를 사용하면 독립적인 코루틴을 만들 수 있고, 이 코루틴은 자신만의 작업을 내포할 수 있다.
2. 코루틴 빌더
코루틴 빌더에는 자주 사용하는 launch(), async(), runBlocking() 이 있다.
launch() 함수는 코루틴을 시작하고, 코루틴을 실행 중인 작업에서 상태를 추적하고 변경할 수 있는 Job 객체를 돌려준다.
이 함수는 CoroutineScope.() -> Unit 타입의 일시 중단 람다를 받는다. 이 람다는 새 코루틴의 본문에 해당한다.
import kotlinx.coroutines.*
import java.lang.System.currentTimeMillis
fun main() {
val time = currentTimeMillis()
GlobalScope.launch {
delay(100)
println("Task 1 finished in ${currentTimeMillis() - time} ms")
}
GlobalScope.launch {
delay(100)
println("Task 2 finished in ${currentTimeMillis() - time} ms")
}
Thread.sleep(200)
}
/*
Task 1 finished in 274 ms
Task 2 finished in 274 ms
*/
이 코드를 실행하면 다음과 같이 두 작업이 프로그램이 시작한 시점을 기준으로 거의 동시에 끝났다.
이를 통해 두 작업이 실제로 병렬적으로 실행되었지만 어느 한쪽이 더 먼저 표시될 수도 있는 것을 알 수 있다. 코루틴 라이브러리는 필요할 때 실행 순서를 강제할 수 있는 도구도 제공한다. 코루틴을 처리하는 스레드는 데몬 모드(daemon mode)로 실행되기 때문에 main() 스레드가 이 스레드보다 빨리 끝나버리면 자동으로 실행이 중단된다.
launch() 빌더는 동시성 작업이 결과를 만들어내지 않는 경우 적합하다. 그래서 이 빌더는 Unit 타입을 반환하는 람다를 인자로 받는다.
반대로 결과가 필요한 경우에는 async() 빌더 함수를 사용해야 한다.
이 함수는 Deferred의 인스턴스를 돌려주고, 이 인스턴스는 Job의 하위 타입으로 await() 메서드를 통해 계산 결과에 접근할 수 있게 해준다. await() 메서드를 호출하면 await()는 계산이 완료되거나 계산 작업이 취소될 때 까지 현재 코루틴을 일시 중단시킨다. 작업이 취소되는 경우 await()는 예외를 발생시키면서 실패한다.
suspend fun main() {
val message = GlobalScope.async {
delay(100)
"abc"
}
val count = GlobalScope.async {
delay(100)
1 + 2
}
delay(200)
val result = message.await().repeat(count.await())
println(result)
}
/*
abcabcabc
*/
이 경우 main()을 suspend로 표시해서 두 Deferred 작업에 대해 직접 await() 메서드를 호출했다.
launch()와 async() 빌더의 경우 스레드 호출을 블럭시키지는 않지만, 백그라운드 스레드를 공유하는 풀을 통해 작업을 실행한다.
runBlocking() 빌더는 디폴트로 현재 스레드에서 실행되는 코루틴을 만들고 코루틴이 완료될 때까지 현재 스레드의 실행을 블럭시킨다. 코루틴이 성공적으로 끝나면 일시 중단 람다의 결과가 runBlocking() 호출의 결괏값이 된다. 코루틴이 취소되면 runBlocking()은 예외를 던진다.
fun main() {
GlobalScope.launch {
delay(100)
println("Background task: ${Thread.currentThread().name}")
}
runBlocking {
println("Primary task: ${Thread.currentThread().name}")
delay(200)
}
}
/*
Primary task: main
Background task: DefaultDispatcher-worker-1
*/
runBlocking() 내부의 코루틴은 메인 스레드에서 실행되는 반면, launch()로 시작한 코루틴은 공유 풀에서 백그라운드 스레드를 할당 받았음을 알 수 있다.
3. 코루틴 영역과 구조적 동시성
어떤 코루틴을 다른 코루틴의 문맥에서 실행하면 후자가 전자의 부모가 된다. 이 경우 자식의 실행이 모두 끝나야 부모가 끝날 수 있도록 부모와 자식의 생명 주기가 연관된다. 이런 기능을 구조적 동시성(structured concurrency)이라고 부른다.
fun main() {
runBlocking {
println("Parent task started")
launch {
println("Task A started")
delay(200)
println("Task A finished")
}
launch {
println("Task B started")
delay(200)
println("Task B finished")
}
delay(100)
println("Parent task finished")
}
println("Shutting down...")
}
/*
Parent task started
Task A started
Task B started
Parent task finished
Task A finished
Task B finished
Shutting down...
*/
이 코드는 최상위 코루틴(runBlocking)을 시작하고 현재 CoroutineScope 인스턴스 안에서 launch를 호출해 두 가지 자식 코루틴을 시작한다. 지연을 100밀리초만 줬기 때문에 부모 코루틴의 주 본문이 더 빨리 끝난다. 하지만 부모 코루틴 자체는 이 시점에서 실행이 끝나지 않고 일시 중단 상태로 두 자식이 모두 끝날 때까지 기다린다. runBlocking()이 메인 스레드를 블럭하고 있었기 때문에 부모 스레드가 끝나야 메인 스레드의 블럭이 풀리고 마지막 메시지가 출력된다.
coroutineScope() 호출로 코드 블럭을 감싸면 커스텀 영역을 도입할 수 있다. coroutineScope() 호출은 람다의 결과를 반환하고, 자식들이 완료되기 전까지 실행이 완료되지 않는다. coroutineScope()와 runBlocking()의 가장 큰 차이는 coroutineScope()가 일시 중단 함수라 현재 스레드를 블럭시키지 않는다.
fun main() {
runBlocking {
println("Custom scope start")
coroutineScope {
launch {
delay(100)
println("Task 1 finished")
}
launch {
delay(100)
println("Task 2 finished")
}
}
println("Custom scope end")
}
}
/*
Custom scope start
Task 1 finished
Task 2 finished
Custom scope end
*/
두 자식 코루틴 실행이 끝날 때까지 앞의 coroutineScope() 호출이 일시 중단되므로 Custom scope end 메시지가 마지막에 표시된다.
4. 코루틴 문맥
코루틴마다 CoroutineContext 인터페이스로 표현되는 문맥이 연관돼 있으며, 코루틴을 감싸는 변수 영역의 coroutineContext 프로퍼티를 통해 이 문맥에 접근할 수 있다. 문맥은 키-값 쌍으로 이뤄진 불변 컬렉션이며, 코루틴에서 사용할 수 있는 여러 가지 데이터가 들어있다. 이 데이터 중 일부는 코루틴 장치에서 특별한 의미를 가지며, 런타임에 코루틴이 실행되는 방식에 영향을 미친다.
- 코루틴이 실행 중인 취소 가능한 작업을 표시하는 잡(job)
- 코루틴과 스레드의 연관을 제어하는 디스패처(dispatcher)
일반적으로 문맥은 CoroutineContext.Element를 구현하는 아무 데이터나 저장할 수 있다. 특정 원소에 접근하려면 get() 메서드나 인덱스 연산자에 키를 넘겨야 한다.
GlobalScope.launch {
// 현재 잡을 얻고 "Task is active: true"를 출력
println("Task is active: ${coroutineContext[Job.Key]!!.isActive}")
}
기본적으로 launch(), async() 등의 표준 코루틴 빌더에 의해 만들어지는 코루틴은 현재 문맥을 이어받는다. 필요하면 빌더 함수에 context 파라미터를 지정해서 새 문맥을 넘길 수도 있다. 새 문맥을 만들려면 두 문맥의 데이터를 합쳐주는 plus() 함수 연산자를 이용하거나, 주어진 키에 해당하는 원소를 문맥에서 제거해주는 minusKey() 함수를 사용하면 된다.
private fun CoroutineScope.showName() {
println("Current coroutine: ${coroutineContext[CoroutineName]?.name}")
}
fun main() {
runBlocking {
showName() // Current coroutine: null
launch(coroutineContext + CoroutineName("Worker")) {
showName() // Current coroutine: Worker
}
}
}
2. 코루틴 흐름 제어와 잡 생명 주기
잡은 동시성 작업의 생명 주기를 표현하는 객체로, 작업 상태를 추적하고 필요할 때 작업을 취소할 수 있다.
활성화 상태는 작업이 시작됐고 아직 완료나 취소로 끝나지 않았다는 뜻이다. 이 상태가 보통 디폴트이기 때문에 잡은 생성되자마자 활성화 상태가 된다. launch()나 async()는 CoroutineStart 타입의 인자를 지정해서 잡의 초기 상태를 선택하는 기능을 제공하기도 한다.
- CoroutineStart.DEFAULT: 잡을 즉시 시작한다.
- CoroutineStart.LAZY: 잡을 자동으로 시작하지 말라는 뜻이며 이 경우, 잡이 신규 상태가 되고 시작을 기다리게 된다.
이 예제는 자식 코루틴의 시작을 부모 코루틴이 메시지를 호출한 뒤로 미룬다.
fun main() {
runBlocking {
val job = launch(start = CoroutineStart.LAZY) {
println("Job started")
}
delay(100)
println("Preparing to start...")
job.start()
}
}
/*
Preparing to start...
Job started
*/
잡이 다른 잡을 시작할 수도 있는데, 이 경우 새 잡은 기존 잡의 자식이 된다. 따라서 잡의 부모 자식 관계는 동시성 계산 사이에 트리 형태의 의존 구조를 만든다. children 프로퍼티를 통해 완료되지 않은 자식 잡들을 얻을 수 있다.
fun main() {
runBlocking {
val job = coroutineContext[Job.Key]!!
launch { println("This is task A") }
launch { println("This is task B") }
// 2 children running
println("${job.children.count()} children running")
}
}
코루틴이 블록의 실행을 끝내면 잡의 상태는 '완료 중' 상태로 바뀐다. 잡은 모든 자식이 완료될 때까지 이 상태를 유지하고, 모든 자식이 완료되면 잡의 상태가 '완료 중'에서 '완료됨'으로 바뀐다.
잡의 join() 메서드를 사용하면 조인 대상 잡이 완료될 때까지 현재 코루틴을 일시 중단시킬 수 있다.
fun main() {
runBlocking {
val job = coroutineContext[Job.Key]!!
val jobA = launch { println("This is task A") }
val jobB = launch { println("This is task B") }
jobA.join()
jobB.join()
println("${job.children.count()} children running")
}
}
/*
This is task A
This is task B
0 children running
*/
다음 예제는 루트 코루틴의 메시지가 두 자식 메시지의 실행이 끝난 후에 출력되도록 보장해준다.
현재의 잡 상태를 잡의 isActive, isCancelled, isCompleted 프로퍼티로부터 추적할 수 있다.
잡 상태 | isActive | isCompleted | isCancelled |
신규 | false | false | false |
활성화 | true | false | false |
완료 중 | true | false | false |
취소 중 | false | false | true |
취소됨 | false | true | true |
완료됨 | false | true | false |
1. 취소
잡의 cancel() 메서드를 호출하면 잡을 취소할 수 있다. 이 메서드는 더 이상 필요 없는 계산을 중단시킬 수 있는 방법이다.
suspend fun main() {
val squarePrinter = GlobalScope.launch(Dispatchers.Default) {
var i = 1
while (isActive) {
println(i++)
}
}
delay(100) // 자식 잡이 어느 정도 실행될 시간을 준다
squarePrinter.cancel()
}
다음 예제는 코루틴이 취소됐는지 검사하여 잡 취소에 협력한다. isActive 확장 프로퍼티는 현재 잡이 활성화된 상태인지 검사한다.
다른 방법으로 상태를 검사하는 대신에 CancellationException을 발생시키는 것이다. 이는 잡을 취소하는 과정이 진행 중이라는 사실을 전달하는 토큰 역할을 하는 예외다. delay()나 join() 등의 모든 일시 중단 함수가 이 예외를 발생시켜준다.
한 가지 예를 추가하면 yield()를 들 수 있다. 이 함수는 실행 중인 잡을 일시 중단시켜서 자신을 실행 중인 스레드를 다른 코루틴에게 양보한다.
부모 코루틴이 취소되면 자동으로 모든 자식의 실행을 취소한다. 이 과정은 부모에게 속한 모든 잡 계층이 취소될 때까지 계속된다.
fun main() {
runBlocking {
val parentJob = launch {
println("Parent started")
launch {
println("Child 1 started")
delay(500)
println("Child 1 completed")
}
launch {
println("Child 2 started")
delay(500)
println("Child 2 completed")
}
delay(500)
println("Parent completed")
}
delay(100)
parentJob.cancel()
}
}
/*
Parent started
Child 1 started
Child 2 started
*/
이 예제는 한 쌍의 자식을 시작하는 코루틴을 실행한다. 이렇게 만들어진 세 가지 작업은 모두 완료 메시지를 표시하기 전에 500밀리초를 기다린다. 하지만 부모 잡이 100밀리초 만에 취소되어 세 잡 중 어느 하나도 완료 상태에 도달하지 못한다.
2. 타임아웃
경우에 따라 작업이 완료되기를 무작정 기다릴 수 없어서 타임아웃을 설정해야 할 때가 있다. 이럴 때 사용하는 withTimeout() 함수가 있다.
fun main() {
runBlocking {
val asyncData = async { File("data.txt").readText() }
try {
val text = withTimeout(50) { asyncData.await() }
println("Data loaded: $text")
} catch (e: Exception) {
println("Timeout exceeded")
}
}
}
다음 예제는 파일을 50밀리초 안에 읽을 수 있다면 withTimeouit()은 결과를 돌려주기만 한다. 하지만 50밀리초 안에 읽을 수 없으면 TimeoutCancellationException을 던지기 때문에 파일을 읽는 코루틴이 취소된다.
withTimeoutOrNull()이라는 것도 있다. 이 함수는 타임아웃이 발생하면 예외를 던지는 대신 널 값을 돌려준다.
3. 코루틴 디스패치하기
코루틴은 스레드와 무관하게 일시 중단 가능한 계산을 구현할 수 있게 해주지만, 코루틴을 실행하려면 스레드와 연관시켜야 한다. 특정 코루틴을 실행할 때 사용할 스레드를 제어하는 작업을 담당하는 코루틴 디스패처(dispatcher)라는 컴포넌트가 있다.
디스패처는 코루틴 문맥의 일부이며, launch()나 runBlocking() 등의 코루틴 빌더 함수에서 이를 지정할 수 있다.
fun main() {
runBlocking {
// 전역 스레드 풀 디스패처를 사용해 코루틴을 실행
launch(Dispatchers.Default) {
println(Thread.currentThread().name) // DefaultDispatcher-worker-1
}
}
}
코루틴 라이브러리에는 기본적으로 몇 가지 디스패처 구현을 제공한다. 그중 일부를 Dispatchers 객체를 통해 사용할 수 있다.
- Dispatchers.Default: 공유 스레드 풀로, 풀 크기는 디폴트로 사용 가능한 CPU 코어 수이거나 2다(둘 중 큰 값). 이 구현은 일반적으로 작업 성능이 주로 CPU 속도에 의해 결정되는 CPU 위주의 작업에 적합하다.
- Dispatchers.IO: 스레드 풀 기반이며 디폴트 구현과 비슷하지만, 파일을 읽고 쓰는 것처럼 잠재적으로 블러킹될 수 있는 I/O를 많이 사용하는 작업에 최적화돼 있다. 이 디스패처는 스레드 풀을 디폴트 구현과 함께 공유하지만, 필요에 따라 스레드를 추가하거나 종료시켜준다.
- Dispatchers.Main: 사용자 입력이 처리되는 UI 스레드에서만 배타적으로 작동하는 디스패처다.
디스패처를 명시적으로 작성하지 않으면 코루틴을 시작한 영역으로부터 디스패처가 자동으로 상속된다.
fun main() {
runBlocking {
println("Root: ${Thread.currentThread().name}")
launch {
println("Nested, inherited: ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) {
println("Nested, explicit: ${Thread.currentThread().name}")
}
}
}
/*
Root: main
Nested, explicit: DefaultDispatcher-worker-1
Nested, inherited: main
*/
이 예제는 메인 스레드에서 실행되는 최상위 코루틴을 시작한다. 이 코루틴은 내부에 두 가지 코루틴을 포함한다. 첫 번째 코루틴은 문맥을 부모 코루틴으로부터 물려받고, 두 번째 코루틴은 명시적으로 문맥을 전달받는다.
부모 코루틴이 없으면 암시적으로 Dispatchers.Default로 디스패처를 가정한다(runBlocking() 빌더는 현재 스레드를 사용).
4. 예외 처리
예외 처리의 경우, 코루틴 빌더들은 두 가지 전략 중 하나를 따른다. 첫 번째는 예외를 부모 코루틴에게 전달하는 것이다. 이 경우 예외는 다음과 같이 전파된다.
- 부모 코루틴이 똑같은 오류로 취소된다. 이로 인해 부모의 나머지 자식도 모두 취소된다.
- 자식들이 모두 취소되고 나면 부모는 예외를 코루틴 트리의 윗부분으로 전달한다.
전역 영역에 있는 코루틴에 도달할 때까지 이 과정이 반복된다. 그 후 예외가 CoroutineExceptionHandler.Consider에 의해 처리된다.
fun main() {
runBlocking {
launch {
throw java.lang.Exception("Error in task A")
println("Task A completed")
}
launch {
delay(1000)
println("Task B completed")
}
println("Root")
}
}
/*
Root
Exception in thread "main" java.lang.Exception: Error in task A
*/
이 예제는 최상위 코루틴이 한 쌍의 내부 작업을 시작하고, 그중 첫 번째 코루틴은 예외를 던진다. 이로 인해 최상위 작업이 취소되고, 최상위 자식인 두 작업도 취소된다. 최상위에서 아무 커스텀 핸들러도 지정하지 않았기 때문에 예외에 대한 스텍 트레이스가 출력된다.
CoroutineExceptionHandler는 현재 코루틴 문맥(CoroutineContext)과 던져진 예외를 인자로 전달받는다.
fun handlerException(context: CoroutineContext, exception: Throwable)
핸들러를 만드는 가장 간단한 방법은 인자가 두 개인 람다를 받는 CoroutineExceptionHandler() 함수를 쓰는 것이다.
val handler = CoroutineExceptionHandler { _, throwable ->
println("Caught $throwable")
}
이 핸들러의 인스턴스가 예외를 처리하도록 지정하려면 코루틴 문맥에 인스턴스를 넣어야 한다. 핸들러도 그냥 코루틴 문맥이므로 코루틴 빌더의 context 인자로 핸들러를 넘길 수 있다.
suspend fun main() {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
GlobalScope.launch(handler) {
launch {
throw Exception("Error in task A")
println("Task A completed")
}
launch {
delay(1000)
println("Task B completed")
}
println("Root")
}.join()
}
/*
Root
Caught java.lang.Exception: Error in task A
*/
CoroutineExceptionHandler는 전역 영역에서 실행된 코루틴에 대해서만 정의할 수 있고, CoroutineExceptionHandler가 정의된 코루틴의 자식에 대해서만 적용된다. 그래서 runBlocking() 코드를 GlobalScope.launch()로 변경하고 main() 함수를 suspend로 표시하고 join() 호출을 사용해야 한다.
예외를 처리하는 다른 방법은 async() 빌더에서 사용하는 방법으로, 던져진 예외를 저장했다가 예외가 발생한 계산에 대한 await() 호출을 받았을 때 다시 던지는 것이다.
fun main() {
runBlocking {
val deferredA = async {
throw Exception("Error in task A")
println("Task A completed")
}
val deferredB = async {
println("Task B completed")
}
deferredA.await()
deferredB.await()
println("Root")
}
}
/*
Exception in thread "main" java.lang.Exception: Error in task A
*/
deferredA.await()에서 예외가 다시 던져지므로 프로그램이 println("Root") 문장을 실행하지 못한다.
코루틴 데이터에 접근할 때 예외를 다시 던지는 방식을 채용하는 async와 유사한 빌더들의 경우 CoroutineExceptionHandler를 사용하지 않는다. 따라서 코루틴 문맥에 CoroutineExceptionHandler 인스턴스를 설정했더라도 의미가 없으며 디폴트 핸들러가 호출된다.
내포된 코루틴에서 발생한 예외를 전역 핸들러를 통하지 않고 부모 수준에서 처리하고 싶을때 try-catch 블록으로 예외를 처리하려고 시도하면 어떤 일이 일어나는지 살펴보자.
fun main() {
runBlocking {
val deferredA = async {
throw Exception("Error in task A")
println("Task A completed")
}
val deferredB = async {
println("Task B completed")
}
try {
deferredA.await()
deferredB.await()
} catch (e: Exception) {
println("Caught $e")
}
println("Root")
}
}
/*
Caught java.lang.Exception: Error in task A
Root
Exception in thread "main" java.lang.Exception: Error in task A
*/
결과는 다음과 같다. 자식(deferredA)이 실패한 경우에는 부모를 취소시키기 위해 자동으로 예외를 다시 던지기 때문에 프로그램은 여전히 예외와 함께 중단된다.
슈퍼바이저(supervisor) 잡을 사용하면 취소가 아래 방향으로만 전달된다. 슈퍼바이저를 취소하면 슈퍼바이저 잡은 자동으로 자신의 모든 자식을 취소한다. 하지만 슈퍼바이저가 아니라 자식이 취소된 경우, 슈퍼바이저나 슈퍼바이저의 다른 자식들은 아무 영향을 받지 않는다.
fun main() {
runBlocking {
supervisorScope {
val deferredA = async {
throw Exception("Error in task A")
println("Task A completed")
}
val deferredB = async {
println("Task B completed")
}
try {
deferredA.await()
deferredB.await()
} catch (e: Exception) {
println("Caught $e")
}
println("Root")
}
}
}
/*
Task B completed
Caught java.lang.Exception: Error in task A
Root
*/
이제 예외가 발생하더라도 B 작업과 루트 코루틴이 완료된다.
3. 동시성 통신
1. 채널
채널은 임의의 데이터 스트림을 코루틴 사이에 공유할 수 있는 편리한 방법이다. Channel 인터페이스가 제공하는 채널에 대한 기본 연산은 데이터를 보내는 send() 메서드와 데이터를 받는 receiver() 메서드다. 이런 메서드들이 자신의 작업을 완료할 수 없을 때 채널은 코루틴을 일시 중단시키고 나중에 처리가 가능할 때 재개한다.
제네릭 Channel() 함수를 사용해 채널을 만들 수 있다. 이 함수는 채널의 용량을 지정하는 정숫값을 받는다. 버퍼가 꽉 차면 최소 하나 이상의 채널 원소가 상대방에 의해 수신될 때까지 send() 호출이 중단되며 버퍼가 비어있으면 최소 하나 이상의 원소를 채널로 송신할 때까지 receiver() 호출이 중단된다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.random.Random
fun main() {
runBlocking {
val streamSize = 5
val channel = Channel<Int>(3) // 채널 용량 = 3
launch {
for (n in 1..streamSize) {
delay(Random.nextLong(100))
val square = n*n
println("Sending: $square")
channel.send(square)
}
}
launch {
for (i in 1..streamSize) {
delay(Random.nextLong(100))
val n = channel.receive()
println("Receiving: $n")
}
}
}
}
/*
Sending: 1
Receiving: 1
Sending: 4
Sending: 9
Receiving: 4
Sending: 16
Receiving: 9
Sending: 25
Receiving: 16
Receiving: 25
*/
이 예제는 첫 번째 코루틴은 정수 제곱 값의 스트림을 만들어내고, 원소를 세 개 저장할 수 있는 채널에 이 스트림을 전송한다. 이와 동시에 두 번째 코루틴이 생성된 수를 수신한다. 두 코루틴 중 어느 한 쪽이 뒤쳐져 채널 버퍼가 꽉 차거나 비는 경우가 생겨서 일시 중단이 발생할 수 있게 일부러 지연 시간을 난수로 지정했다. 그래서 위와 같이 다양한 출력이 생길 수 있다.
출력이 실제 지연 시간 값이나 다른 환경에 의해 달라질 수 있지만, 채널은 모든 값이 송신된 순서 그대로 수신되도록 보장한다.
Channel() 함수는 채널의 동작을 바꿀 수 있는 여러 특별한 값을 받을 수 있다. 이러한 값은 Channel 인터페이스의 동반 객체에 상수로 정의돼 있다.
- Channel.UNLIMITED (= Int.MAX_VALUE): 이 경우 채널의 용량 제한은 없고, 내부 버퍼는 필요에 따라 증가한다. 이런 채널은 send() 시에 결코 일지 중단되는 일이 없다. 다만 receiver()를 하는 경우 버퍼가 비어있으면 일시 중단될 수 있다.
- Channel.SENDEZVOUS (= 0): 이 경우 채널은 아무 내부 버퍼가 없는 랑데부 채널이 된다. send() 호출은 다른 어떤 코루틴이 receiver()를 호출할 때까지 일시 중단된다. 채널 생성 시 용량을 지정하지 않으면 이 방식의 채널이 생성된다.
- Channel.CONFLATED (= -1): 이 경우에는 송신된 값이 합쳐지는 채널이다. 이 말은 send()로 보낸 원소를 최대 하나만 버퍼에 저장하고 이 값이 누군가에 의해 수신되기 전에 다른 send() 요청이 오면 기존의 값을 덮어 쓴다는 뜻이다. 따라서 수신되지 못한 원소 값은 소실된다. 이 채널의 경우 send() 메서드는 결코 일시 중단되지 않는다.
- Channel.UNLIMITED보다 작은 임의의 양수를 지정하면 버퍼 크기가 일정하게 제한된 채널이 생긴다.
랑데부 채널은 생산자와 소비자 코루틴이 교대로 활성화되도록 보장한다. 예를 들어 Channel<Int>(Channel.RENDEZVOUS)처럼 앞 예제의 채널 크기를 0으로 바꾸면 딜레이 시간와 관계없이 안정적인 동작 순서를 볼 수 있다.
Sending: 1
Receiving: 1
Sending: 4
Receiving: 4
Sending: 9
Receiving: 9
Sending: 16
Receiving: 16
Sending: 25
Receiving: 25
값이 합쳐지는 채널은 스트림에 기록한 모든 원소가 도착할 필요가 없고 소비자 코루틴이 뒤처지면 생산자가 만들어낸 값 중 일부를 버려도 되는 경우 쓰인다. 첫 번째 예제의 소비자 지연 시간을 생산자 지연 시간의 두 배로 설정해면,
fun main() {
runBlocking {
val streamSize = 5
val channel = Channel<Int>(Channel.CONFLATED)
launch {
for (n in 1..streamSize) {
delay(100)
val square = n*n
println("Sending: $square")
channel.send(square)
}
}
launch {
for (i in 1..streamSize) {
delay(200)
val n = channel.receive()
println("Receiving: $n")
}
}
}
}
/*
Sending: 1
Receiving: 1
Sending: 4
Sending: 9
Receiving: 9
Sending: 16
Sending: 25
Receiving: 25
*/
다음과 같이 생산된 값 중 대략 절반만 수신돼 처리된다. 이 프로그램은 마지막 줄을 출력한 다음에도 프로그램이 끝나지 않는다. 수신자 코루틴이 1부터 streamSize까지 이터레이션하므로 수신할 것으로 기대하는 원소 개수가 다섯 개이기 때문이다. 실제 수신되는 원소는 streamSize/2 근처이므로 다음과 같은 결과가 나온다.
이 상황에서 필요한 것은 채널이 닫혀서 더 이상 데이터를 보내지 않는다는 사실을 알려주는 일종의 신호다. Channel API는 생산자 쪽에서 close() 메서드를 사용해 이런 신호를 보낼 수 있게 해준다.
fun main() {
runBlocking {
val streamSize = 5
val channel = Channel<Int>(Channel.CONFLATED)
launch {
for (n in 1..streamSize) {
delay(100)
val square = n*n
println("Sending: $square")
channel.send(square)
}
channel.close()
}
launch {
for (n in channel) {
println("Receiving: $n")
delay(200)
}
}
}
}
/*
Sending: 1
Receiving: 1
Sending: 4
Receiving: 4
Sending: 9
Sending: 16
Receiving: 16
Sending: 25
Receiving: 25
*/
이제 데이터 교환이 완료된 후 프로그램이 제대로 끝난다.
소비자 쪽에서는 명시적인 이터레이션을 사용하지 않고 consumeEach() 함수를 통해 모든 채널 콘텐츠를 얻어서 사용할 수 있다.
channel.consumeEach {
println("Receiving $it")
delay(200)
}
채널이 닫힌 후 send()를 호출하면 ClosedSendChannelException 예외가 발생하며 실패한다. 채널이 닫힌 후 receive()를 호출하면 버퍼에 있는 원소가 소진될 때까지 정상적으로 원소가 반횐되지만, 모두 소진된 후 호출하면 마찬가지로 ClosedSendChannelException 예외가 발생한다.
채널 통신에 참여하는 생산자와 소비자가 꼭 하나일 필요는 없으며 한 채널을 여러 코루틴이 동시에 읽을 수도 있다. 이런 경우를 팬 아웃(fan out)이라고 한다.
fun main() {
runBlocking {
val streamSize = 5
val channel = Channel<Int>(2)
launch {
for (n in 1..streamSize) {
val square = n*n
println("Sending: $square")
channel.send(square)
}
channel.close()
}
for (i in 1..3) {
launch {
for (n in channel) {
println("Receiving by consumer #$i: $n")
delay(Random.nextLong(100))
}
}
}
}
}
/*
Sending: 1
Sending: 4
Sending: 9
Receiving by consumer #1: 1
Receiving by consumer #2: 4
Receiving by consumer #3: 9
Sending: 16
Sending: 25
Receiving by consumer #2: 16
Receiving by consumer #1: 25
*/
다음 예제는 생산자 코루틴이 생성한 데이터 스트림을 세 소비자 코루틴이 나눠 받는다. 마찬가지로 여러 생산자 코루틴이 한 채널에 써넣은 데이터를 한 소비자 코루틴이 읽는 팬 인(fan in)도 있다. 일반적인 경우에, 여러 생산자와 여러 소비자가 여러 채널을 공유할 수도 있다.
2. 생산자
sequence() 함수와 비슷하게 동시성 데이터 스트림을 생성할 수 있는 produce()라는 특별한 코루틴 빌더가 있다. 이 빌더는 채널과 비슷한 send() 메서드를 제공하는 ProducerScope 영역을 도입해준다.
fun main() {
runBlocking {
val channel = produce {
for (n in 1..5) {
val square = n*n
println("Sending: $square")
send(square)
}
}
launch {
channel.consumeEach { println("Receiving: $it") }
}
}
}
이 경우 채널을 명시적으로 닫을 필요가 없다. 코루틴이 종료되면 produce() 빌더가 채널을 자동으로 닫아준다.
예외처리 관점에서 produce()는 async() / await()의 정책을 따른다. produce() 안에서 예외가 발생하면 예외를 저장했다가 해당 채널에 대해 receive()를 가장 처음 호출한 코루틴 쪽에 예외가 다시 던져진다.
3. 티커
coroutines 라이브러리에는 티커(ticker)라는 특별한 랑데부 채널이 있다. 이 채널은 Unit 값을 계속 발생시키되 한 원소와 다음 원소의 발생 시점이 주어진 지연 시간만큼 떨어져 있는 스트림을 만든다. 이 채널을 만들려면 ticker() 함수를 사용해야 한다. 이 함수를 호출할 때 다음을 지정할 수 있다.
- delayMillis: 티커 원소의 발생 시간 간격을 밀리초 단위로 지정한다.
- initialDelayMillis: 티커 생성 시점과 원소가 최초로 발생하는 시점 사이의 시간 간격이다. 디폴트 값은 delayMillis와 같다.
- context: 티커를 실행할 코루틴 문맥이며 디폴트는 빈 문맥이다.
- mode: 티커의 행동을 결정하는 TickerMode 이넘이다.
- TickerMode.FIXED_PERIOD: 생성되는 원소 사이의 시간 간격을 지정된 지연 시간에 최대한 맞추기 위해 실제 지연 시간을 조정한다.
- TickerMode.FIXED_DELAY: 실제 흘러간 시간과 관계없이 delayMillis로 지정한 지연 시간만큼 시간을 지연시킨 후 다음 원소를 송신한다.
티커 모드의 차이는 알아보기 위해 다음 코드를 보면,
fun main() = runBlocking {
val ticker = ticker(100)
println(withTimeoutOrNull(50) { ticker.receive()} )
println(withTimeoutOrNull(60) { ticker.receive()} )
delay(250)
println(withTimeoutOrNull(1) { ticker.receive()} )
println(withTimeoutOrNull(60) { ticker.receive()} )
println(withTimeoutOrNull(60) { ticker.receive()} )
}
/*
null
kotlin.Unit
kotlin.Unit
kotlin.Unit
null
*/
실행 과정의 단계는 다음과 같다.
- 50밀리초 간격으로 티커 신호를 받으려고 시도한다. 티커 지연 시간이 100밀리초이므로 withTimeoutOrNull()은 신호를 받지 못하고 타임아웃이 걸려서 널을 반환한다.
- 그 후 다음 60밀리초 안에 신호를 받으려고 시도한다. 이번에는 티커가 시작하고 나서 100밀리초가 확실히 지날것이므로 분명히 널이 아닌 결괏값을 받는다. receive()가 호출되면 티커가 재개된다.
- 그 후 소비자 코루틴이 약 250밀리초 동안 일시 중단된다. 100밀리초 후에 티커는 다른 신호를 보내고 신호가 수신될 때까지 일시 중단된다. 이후 소비자와 티커 코루틴 모두 150밀리초 동안 일시 중단 상태로 남는다.
- 소비자 코루틴이 재개되고 신호를 요청하려고 시도한다. 신호가 이미 보내졌기 때문에 receive()는 즉시 결과를 반환한다. 이제 티커는 마지막 신호를 보내고 나서 얼마나 시간이 지났는지 검사하고, 250밀리초가 지났다는 사실을 알게 된다. 이는 두 번의 완전한 주기(200밀리초)와 50밀리초의 나머지에 해당한다. 티커는 자신의 대기 시간을 조정해서 다음 신호를 보낼 때까지의 지연 시간을 100 - 50 = 50밀리초로 줄인다. 이렇게 줄이는 이유는 한 주기에 해당하는 지연 시간(100밀리초)이 지난 후 새 신호를 보내기 위해서다.
- 소비자는 50밀리초 타임아웃 안에 신호를 받으려고 시도한다. 다음 신호가 50밀리초 이전에 보내졌기 때문에 이 시도는 거의 확실히 성공할 것이다.
- 마지막으로, 신호를 받으려는 receive() 호출이 거의 즉시 일어난다. 따라서 티커는 전체 지연 시간(100밀리초)을 다시 기다린다. 그 결과, 마지막 receive() 호출은 60밀리초 타임아웃 안에 티커로부터 신호를 받지 못하기 때문에 널을 받는다.
4. 액터
액터 모델은 가변 상태를 공유하는 방법을 구현한다. 액터는 내부 상태와 다른 액터에게 메시지를 보내서 동시성 통신을 진행할 수 있는 수단을 제공하는 객체다. 액터는 자신에게 들어오는 메시지를 listen하고, 자신의 상태를 바꾸면서 메시지에 응답할 수 있으며, 다른 메시지를 자기 자신이나 다른 엑터에게 보낼 수 있고, 새로운 액터를 시작할 수 있다. 액터의 상태는 액터 내부에 감춰져 있으므로 다른 액터가 직접 이 상태에 접근할 수 없다. 다른 액터는 메시지를 보내고 응답을 받아서 상태를 알 수 있을 뿐이다.
actor() 코루틴 빌더를 사용해 액터를 만든다. 액터는 특별한 영역(ActorScope)을 만들며, 이 영역은 기본 코루틴 영역에 자신에게 들어오는 메시지에 접근할 수 있는 수신자 채널이 추가된 것이다. actor() 빌더는 결과가 아닌 잡을 생성한다는 점에서 launch()와 비슷하다.
아래 예제에서는 은행 계좌 잔고를 유지하고 어떤 금액을 저축하거나 인출할 수 있는 액터를 만든다.
sealed class AccountMessage
class GetBalance(
val amount: CompletableDeferred<Long>
) : AccountMessage()
class Deposit(val amount: Long) : AccountMessage()
class Withdraw(
val amount: Long,
val isPermitted: CompletableDeferred<Boolean>
) : AccountMessage()
먼저, 메시지를 표현하는 클래스를 몇 가지 정의했다. 봉인된 클래스를 사용하면 AccountMessage 타입을 처리하는 when 식에서 else를 쓰지 않아도 된다. GetBalance 인스턴스에는 CompletableDeferred라는 타입의 amount 프로퍼티가 있다. 액터는 이 프로퍼티를 사용해서 GetBalance 메시지를 보낸 코루틴에게 현재 잔고를 돌려준다. Withdraw 클래스에도 인출에 성공하면 true를 돌려주고 그렇지 않으면 false를 돌려주기 위한 isPermitted라는 프로퍼티가 있다.
이제 계좌 잔고를 유지하는 액터를 구현할 수 있다. 메시지가 들어오는 채널을 계속 폴링(polling)하면서 수신한 메시지의 종류에 따라 적절한 동작을 수행한다.
fun CoroutineScope.accountManager(
initialBalance: Long
) = actor<AccountMessage> {
var balance = initialBalance
for (message in channel) {
when (message) {
is GetBalance -> message.amount.complete(balance)
is Deposit -> {
balance += message.amount
println("Deposited ${message.amount}")
}
is Withdraw -> {
val canWithdraw = balance >= message.amount
if (canWithdraw) {
balance -= message.amount
println("Withdrawn ${message.amount}")
}
message.isPermitted.complete(canWithdraw)
}
}
}
}
actor() 빌더는 produce()에 대응한다. 액터는 기본적으로 랑데부 채널을 사용하지만 actor() 함수를 호출하면서 용량을 변경하면 채널의 성격을 바꿀 수 있다.
액터 클라이언트에게 요쳥 결과를 돌려줄 때는 CompletableDeferred에서 complete() 메서드를 사용한다.
다음은 이 액터와 통신하는 코루틴 구현이다.
private suspend fun SendChannel<AccountMessage>.deposit(
name: String,
amount: Long
) {
send(Deposit(amount))
println("$name: deposit $amount")
}
private suspend fun SendChannel<AccountMessage>.tryWithdraw(
name: String,
amount: Long
) {
val status = CompletableDeferred<Boolean>().let {
send(Withdraw(amount, it))
if (it.await()) "OK" else "DENIED"
}
println("$name: withdraw $amount ($status)")
}
private suspend fun SendChannel<AccountMessage>.printBalance(
name: String
) {
val balance = CompletableDeferred<Long>().let {
send(GetBalance(it))
it.await()
}
println("$name: balance is $balance")
}
fun main() {
runBlocking {
val manager = accountManager(100)
withContext(Dispatchers.Default) {
launch {
manager.deposit("Client $1", 50)
manager.printBalance("Client #1")
}
launch {
manager.tryWithdraw("Client #2", 100)
manager.printBalance("Client #2")
}
}
manager.tryWithdraw("Client #0", 1000)
manager.printBalance("Client #0")
manager.close()
}
}
/*
Withdrawn 100
Deposited 50
Client $1: deposit 50
Client #2: withdraw 100 (OK)
Client #1: balance is 50
Client #2: balance is 50
Client #0: withdraw 1000 (DENIED)
Client #0: balance is 50
*/
액터에게 메시지를 보내려면 액터가 사용하는 채널에 대해 send() 메서드를 호출해야 한다. 연산 순서는 달라질 수 있지만 결과는 일관성이 있다. 공개적으로 접근 가능한 가변 상태가 없기 때문에 락이나 임계 영역 같은 동기화 요소를 사용하지 않아도 된다.
4. 자바 동시성 사용하기
1. 스레드 시작하기
스레드를 시작하려면 Runnable 객체에 대응하는 람다와 스레드 프로퍼티들을 지정해서 thread() 함수를 사용하면 된다.
- start: 스레드를 생성하자마자 시작할지 여부(디폴트는 true)
- isDaemon: 스레드를 데몬 모드로 시작할지 여부(디폴트는 false). 데몬 스레드는 JVM 종료를 방해하지 않고 메인 스레드가 종료될 때 자동으로 함께 종료된다.
- contextClassLoader: 스레드 코드가 클래스 자원을 적재할 때 사용한 클래스 로더(디폴트는 null)
- name: 커스텀 스레드 이름. 디폴트는 널인데, 이는 JVM이 이름을 자동으로 지정한다는 뜻이다.(Thread-1, Thread-2 등으로 정해진다).
- priority: Thread.MIN_PRIORITY(=1)부터 Thread.MAX_PRIORITY(=10) 사이의 값으로 정해지는 순서로, 어떤 스레드가 다른 스레드에 비해 얼마나 많은 CPU 시간을 배정받는지 결정한다. 디폴트 값은 -1이며, 이 값은 자동으로 우선순위를 정하라는 뜻이다.
- block: ()->Unit 타입의 함숫값으로 새 스레드가 생성되면 실행할 코드다.
다음 예제는 150밀리초마다 메시지를 출력하는 스레드를 시작한다.
fun main() {
println("Starting a thread...")
thread(name = "Worker", isDaemon = true) {
for (i in 1..5) {
println("${Thread.currentThread().name}: $i")
Thread.sleep(150)
}
}
Thread.sleep(500)
println("Shutting down...")
}
/*
Starting a thread...
Worker: 1
Worker: 2
Worker: 3
Worker: 4
Shutting down...
*/
새 스레드가 데몬으로 시작했으므로, 메인 스레드가 500밀리초 동안 슬립한 다음 실행을 끝낼 때 이 스레드도 함께 끝나기 때문에 메시지가 네 개만 출력된다.
timer() 함수는 어떤 작업을 이전 작업이 끝난 시점을 기준으로 고정된 시간 간격으로 실행하는 타이머를 설정한다. 즉, 어떤 작업이 시간이 오래 걸리면 이후 모든 실행이 연기된다. 이 타이머는 FIXED_RATE 모드로 작동하는 코루틴 티커에 비유할 수 있다.
timer() 에 다음 옵션을 지정할 수 있다.
- name: 타이머 스레드의 이름(디폴트는 널)
- daemon: 타이머 스레드를 데몬 스레드로 할지 여부(디폴트는 false)
- startAt: 최초로 타이머 이벤트가 발생하는 시간을 나타내는 Date 객체
- period: 연속된 타이머 이벤트 사이의 시간 간격(밀리초 단위)
- action: 타이머 이벤트가 발생할 때마다 실행될 TimeTask.()->Unit 타입의 람다
앞 예제에 타이머를 사용한다면 다음과 같다.
fun main() {
println("Starting a thread...")
var counter = 0
timer(period = 150, name = "Worker", daemon = true) {
println("${Thread.currentThread().name}: ${++counter}")
}
Thread.sleep(500)
println("Shutting down...")
}
2. 동기화와 락
동기화는 특정 코드 조각이 한 스레드에서만 실행되도록 보장하기 위한 공통적인 기본 요소다. 이런 코드 조각을 다른 스레드가 실행하고 있다면 해당 코드에 진입하려고 시도하는 다른 스레드들은 모두 대기해야 한다.
자바에서는 코드에 동기화를 도입하는 두 가지 방법이 있다.
- 락으로 사용하려는 어떤 객체를 지정하는 특별한 동기화 블록을 사용해 동기화해야 하는 코드를 감싼다.
- 메서드에 synchronized 변경자를 붙인다. 이럴 경우 메서드 본문 전체가 현재의 클래스 인스턴스나 Class 인스턴스 자체에 의해 동기화된다. 코틀린에서는 @Synchronized 애너테이션을 통해 같은 목적을 달성할 수 있다.
동기화 블록과 비슷한 어떤 Lock 객체를 사용해 주어진 람다를 실행하게 해주는 withLock() 함수도 있다. withLock()을 사용하면 함수가 알아서 락을 풀어준다. 그 외에 ReentrantReadWriterLock의 읽기와 쓰기 락을 사용해 주어진 작업을 수행하는 read()와 write()함수도 있다. write() 함수는 기존 읽기 락을 쓰기 락으로 자동 승격시켜줌으로써 재진입 가능한 락의 의미를 유지한다.
'Kotlin > 코틀린 완벽 가이드' 카테고리의 다른 글
[코틀린 완벽 가이드] 14장 : 코틀린 테스팅 (0) | 2022.12.12 |
---|---|
[코틀린 완벽 가이드] 11장 : 도메인 특화 언어 (0) | 2022.11.20 |
[코틀린 완벽 가이드] 10장 : 애너테이션과 리플렉션 (0) | 2022.11.15 |