병행 컴퓨팅을 위한 Actor model - Sun, Jun 12, 2022
Actor model
Actor model(이하 액터 모델)은 가변 상태를 스레드 안전하게 공유하는 방법으로, 상태에 접근할 수 있는 스레드를 하나로 제한 및 내부에 감춰 자신만이 관리할 수 있고, 다른 스레드는 상태 변경이 필요한 경우 해당 스레드에게 메세지를 보내 요청하게 된다.
- 락을 소유한 스레드의 작업이 끝나기 전 까지 다른 스레드는 대기해야 한다.
- 우선순위가 낮은 프로세스의 경우 기아 현상(Starvation)이 발생할 수 있다.
- 락을 소유한 스레드가 반환되지 않고 종료된 경우 데드락(Dead lock)이 발생한다.
액터 모델을 사용하게 되면 동시성 프로그래밍에서 상태 변경을 안전하게 제어하는 방법 중 하나인 락(Lock) 기반의 동기화와 관련한 여러 가지 문제로부터 자유로워질 수 있다.
액터 모델은 사진에서 표현된 Mailbox(또는 buffer, channel, queue, etc..)에 요청받은 메세지를 보관하고 이를 읽어 적절한 행위를 수행하며 자신의 상태를 변경한다. 메세지를 통해 수신자와 발신자의 때문에 다른 스레드는 요청 후 대기하지 않고 자신의 작업을 수행할 수 있고, 락을 얻기 위한 경쟁 상태(race condition)를 피할 수 있다.
코틀린에서의 Actor
The problem
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
동시성 프로그래밍에서 공유 상태의 안전한 연산을 하기란 쉬운 일이 아니다. 위 코드의 경우 100개의 코루틴이 각각 1000번 연산한 결과는 실행마다 결과가 달라진다.
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
이를 해결하고자 @Volatile
어노테이션을 추가해도 원자성이 보장되지 않기 때문에 문제를 해결되지 않는다.
Actors
코틀린에서는 동시성 프로그래밍을 쉽게 개발하기 위해 다양한 언어적 장치를 제공한다. 그 중 하나가 액터 모델을 사용할 수 있도록 도와주는 actor
키워드로 아래와 같이 사용될 수 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking<Unit> {
val counter = counterActor() // create the actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
액터는 메세지를 받을 수 있는 채널을 통해 요청을 기다리고, 요청의 종류에 따라 자신의 상태를 변경한다. 위 코드를 보면 액터는 자신의 상태(counter)의 상태를 변경하기 전 어떠한 임계 영역을 만들지도 않고 일반적인 단일 스레드 형식의 코드를 작성할 수 있다.
참고문헌
Shared mutable state and concurrency https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html#actors
Introduction To Actor Model https://medium.com/@nguyenthanhquang.cse/introduction-to-actor-model-2c82d25d1b83
Actor Model of Computation: Scalable Robust Information Systems - Carl Hewitt
Actor model https://en.wikipedia.org/wiki/Actor_model