Buffered channels*1
지금까지 보여진 Channel에는 Buffer가 없다. Buffer되지 않은 채널은 발신자와 수신자가 서로 만날 때 값을 전송한다. 이는 랑데뷰라고도 불린다.*2 만약 send가 먼저 실행되면, receive가 실행될 때까지 일시 중단된다. 만약 receive가 먼저 실행되면, send가 실행될 때까지 일시 중단된다.
Channel() 팩토리 함수와 produce 빌더 모두 Buffer 크기를 정하기 위해 선택적으로 capacity 파라미터를 받는다. BlockingQueue와 비슷하게, Buffer은 지정된 capacity만큼의 용량을 두고 발신자가 일시 중단 전에 복수의 원소들을 보낼 수 있도록 하고*3, Buffer가 꽉 차면 중단*4한다.
다음 코드의 동작을 살펴보자 :
val channel = Channel<Int>(4) // create buffered channel
val sender = launch { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이는 capacity가 4인 Buffered Channel을 사용하므로 sending을 다섯 번 출력한다.
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
첫 4개의 원소는 buffer에 추가되고, 발신자는 5번째 것을 보내려고 할 때 일시 중단 한다.
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. Buffer은 먼저 들어온 입력이 처리되지 않았을 때 다음 입력을 저장하기 위한 공간이다. Buffered Channel은 먼저 들어온 입력이 처리되지 않았을 때 send를 통해 받은 다음 입력을 저장해놓는 공간이 있는 채널을 뜻한다.
*2. 원문은 (aka rendezvous)지만, 이해를 위해 별도로 풀어썼다. 랑데뷰는 프랑스어로 '만나다' 라는 뜻이다.
*3. capacity=0이 기본 값인데 하나가 들어올 때마다 일시중단 된다.
*4. 원문은 block이지만, Channel은 일시중단 하므로 중단으로 번역하였다
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Channels - Buffered channels
원문 최종 수정 : 2022년 9월 28일
Channel은 평등하다
Channel로의 보내고 받는 작업은 복수의 Coroutine을 호출하는 순서에 대해 공정하다. Channel은 FIFO구조로 제공되며, 먼저 receive를 호출하는 Coroutine이 원소를 갖게 된다. 다음 예제에서 "ping"과 "pong"이라 불리는 두 Coroutine은 공유된 table Channel을 통해 "ball" 객체를 수신한다.
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
"ping" Coroutine이 먼저 시작한다. 따라서 이 Coroutine이 ball을 먼저 수신한다. "ping" Coroutine이 ball을 table로 돌려보낸 후, 즉시 다시 receive를 시작하더라도 "pong" Coroutine이 이미 수신 대기 하고 있기 때문에 ball은 "pong" Coroutine이 받는다.
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
때때로 채널은 사용중인 실행기의 특성으로 인해 불공평해보이게 실행될 수 있다. 자세한 사항은 이 이슈에서 확인하자. *1
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. 이 문제는 실무에서도 가끔 보이는 문제이다. 하나의 스레드에서 여러 Coroutine이 실행될 때 생긴 문제로 보인다. Coroutine이 작업 도중 yield를 하면 먼저 Thread를 점유하는 Coroutine이 실행이 먼저 된다. 따라서 하나의 스레드에서 Coroutine이 여러개 동작하면 Channel에 대해 누가 먼저 receive를 수행할지는 랜덤이 되어버린다. 이유는 receive를 통해 일시 중단이 되지 않았기 때문이다.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Channels - Debugging coroutines and threads
원문 최종 수정 : 2022년 9월 28일
Ticker channels
Ticker Channel은 Channel에서 마지막으로 소비가 일어나고 일정 시간이 지난 이후에 Unit을 생성해내는 특별한 랑데뷰 채널이다. 이 자체로는 쓸모 없어 보일지 모르지만, 시간을 기반으로한 복잡한 produce 파이프라인 블록을 구축하거나 Windowing*1이나 시간에 의존적인 처리를 하는데 유용하다. Ticker Channel은 "on tick" 동작을 수행하기 위해 선택될 수 있다.
이러한 Channel을 만들기 위해 ticker 라 불리는 팩토리 메서드를 사용한다. 더이상 원소를 받을 필요가 없음을 나타내기 위해서는 ReceiveChannel.cancel 메서드를 사용하면 된다.
실제로 어떻게 동작하는지 살펴보자 :
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // no initial delay
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이는 다음 줄들을 출력한다 :
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ticker는 소비자가 일시중지 하는 것을 알고, 기본 동작으로 일시중지가 발생하면 다음 원소가 생산되는 것을 지연시켜, 원소가 일정 비율로 생성되도록 유지한다.
선택적으로 mode 매개변수를 TickerMode.FIXED_DELAY 로 설정해서 두 원소 간에 일정한 지연이 발생하도록 할 수 있다.
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. Windowing이란 송신측에서 수신 확인을 받지 않고 지속적으로 보내는 기법이다.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Channels - Ticker channels
원문 최종 수정 : 2022년 9월 28일
목차로 돌아가기