Flow 실행하기
일부 소스에서 오는 비동기 이벤트를 표현하기 위해 flow를 사용하기 쉽다. 이런 경우, 들어오는 이벤트에 대한 반응을 코드로 등록하고 이후의 작업을 계속해서 수행하도록 하는 addEventListener() 함수와 비슷한 역할을 하는 것이 필요하다. 이 역할을 onEach 연산자가 해줄 수 있다. 그러나, onEach는 중간 연산자이다. Flow를 수집하기 위해서는 터미널 연산자 또한 필요하다. 그렇지 않으면 onEach만을 호출하는 것만으로는 효과가 없다.
만약 onEach 이후에 collect 터미널 연산자를 사용하면, 이후의 코드는 Flow가 수집될 때까지 기다릴 것이다 :
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
확인할 수 있듯이 다음과 같이 출력된다 :
Event: 1
Event: 2
Event: 3
Done
launchIn 터미널 연산자가 여기서 편리하게 사용될 수 있다. collect를 launchIn으로 대체함으로써 Flow의 수집을 별도의 Coroutine에서 실행할 수 있으므로, 이후의 코드들이 즉시 계속해서 실행될 수 있다.
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이는 다음과 같이 출력된다 :
Done
Event: 1
Event: 2
Event: 3
launchIn에서 필요로 하는 CoroutineScope 파라미터는 CoroutineScope을 특정해 Flow가 실행되면 어떤 Coroutine이 수집을 할지를 결정하도록 한다. 위의 예제에서 이 Scope는 runBlocking Coroutine 빌더로부터 와서, flow가 실행되는 동안 runBlocking Scope가 자식 코루틴이 완료될 때까지 기다리도록 하고 main 함수를 반환하는 것을 방지해서 예제가 종료되지 않도록 한다.
실제 어플리케이션들에서는 한정된 생애를 가진 엔티티로부터 Scope를 가져온다*1. 엔터티의 생애가 종료되는 순간 해당 Scope는 취소되며, 해당 Flow의 수집은 중단된다. 이러한 방식으로 onEach { ... }.launchIn(scope) 쌍은 addEventListener과 같이 동작한다. 하지만, 취소와 구조화된 동시성이 removeEventListener 함수에 해당하는 역할을 대신 수행해주기 때문에 필요 없다*2.
launchIn 또한 전체 Scope을 취소하거나 join하지 않고 해당 Flow를 수집하는 Coroutine만을 cancel하기 위해 사용할 수 있는 Job을 반환한다는 점을 명심하자.
Flow 취소 체크
편의를 위해 flow 빌더는 추가적으로 방출된 각 값에 대한 취소 동작을 하기 위한 ensureActive 체크를 수행한다. 이는 flow { ... } 에서 루프를 돌면서 바쁘게 방출되는 것이 취소 가능하다는 것을 뜻한다.
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
숫자를 3까지만 소모하고 4를 방출한 다음에 CancellationException이 발생한다 :
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
하지만, 다른 대부분의 Flow 연산자들은 성능상의 이유로 추가적인 취소 체크를 하지 않는다. 예를 들어, 만약 IntRange.asFlow 확장 함수를 같은 바쁜 루프를 작성하기 위해 사용하고 아무 곳에서도 일시 중단 하지 않는다면, 취소를 위한 체크는 일어나지 않는다.
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
1부터 5까지의 모든 숫자들이 수집되고, runBlocking이 반환되기 전에만 취소가 감지된다.
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
바쁜 Flow를 취소 가능하게 만들기
Coroutine에 바쁜 루프가 존재한다면 명시적으로 취소를 체크해야 한다. .onEach { currentCoroutineContext().ensureActive() } 를 추가할 수도 있지만 cancellable 연산자가 해당 역할을 수행하기 위해 이미 정의되어 있다 :
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
cancellable 연산자를 사용하면 1부터 3까지의 숫자들만 수집된다 :
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. Android Activity의 lifecycleScope이나 ViewModel의 viewModelScope이 이 역할을 수행한다.
*2. Coroutine은 구조화된 동시성의 원칙을 따르기 때문에 Scope이 취소되면 Scope에 속한 Coroutine들은 자동적으로 취소된다. Listener을 별도로 제거할 필요가 없다.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Asynchronous Flow - Launching flow
원문 최종 수정 : 2022년 9월 28일
Flow와 Reactive Stream
리액티브 스트림이나 Rxjava나 Project Reactor 같은 리액티브 프레임웍에 익숙한 사람들은 Flow를 설계 하는게 아주 익숙할 것이다.
실제로, Flow의 설계는 리액티브 스트림과 그에 대한 다양한 구현체들에 영감을 받았다. 하지만, Flow의 주요 목표는 가능한 단순하게 디자인을 하는 것이며, Kotlin의 일시중단 친화적이고 구조적인 동시성을 존중하는 것이다. 이러한 목표를 이루는 것은 리액티브 선지자과 그들의 엄청난 작업들이 없으면 불가능할 것이다. 이에 대한 완전한 이야기는 Reactive Streams and Kotlin Flows 기사에서 읽을 수 있다.
개념적으로는 다르지만, Flow는 리액티브 스트림이며 Flow는 리액티브(사양과 TCK*1에 대해 호환되는) 발행자 또는 그 반대로 변환될 수 있다. 이러한 변환기는 기본적으로 kotlinx.coroutines 패키지에 의해 제공되며, 다른 리액티브 모듈에 대한 변환기는 해당 리액티브 모듈에서 찾을 수 있다(리액티브 스트림을 위한 kotlinx-coroutines-reactive, Project Reactor을 위한 kotlinx-coroutines-reactor 과 RxJava2/RxJava3를 위한 kotlinx-coroutines-rx2/kotlinx-coroutines-rx3).
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. TCK란 Technology Compatibility Kit 의 약자이다. Reactive Stream TCK는 Reactive Stream 구현을 위한 규칙을 정의하며, 구현자가 정의된 규칙에 따라 구현을 잘 했는지를 확인하기 위해 사용된다.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Asynchronous Flow - Flow and Reactive Streams
원문 최종 수정 : 2022년 9월 28일
목차로 돌아가기