여러 Flow 하나로 합치기
복수의 Flow를 합치는 다양한 방법이 있다.
Zip
Kotlin 표준 라이브러리 상의 Sequence.zip 확장 함수처럼, Flow는 두 개의 Flow의 값을 결합하는 zip 연산자를 가지고 있다.
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이 예제는 다음을 출력한다.
1 -> one
2 -> two
3 -> three
Combine
Flow가 가장 최신의 값 혹은 연산을 표시할 때(conflation에 관한 관련된 섹션 참조), 해당 Flow의 가장 최신 값에 의존적인 연산의 수행을 필요로 하거나 업스트림이 새로운 값을 방출 할 때 다시 연산하도록 해야할 수 있다. 해당 연산을 수행하는 연산자의 집합을 combine이라 부른다.
예를 들어, 이전 예제에서 숫자들이 300ms 마다 업데이트 되지만 문자열이 400ms마다 업데이트 되는 경우, 그들을 zip 연산자를 사용해 zip연산을 수행하면 결과가 400ms마다 출력되기는 하지만 동일한 결과가 생성된다.
📖 이 예제에서는 onEach 중간 연산자를 사용해 각 요소들에 대해 지연을 주도록 하고, 샘플 Flow를 방출하는 코드를 선언적이고 짧게 만든다.
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
하지만, zip 대신에 combine을 사용해보면 :
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
nums 또는 strs Flow의 각 방출에 따라 줄이 인쇄되는 상당히 다른 결과를 얻을 수 있다
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
* zip은 두 개의 값을 압축시켜서 하나의 값으로 내보내는 반면, combine은 각 값이 방출 될때마다 합쳐서(combine) 하나의 값으로 내보낸다.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Asynchronous Flow - Composing multiple flows
원문 최종 수정 : 2022년 9월 28일
Flow를 Flatten*1하기
Flow는 비동기적으로 수신된 값의 시퀀스를 나타내고, 각 값이 다른 값들의 시퀀스에 대한 요청을 하기 매우 쉽다. 예를 들어, 두개의 문자열을 500ms 차이로 반환하는 다음의 함수가 있다고 해보자 :
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
만약 3개의 정수를 방출하는 flow가 있고, 다음과 같이 각각이 requestFlow를 호출한다고 해보자.
(1..3).asFlow().map { requestFlow(it) }
이는 추후 처리를 위해 단일 Flow로 Flatten해야 하는 flow의 flow(Flow<Flow<String>>)가 된다. Collection과 Sequence는 이런 상황을 위해 flatten과 flatMap 연산자가 있다. 하지만, Flow의 비동기 환경 때문에 Flow는 flattening을 위한 다른 방법이 필요하며, Flow에 대한 flattening 연산자의 집합이 존재한다.
flatMapConcat*2
flatMapConcat과 flattenConcat은 flow의 flow*3에 대한 연결을 제공한다. 이들은 해당 시퀀스 연산자들과 가장 직접적인 유사체*4이다. 이들은 다음 예제처럼 새로운 값을 수집하기 전에 안쪽의 Flow의 처리가 완료되기를 기다린다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
flatMapConcat의 순차적인 성질은 출력에서 명확하게 드러난다 :
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge*5
다른 flattening 연산 방식은 수집되는 값을 모두 동시적으로 수집한 후, 수집된 값들을 하나의 Flow로 만들어 값이 최대한 빠르게 방출될 수 있도록 하는 것이다. 이는 flatMapMerge, flattenMerge 연산자에 의해 구현된다. 이 둘 모두 선택적으로 concurrency 파라미터를 받아 동시에 수집되는 Flows의 수를 제한할 수 있도록 한다(이 값은 기본값이 DEFAULT_CONCURRENCY 로 설정된다).
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
flatMapMerge의 동시적인 성질은 명확하게 드러난다 :
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
📖 flatMapMerge는 내부의 코드 블록(이 예제에서는 { requestFlow(it) })을 순차적으로 호출하지만 결과 flow를 동시적으로 수집한다. 이는 map { requestFlow(it) } 를 먼저 호출하고 flattenMerge를 순차적으로 호출하는 것과 같다.*6
flatMapLatest*7
Buffering - 최신 값 처리하기 섹션에서 설명한 collectLatest 연산자와 비슷하게, 새로운 flow의 Collection이 방출되면 이전 flow의 Collection의 처리가 취소되는 "최신(Latest)" flattening 방식이 있다. 이는 flatMapLatest 연산자에 의해 구현된다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
📌 전체 코드는 이곳에서 확인할 수 있습니다.
이 예제의 출력은 flatMapLatest가 어떻게 동작하는지에 대한 좋은 설명이 된다.
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
📖 새로운 값이 수집되었을 때, flatMapLatest는 블록(이 예제에서는 { requestFlow(it) }) 내부의 모든 코드를 취소한다. 이는 이 예제에서는 requestFlow가 일시중단되지 않고 취소되지 않도록 빠르게 호출되기 때문에 아무런 변화를 만들어내지 못한다. 하지만 requestFlow 내부에 delay와 같은 일시중단 함수가 있다면 달라진 결과가 보일 것이다.
📖 아래 내용은 독자의 이해를 위해 번역자가 추가한 글입니다.
*1. Flatten에 대한 적합한 단어를 찾지 못해 Flatten은 영어를 사용합니다.
*2. flatMapConcat에 대한 더욱 자세한 내용을 보고 싶다면 다음글을 참고하자.
*3. "flow의 flow"는 Flow<Flow<T>> 를 뜻한다.
*4. 문맥상 flatten과 flatMap 연산자와 flattenConcat, flatMapConcat이 가장 유사한 연산자라는 뜻으로 보입니다.
*5. flatMapMerge에 대해 더 자세히 알고 싶다면 다음글을 참조하자.
*6. map { requestFlow(it) } 이 호출되면 Flow<Flow<String>>이 생성된다. 이후 flattenMerge가 호출되면 내부 flow를 동시적으로 수집하게 되는데, 이는 flatMapMerge { requestFlow(it) } 을 하는 것과 같게 된다.
*7. flatMapLatest에 대한 더욱 자세한 내용을 보고 싶다면 다음글을 참고하자.
이 글은 Coroutines 공식 문서를 번역한 글입니다.
원문 : Asynchronous Flow - Flattening flows
원문 최종 수정 : 2022년 9월 28일
목차로 돌아가기
'공식 문서 번역 > Coroutines 공식 문서' 카테고리의 다른 글
Coroutines Flow 7편 - Flow 수집 완료 처리하기, Flow 명령적으로 다루기 vs 선언적으로 다루기 (0) | 2023.03.03 |
---|---|
Coroutines Flow 6편 - Flow 예외 처리, Flow의 예외 투명성 (0) | 2023.03.02 |
Coroutines Flow 4편 - Flow의 수집이 일어나는 Context, Flow Buffering - buffer, conflate, collectLatest (0) | 2023.02.28 |
Coroutines Flow 3편 - Flow 터미널 연산자, Flow는 순차적이다 (0) | 2023.02.27 |
Coroutines Flow 2편 - Flow 취소하기, Flow 빌더, Flow 중간 연산자 (0) | 2023.02.26 |