Channel로 파이프라인 만들기
파이프라인은 하나의 Coroutine이 값의 스트림을 생성하는 것을 뜻한다. 값의 스트림은 무한할 수도 있다.
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
그리고 다른 Coroutine이나 Coroutines 들이 그 스트림을 소비하고, 작업을 수행하고, 다른 결과를 생성한다. 아래의 예시에서 숫자들은 단순히 제곱된다.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
메인 코드는 모든 파이프라인을 연결하기 시작한다.
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
repeat(5) {
println(squares.receive()) // print first five
}
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
📌 전체 코드는 이곳에서 확인할 수 있습니다.
📖 Coroutine을 생성하는 모든 함수들은 CoroutineScope의 확장함수로 정의되어 있다. 이를 통해 구조화된 동시성의 원칙에 의존하도록 해서 어플리케이션에 글로벌하게 남아있는 코루틴이 없도록 할 수 있다.*1
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. 위 코드에서 produce { ... } 내부를 보면 다음과 같이 CoroutineScope의 확장함수로 선언되어 있다.
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Channels - Pipelines
원문 최종 수정 : 2022년 9월 28일
파이프라인으로 소수 만들기
Coroutine의 파이프라인을 사용해서 소수를 생성하는 예제를 통해 파이프라인을 극한으로 사용해보겠다. 숫자의 무한한 시퀀스로 시작해보자 :
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
다음 파이프라인 단계에서는 들어오는 숫자의 스트림을 필터링해서, 주어진 소수로 나눌 수 있는 모든 숫자들을 제거한다.
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
이제 숫자 스트림을 2에서 부터 시작하고, 현재 Channel에서 소수를 가져오고, 각 발견된 소수에 대해 새로운 파이프라인 단계를 실행하는 새로운 파이프라인을 구축한다.
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
다음 예제는 메인 스레드의 Context에서 모든 파이프라인을 실행해서, 첫 10개의 소수를 출력한다. Scope 내의 모든 Coroutine이 main 함수의 runBlocking Coroutine에서 실행되었으므로 시작한 코루틴들의 명시적인 리스트를 가지고 있을 필요가 없다. 처음 10개의 소수를 출력한 후, cancelChildren 확장 함수를 이용해서 모든 자식 Coroutine을 취소한다.
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
📌 전체 코드는 이곳에서 확인할 수 있습니다.
코드의 출력은 다음과 같다 :
2
3
5
7
11
13
17
19
23
29
표준 라이브러리의 iterator Coroutine 빌더를 사용해 같은 파이프라인을 빌드할 수 있다. produce를 iterator로 send를 yield로, receive를 next로, ReceiveChannel을 Iterator로 바꾸고 Coroutine Scope을 제거하자. runBlocking 또한 필요 없어졌다. 하지만, 위에서 다룬 채널을 사용하는 파이프라인의 이점은 Dispatcher.Default Context 상에서 실행할 경우 복수의 CPU 코어를 사용할 수 있다는 점이다.
어쨌든, 이것은 소수를 찾는 매우 비현실적인 방법이다. 실제로 파이프라인은 다른 원격 서비스에 대한 비동기 호출과 같은 일시중단 호출이 포함되며, 이 파이프라인은 sequence / iterator 을 사용해서 만들어질 수 없다. 완전히 비동기적인 produce와 달리 임의의 일시 중단을 포함할 수 없기 때문이다.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Channels - Prime numbers with pipeline
원문 최종 수정 : 2022년 9월 28일
목차로 돌아가기