Flow의 수집이 일어나는 Context
Flow의 수집은 언제나 Coroutine을 호출하는 Context상에서 일어난다. 예를 들어 만약 simple이라 불리는 Flow가 있다면, 다음의 코드의 simple Flow는 구체적인 구현과 상관없이 코드 작성자가 지정한 Context상에서 실행된다 :
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}
Flow의 이러한 성질은 컨텍스트 보존(context preservation)이라 불린다.
따라서 기본적으로 flow { ... } 빌더 내부의 코드는 해당 Flow의 collector가 제공하는 Context 상에서 실행된다. 예를 들어, simple 함수의 구현이 호출되는 스레드를 출력하고 3개의 숫자들을 방출한다고 해보자 :
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이 코드를 실행하면 다음과 같은 출력이 나온다.
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect가 메인 스레드에서 호출되므로, simple Flow의 body 또한 메인 스레드에서 호출된다. 이것은 실행 Context를 신경 쓰지 않고 호출자를 차단하지 않도록 하는 비동기 코드 혹은 빠르게 실행되는 코드에 대한 완벽한 기본값이다.
withContext를 사용할 때 일반적으로 겪을 수 있는 함정
하지만 오래 걸리는 CPU를 사용하는 코드는 Dispatchers.Default Context에서 실행되어야 할 수 있고, UI를 업데이트하는 코드는 Dispatchers.Main Context에서 실행되어야 할 수 있다.*1
일반적으로 withContext는 Kotlin Coroutines를 사용하는 코드의 Context 변경하는데 사용되지만, flow { ... } 빌더의 코드는 컨텍스트 보존 특성을 준수해야해서 다른 컨텍스트에서 방출하는 것은 허용되지 않는다.
다음 코드를 실행해보자 :
fun simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이 코드는 다음의 Exception을 생성한다.
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn 연산자
이 Exception은 Flow에서 값 방출을 위한 Context를 변경하는데 사용할 수 있는 flowOn 함수를 가리킨다. Flow의 Context를 변경하는 올바른 방법은 아래 예제에 나와있다. 또한 이는 해당 스레드들의 이름을 인쇄하여 이것이 어떻게 작동하는지를 보여준다.*2
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
메인 스레드에서 수집이 일어날 때, flow { ... } 가 백그라운드 스레드에서 동작 방식하는 방식에 대해 주목하자 :
여기서 관찰해야 하는 또 다른 사항은 flowOn 연산자가 Flow의 기본적인 순차처리 특성을 변경했다는 점이다. 현재 수집은 하나의 Coroutine("coroutine#1")에서 발생하고, 수집 Coroutine과 동시에 다른 스레드에서 실행중인 Coroutine("coroutine#2")에서 방출이 일어난다. flowOn 연산자는 Context에서 CoroutineDispatcher을 변경해야 할 때 업스트림 Flow를 위한 다른 코루틴을 생성한다.
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. UI가 있는 프레임웍에서는 스레드를 잘못 사용함으로써 일어날 수 있는 데드락 등을 방지하기 위해 UI를 Main Thread에서만 그릴 수 있도록 하는 Main Thread Model을 사용한다. 대표적으로는 안드로이드 프레임웍이 있다. 또한 안드로이드에서는 Computation을 위한 별도의 스레드풀을 관리하는 Dispatchers.Default를 지정해 놓아 이 Dispatcher에서 오래 걸리는 CPU 작업을 수행한다.
*2. 출력은 다음과 같다. DefaultDispatcher에서 값들이 발행되고 Main Dispatcher에서 Collect되는 것을 확인 할 수 있다.
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Asynchronous Flow - Flow context
원문 최종 수정 : 2022년 9월 28일
Flow Buffering
다른 Coroutine 속의 Flow의 다른 부분들을 실행하는 것은, Flow를 수집하는데 걸리는 전체 시간의 관점에서 유용할 수 있다. 특히 오래 걸리는 비동기 작업이 포함된 경우에 유용하다. 예를 들어, simple Flow의 방출이 하나의 값을 방출하는데 100ms 이 걸릴 정도로 느리고 수집 또한 수집된 값을 처리하는데 300ms이 걸릴 정도로 느린 경우를 생각해보자. 세 개의 숫자를 방출하는 Flow에서 이러한 숫자들을 수집하는데 얼마나 많은 시간이 걸리는지 살펴보자.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이는 전체 수집 작업이 1200ms(각각이 400ms 걸리는 세개의 숫자들) 정도 걸리고, 다음과 같은 결과를 생성한다.
1
2
3
Collected in 1220 ms
buffer 연산자*1를 Flow에 사용해, simple Flow의 방출 코드가 수집 코드와 순차적으로 실행되도록 하는 대신 동시에 실행되도록 할 수 있다.
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
📌 전체 코드는 이곳에서 확인할 수 있습니다.
첫 숫자를 위해 100ms만을 기다리고 다른 값들을 처리하는데 각각 300ms의 시간이 걸리는 효율적인 처리 파이프라인을 만들어, 같은 숫자들을 더 빠르게 생성한다. 이런 방식으로 실행하는데 1000ms 정도의 시간이 걸린다.
1
2
3
Collected in 1071 ms
📖 flowOn 연산자는 CoroutineDispatcher을 변경해야 할 때 동일한 buffering 메커니즘을 사용한다. 하지만 여기서는 실행 Context를 변경하지 않고 명시적으로 buffering을 요청한다.
Conflation
flow가 연산 혹은 연산의 상태 갱신에 대한 일부 결과를 나타내는 경우 각 값을 처리할 필요가 없이 최신값만을 처리하면 된다. 이러한 경우, 수집자가 너무 느리게 값들을 처리하는 경우 중간 발행 값들을 건너 뛰기 위해 conflate 연산자*2를 사용할 수 있다. 이전 예제 위에서 만들어보자 :
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
📌 전체 코드는 이곳에서 확인할 수 있습니다.
첫 숫자가 처리되는 동안 두번째, 세번째 숫자가 이미 생성되어 두번째 숫자가 합쳐져(conflated) 가장 최근에 발행된 세번째 숫자가 수집기에 전달된 것을 확인할 수 있다.
1
3
Collected in 758 ms
최신 값 처리하기
결합(Conflation)은 방출기와 수집기 양쪽이 모두 느린 경우에 처리를 빠르게 하기 위해 사용할 수 있는 방법이다. 결합은 방출된 값들을 삭제하여 처리를 빠르게 한다. 다른 방법은 느린 수집기의 실행을 취소하고 새로운 값이 발행될 때마다 다시 시작하는 것이다. 필수 로직인 xxx 연산자와 동일한 연산을 수행하지만, 새로운 값이 발행되면 이전 코드를 취소하는 xxxLatest 연산자 집합이 있다. 이전 예제에서 conflate를 collectLatest*3로 변경해보자 :
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
📌 전체 코드는 이곳에서 확인할 수 있습니다.
collectLatest의 body가 300ms의 시간이 걸리는 반면 새로운 값은 100ms 마다 발행되기 때문에, 블록이 모든 값들에 대해 실행되지만 마지막 값에 대해서만 완료되는 것을 확인할 수 있다 :
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. buffer과 관련된 시각적인 자료는 아래 링크에서 확인할 수 있다.
*2. conflate와 관련된 시각적인 자료는 아래 링크에서 확인할 수 있다.
*3. collectLatest와 관련된 시각적인 자료는 아래 링크에서 확인할 수 있다.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Asynchronous Flow - Buffering
원문 최종 수정 : 2022년 9월 28일
목차로 돌아가기
'공식 문서 번역 > Coroutines 공식 문서' 카테고리의 다른 글
Coroutines Flow 6편 - Flow 예외 처리, Flow의 예외 투명성 (0) | 2023.03.02 |
---|---|
Coroutines Flow 5편 - 여러 Flow 하나로 합치기, Flow를 Flatten하기 - flatMapConcat, flatMapMerge, flatMapLatest (0) | 2023.03.01 |
Coroutines Flow 3편 - Flow 터미널 연산자, Flow는 순차적이다 (0) | 2023.02.27 |
Coroutines Flow 2편 - Flow 취소하기, Flow 빌더, Flow 중간 연산자 (0) | 2023.02.26 |
Coroutines Flow 1편 - 복수의 값들 표현하기, Flow는 차갑다 (0) | 2023.02.25 |