Developing Myself Everyday
article thumbnail

시작하기에 앞서 사실 Kotlin에서 어떤 게 직접 중간 연산자라고 말하지는 않았습니다. 다만 공부할 때 도움이 되기 위해서, Flow에서 어떠한 작업을 한 다음, Flow를 반환하는 확장 함수들을 중간 연산자로 분류해 보겠습니다.

 

다만, 중간 연산자의 종류가 워낙 많기에 그 역할에 따라 이해하기 위해 제 나름대로 분류를 해봤습니다. 꼭 이렇게 분류를 해야한다는 것은 아니니까 각자 이해하기 쉽게 분류하는 것이 좋을 수 있습니다. 분류는 다음과 같습니다.

  • 변형 연산자
  • 제한 연산자
  • 지연 연산자
  • 병합 연산자
  • Side-Effect 연산자
  • 에러 처리 및 복구 연산자
  • Context를 전환하는 연산자

 

 

 

 

1. 변형 연산자


1.1. transform

먼저 값을 변형하는 중간 연산자를 이해하는데 기본이 되는 `transform` 연산자를 보겠습니다. 

<kotlin />
public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R> = flow { collect { value -> return@collect transform(value) } }

 

`transform` 연산자의 가장 큰 목적은 T 타입을 R 타입으로 변환하는 것입니다. 이를 위해 아래와 같이 동작합니다.

  1. flow 빌더를 사용하여 새로운 Flow를 생성합니다. 
  2. flow 블록 내에서 collect를 호출하여 원본 `Flow <T>`를 수집합니다.
  3. transform 함수는 FlowCollector <R>를 확장하는 함수로, FlowCollector <R>의 emit 메서드를 통해 변환된 값을 방출합니다.

 

`transform` 연산자는 transform 블록이 `FlowCollector`의 확장 함수로 정의되어 있기 때문에, 내부에서 emit 메서드를 직접 호출할 수 있다는 점이 중요합니다.

 

이 특징은 아래의 예시에서 어떠한 차이가 있는지 알 수 있습니다. 

<kotlin />
fun main() = runBlocking { val flow = flow { emit(1) emit(2) } val transformedFlow = flow.transform { value -> emit("Transformed: $value") emit(value) } transformedFlow.collect { println(it) } } // 결과 // Transformed: 1 // 1 // Transformed: 2 // 2

위의 예시는 단순히 타입을 변환하는 것을 넘어서 2번뿐이었던 데이터의 emit을 4번으로 늘렸습니다. 

 

 

1.2. transformLatest

위에서 말한 `transformLastest` 연산자는 Flow의 각 요소를 변환하는 동안, 새로운 값이 emit 되면 기존 변환 작업을 취소하고 새로운 값으로 다시 변환을 시작하는 연산자입니다.

 

즉, 이전 변환이 끝나기도 전에 새로운 값이 들어오면 기존 변환을 중단하고 새로운 값에 대한 변환을 실행합니다.

<kotlin />
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> = ChannelFlowTransformLatest(transform, this)

 

예시와 동작을 보면 쉽게 이해가 됩니다.

<kotlin />
fun main() = runBlocking { flow { emit("a") delay(100) emit("b") }.transformLatest { value -> emit(value) delay(200) emit(value + "_last") }.collect(::println) } // 결과 // a // b // b_last

 

위의 코드의 실행 흐름은 다음과 같습니다.

  1. "a"가 emit 됨 → transformLatest에서 emit("a") 실행
  2. transformLatest의 delay(200) 수행 중에 "b"가 emit 됨
     "a_last"가 emit 되기 전에 "a"의 transform이 취소됨
  3. "b"가 emit 됨 → emit("b") 실행
  4. delay(200)이 끝난 후 "b_last"가 emit 됨

 

 

1.3. map, mapLatest

다음은 가장 많이 사용하는 `map` 연산자입니다. 사실 `transform` 연산자에 대해 이해했다면 `map` 연산자에 대해 이해하는 것은 매우 단순합니다. 

<code />
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value -> return@transform emit(transform(value)) }

 

`map` 연산자는`transform` 메서드를 활용하고 있기 때문입니다. `transform` 메서드가 더 유연한 변환 작업을 지원하는 반면, `map`은 이러한 변환을 단일 값으로 간단하게 처리합니다.

 

 즉, `map`은 `transform`을 사용하여 T 타입을 R로 바꾸는 과정을 단일 값으로 변환하며, emit 메서드를 직접 사용하지 않고 결과를 반환합니다.

 

`mapLastest` 연산자는 `transformLastest` 연산자를 단일 값으로 처리하며, 이전값을 처리하던 와중에 새로운 값이 방출되면 이전 작업을 취소하고 최신 작업만 진행합니다.

<kotlin />
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> = transformLatest { emit(transform(it)) }

 

 

1.4. scan, runningFold

`scan`과 `runningFold`는 이름만 다를 뿐 동일한 역할을 하는 연산자입니다. 이 연산자는 초기값을 설정하고 내부의 accumulator를 통해 flow의 각 값을 처리할 때마다 누적된 결과를 방출합니다. 

<kotlin />
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = runningFold(initial, operation) public fun <T, R> Flow<T>.runningFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow { var accumulator: R = initial emit(accumulator) collect { value -> accumulator = operation(accumulator, value) emit(accumulator) } }

 

아래의 예시에서 작동방식을 확인할 수 있습니다.

<kotlin />
fun main() = runBlocking { val flow = flow { emit(1) emit(2) } val scanFlow = flow.scan(0) { acc, value -> acc + value } scanFlow.collect { println(it) } } // 결과 // 0 // 1 // 3

 

`runningFold`란 이름을 통해 알 수 있듯이 이 연산자는 종단 연산자인 fold의 중간 연산자 버전이라고 보면 됩니다.

 

 

1.5. runningReduce

`runningReduce`는 종단 연산자인 reduce의 중간 연산자 버전입니다. `runningFold`와 거의 유사하지만 초기값을 지정하지 않고 첫 번째 값이 자동으로 초기값으로 사용됩니다.

<kotlin />
public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow { var accumulator: Any? = NULL collect { value -> accumulator = if (accumulator === NULL) { value } else { operation(accumulator as T, value) } emit(accumulator as T) } }

 

 

1.6. chunked

`chunked`는 Flow <T>를 일정 크기의 청크로 나누어 새로운 Flow <List <T>>로 반환하는 연산자입니다. 

<kotlin />
@ExperimentalCoroutinesApi public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> { require(size >= 1) { "Expected positive chunk size, but got $size" } return flow { var result: ArrayList<T>? = null // Do not preallocate anything collect { value -> // Allocate if needed val acc = result ?: ArrayList<T>(size).also { result = it } acc.add(value) if (acc.size == size) { emit(acc) // Cleanup, but don't allocate -- it might've been the case this is the last element result = null } } result?.let { emit(it) } } }

 

아래와 같이 작동합니다.

<code />
fun main() = runBlocking { flowOf("a", "b", "c", "d", "e") .chunked(2) // ["a", "b"], ["c", "d"], ["e"] .map { it. joinToString(separator = "") } .collect { println(it) // Prints "ab", "cd", e" } }

 

 

 

 

2. 지연 연산자


2.1. debounce

`debounce`는 연속적으로 발생하는 이벤트 중에서 가장 마지막 이벤트만 방출하도록 지연(debounce)하는 연산자입니다. 주로 빠르게 발생하는 이벤트를 조절하는데 유용합니다.

<code />
@FlowPreview public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> { require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" } if (timeoutMillis == 0L) return this return debounceInternal { timeoutMillis } }

 

쉽게 설명하자면 `debounce`는 최초 이벤트가 방출된 이후로 아무런 이벤트가 발생하지 않는 시간이 timeoutMillis 만큼 지나서야 가장 마지막 이벤트를 방출합니다.

 

아래의 예시를 보면 이해가 잘 됩니다. 아래의 예시에서는 timeoutMillis를 100으로 설정하고 delay를 90으로 주고 있습니다. 100이 되기 전에 반복해서 emit이 이뤄지므로 가장 마지막으로 emit한 5만 출력되는 것을 확인할 수 있습니다.

<kotlin />
fun main() = runBlocking { flow { emit(1) delay(90) emit(2) delay(90) emit(3) delay(90) emit(4) }.debounce( 100 ).collect { value -> print("$value ") // 4 } }

 

 

2.2. sample

`sample`는 주어진 간격 동안 가장 최신의 값을 방출하는 연산자입니다. 여러 이벤트가 연속적으로 발생하는 경우, 클릭마다 바로 처리하는 것이 아니라, 일정 시간마다 가장 최신 이벤트만 반영합니다.

 

예를 들어 버튼 클릭이 100ms 마다 발생하고, sample(200)을 설정한다면 200ms마다 한 번 버튼 클릭을 처리합니다.

<kotlin />
@FlowPreview public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> { require(periodMillis > 0) { "Sample period should be positive" } return scopedFlow { downstream -> val values = produce(capacity = Channel.CONFLATED) { collect { value -> send(value ?: NULL) } } var lastValue: Any? = null val ticker = fixedPeriodTicker(periodMillis) while (lastValue !== DONE) { select<Unit> { values.onReceiveCatching { result -> result .onSuccess { lastValue = it } .onFailure { it?.let { throw it } ticker.cancel(ChildCancelledException()) lastValue = DONE } } // todo: shall be start sampling only when an element arrives or sample aways as here? ticker.onReceive { val value = lastValue ?: return@onReceive lastValue = null // Consume the value downstream.emit(NULL.unbox(value)) } } } } }

 

예시를 보면서 이해해 보겠습니다. sample은 100ms 마다 값을 방출하기에 아래의 예시에서는 100ms, 200ms, 300ms와 같은 주기로 이벤트를 방출합니다. 

<kotlin />
fun main() = runBlocking { flow { emit(1) // 0 delay(90) emit(2) // 90 delay(90) emit(3) // 180 delay(90) emit(4) // 270 }.sample( 100 ).collect { value -> print("$value ") // 2 3 } }

 

첫번째 emit(1)은 0ms, 두번쨰 emit(2)은 90ms에 실행되었기에 100ms가 되었을 시점에서는 2만 방출됩니다.

마지막 emit(4)가 실행되는 시간은 270ms입니다. sample은 200ms때 emit을 수행한 다음 300ms에 emit을 해야하는데 이 경우에는 270ms에서 flow가 종료되므로, 300ms 때 emit은 이뤄지지 않습니다. 

 

아래와 같이 마지막에 delay(30)을 추가하여 300ms 때 emit이 이뤄지게 한다면 4까지 방출되는 것을 확인할 수 있습니다.

<kotlin />
fun main() = runBlocking { flow { emit(1) // 0 delay(90) emit(2) // 90 delay(90) emit(3) // 180 delay(90) emit(4) // 270 delay(30) // 딜레이 30 추가 }.sample( 100 ).collect { value -> print("$value ") // 2 3 4 } }

 

 

2.3. timeout

`timeout`은 주어진 시간안에 작업이 완료되지 않으면 예외를 발생시키는 함수입니다.

<code />
@FlowPreview public fun <T> Flow<T>.timeout( timeout: Duration ): Flow<T> = timeoutInternal(timeout)

 

emit이 이뤄지고 그 다음 emit이 이뤄지거나 flow가 종료될 때까지 `timeout`보다 더 많은 시간이 걸리면 예외가 발생합니다. 아래의 예시는 이를 보여줍니다.

<kotlin />
fun main() = runBlocking { flow { emit(1) delay(110) emit(2) }.timeout( 100.milliseconds ).collect { value -> println(value) } } // 1 // Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 100m

 

hot flow의 경우에는 모든 emit이 제대로 이뤄졌다고 가정하더라도, 마지막 emit 이후에 flow가 종료되지 않으니 무조건적으로 예외가 발생하게 됩니다. 그렇기 때문에 hot flow에서 `timeout`를 사용할 때는 많은 주의가 필요합니다

<code />
fun main() = runBlocking { flow { emit(1) delay(90) emit(2) }.stateIn( scope = this, started = SharingStarted.WhileSubscribed(500), initialValue = 0 ).timeout( 100.milliseconds ).collect { value -> println("$value ") } } // 0 // 1 // 2 // Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 100m

 

개인적인 생각으로는 `timeout`은 테스트 외에는 사용하기 쉽지 않을 것 같습니다.

 

 

 

 

3. 제한 연산자


3.1. filter

`filter` 또한 `transform` 메서드를 활용하여 조건에 해당될 때에만 결과를 반환하도록 합니다.

<code />
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value -> if (predicate(value)) return@transform emit(value) }

 

아래와 같이 사용할 수 있습니다.

<kotlin />
fun main() = runBlocking { flowOf(1, 2, 3, 4, 5, 6) .filter { it % 2 == 0 } .collect { print("$it ") } // 2 4 6 }

 

이 외에도 조건에 실패하는 경우만 결과를 반환하는 `filterNot`, 특정 타입의 값만 필터링하는 `filterIsInstance`, null 이 아닌 타입만 반환하는 `filterNotNull` 메서드가 존재합니다.

 

 

3.2. distinctUntilChanged

`distinctUntilChanged`는 연속된 동일 값을 걸리내어 중복 값을 방출하지 않도록하는 연산자입니다. 아래의 코드에서도 알 수 있듯이 StateFlow의 경우에는 항상 동일 값을 방출하지 않으므로 그냥 this를 반환하는 것을 알 수 있습니다.

<kotlin />
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> = when (this) { is StateFlow<*> -> this // state flows are always distinct else -> distinctUntilChangedBy(keySelector = defaultKeySelector, areEquivalent = defaultAreEquivalent) }

 

아래의 예시에서 이전에 emit 값과 중복된 값은 emit되지 않는것을 알 수 있습니다.

<code />
fun main() = runBlocking { flow { emit(1) emit(1) }.distinctUntilChanged() .collect { value -> print("$value ") // 1 } }

 

이전에 emit된 값이라도 바로 이전의 값과 동일하지 않으면 emit될 수 있습니다.

<kotlin />
fun main() = runBlocking { flow { emit(1) emit(2) emit(1) }.distinctUntilChanged() .collect { value -> print("$value ") // 1 2 1 } }

 

 

3.3. drop, dropWhile

`drop`은 내부에 `skipped`라는 카운터를 두고, 매개변수만큼 emit을 무시하고 그 이후부터 emit이 이뤄질 수 있게 합니다.

<kotlin />
public fun <T> Flow<T>.drop(count: Int): Flow<T> { require(count >= 0) { "Drop count should be non-negative, but had $count" } return flow { var skipped = 0 collect { value -> if (skipped >= count) emit(value) else ++skipped } } }

 

`dropWhile`은 주어진 조건(predicate)을 만족하는 동안 emit을 건너뛰고, 처음으로 조건을 만족하지 않으면 그 이후부터 emit이 이뤄질 수 있게 합니다.

<code />
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow { var matched = false collect { value -> if (matched) { emit(value) } else if (!predicate(value)) { matched = true emit(value) } } }

 

 

3.4. take, takeWhile

`take`는 `drop`와 반대로 작동한다고 생각하면 됩니다. `drop`은 count 만큼 emit을 건너뛰었다면, `take`는 count 만큼만 emit을 진행합니다.

<code />
public fun <T> Flow<T>.take(count: Int): Flow<T> { require(count > 0) { "Requested element count $count should be positive" } return flow { val ownershipMarker = Any() var consumed = 0 try { collect { value -> // Note: this for take is not written via collectWhile on purpose. // It checks condition first and then makes a tail-call to either emit or emitAbort. // This way normal execution does not require a state machine, only a termination (emitAbort). // See "TakeBenchmark" for comparision of different approaches. if (++consumed < count) { return@collect emit(value) } else { return@collect emitAbort(value, ownershipMarker) } } } catch (e: AbortFlowException) { e.checkOwnership(owner = ownershipMarker) } } }

 

`takeWhile`도 마찬가지로 `dropWhile`와 반대로 작동합니다. `takeWhile`은 조건이 만족할 동안 emit을 진행하고, 조건을 만족하지 못하면 flow를 그냥 종료합니다.

<code />
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow { // This return is needed to work around a bug in JS BE: KT-39227 return@flow collectWhile { value -> if (predicate(value)) { emit(value) true } else { false } } }

 

 

3.5. transformWhile

`transformWhile` 확장함수는 Flow의 각 요소에 대해 변환을 수행하면서, 변환 함수가 true를 반환하는 동안만 값을 방출하고 false를 반환하면 Flow를 종료하는 연산자입니다.

 

`transform`과 `takeWhile`의 기능을 한 번에 수행한다고 생각하면 됩니다.

<code />
public fun <T, R> Flow<T>.transformWhile( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Boolean ): Flow<R> = safeFlow { // Note: safe flow is used here, because collector is exposed to transform on each operation // This return is needed to work around a bug in JS BE: KT-39227 return@safeFlow collectWhile { value -> transform(value) } } // Internal building block for non-tailcalling flow-truncating operators internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) { val collector = object : FlowCollector<T> { override suspend fun emit(value: T) { // Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example) // the resulting code is never tail-suspending and produces a state-machine if (!predicate(value)) { throw AbortFlowException(this) } } } try { collect(collector) } catch (e: AbortFlowException) { e.checkOwnership(collector) } }

 

사용 방식이 조금 독특한데, `transform` 블록 안에서 emit을 수행함과 동시에, Boolean을 반환해줘야 합니다.

<code />
fun main() = runBlocking { val flow = flowOf(1, 2, 3, 4, 5) val transformedFlow = flow.transformWhile { value -> emit(value * 10) // 값 변환 후 방출 value < 3 // 3보다 작은 동안만 계속 } transformedFlow.collect { println(it) } }

 

개인적인 생각으로는 `transformWhile`에 대해 모든 구성원들이 이해하지 못한다면, 해당 방식을 사용하는 것은 좋지 않을 것 같습니다. `map`과 `takeWhile`를 함께 쓰는 것이 가독성 측면에서 훨씬 더 좋다고 생각합니다.

 

 

 

 

4. 병합 연산자


4.1. merge

`merge`는 여러 개의 Flow를 하나의 Flow로 합치지만, 요소의 순서를 보장하지 않습니다. 즉, 모든 Flow가 동시에 수집(collect)되며, 각 요소가 준비되는 대로 즉시 방출(emit)됩니다. 

<kotlin />
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge() public fun <T> Iterable<Flow<T>>.merge(): Flow<T> { return ChannelLimitedFlowMerge(this) }

 

아래와 같이 사용할 수 있습니다.

<kotlin />
fun main() = runBlocking { val innerFlow1: Flow<Int> = flow { emit(1) delay(100) emit(2) } val innerFlow2: Flow<Int> = flow { emit(3) delay(100) emit(4) } val listOfFlows: List<Flow<Int>> = listOf(innerFlow1, innerFlow2) val mergeFlow: Flow<Int> = listOfFlows .merge() // val mergeFlow: Flow<Int> = merge(innerFlow1, innerFlow2) mergeFlow.collect { println(it) } } // 결과 // 1 // 3 // 2 // 4

 

 

4.2. flattenConcat

`flattenConcat`은 flatten(평평하게 하다)이라는 이름에 걸맞게 중첩된 Flow를 하나의 Flow로 평탄화하는 연산자입니다. 각 Flow를 순차적으로(concat) 실행하며, 이전 Flow가 종료된 후 다음 Flow를 처리합니다. 

<code />
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow { collect { value -> emitAll(value) } }

 

아래의 예시에서 중첩된 Flow를 평탄화하고 있습니다.

<kotlin />
fun main() = runBlocking { val innerFlow1: Flow<Int> = flow { emit(1) delay(100) emit(2) } val innerFlow2: Flow<Int> = flow { emit(3) delay(100) emit(4) } val flowOfFlows: Flow<Flow<Int>> = flow { emit(innerFlow1) emit(innerFlow2) } val flattenFlow: Flow<Int> = flowOfFlows .flattenConcat() flattenFlow.collect { println(it) } } // 결과 // 1 // 2 // 3 // 4

 

 

4.3. flattenMerge

`flattenMerge`는 중첩된 Flow를 하나의 Flow로 병합(Merge)하는 연산자입니다. 순차적이었던 `flattenConcat`와 달리 여러 개의 Flow를 병렬적으로 수집할 수 있으며, concurrency라는 매개변수를 통해 수집 개수를 조절할 수 있습니다.

<code />
@ExperimentalCoroutinesApi public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> { require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" } return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency) }

 

위의 코드에서도 알 수 있듯 concurrency가 1일 경우에는`flattenConcat`와 동일한 동작을 하기에 그냥  `flattenConcat`를 실행한다는 것을 확인할 수 있습니다.

 

`flattenConcat`은 delay가 있던 없던 순차적으로 출력을 했지만, `flattenMerge`는 병렬적으로 출력을 하고 있다는 것을 아래의 예시에서 확인할 수 있습니다.

<kotlin />
fun main() = runBlocking { val innerFlow1: Flow<Int> = flow { emit(1) delay(100) emit(2) } val innerFlow2: Flow<Int> = flow { emit(3) delay(100) emit(4) } val flowOfFlows: Flow<Flow<Int>> = flow { emit(innerFlow1) emit(innerFlow2) } val flattenFlow: Flow<Int> = flowOfFlows .flattenMerge() flattenFlow.collect { println(it) } } // 결과 // 1 // 3 // 2 // 4

 

 

3개의 Flow를 병합할 때 concurrency를 2로 제한하면, 2개의 Flow가 병렬로 실행되고 전체적으로는 순차적으로 진행됩니다.

<kotlin />
fun main() = runBlocking { val flowOfFlows = flow { emit( flow { emit("A1") delay(100) emit("A2") } ) emit( flow { emit("B1") delay(100) emit("B2") } ) emit( flow { emit("C1") delay(100) emit("C2") } ) } flowOfFlows.flattenMerge(concurrency = 2).collect { println(it) } } // 결과 // A1 // B1 // A2 // B2 // C1 <- A,B가 다 완료된 후에 // C2

 

이는 엘리베이터의 탑승을 최대 2명으로 제한한 것과 같습니다. 2명씩 동시에 이동하지만 전체적으로는 순차적으로 진행됩니다.

 

 

4.4. flatMapConcat

`flatMapConcat`는 각 요소를 변환하여 새로운 Flow를 생성한 후, 변환된 Flow들을 순차적으로 이어 붙이는 연산자입니다. 내부적으로는 `map`을 수행하고 그다음 `flattenConcat`을 하게 됩니다.

<kotlin />
@ExperimentalCoroutinesApi public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> = map(transform).flattenConcat()

 

기존에 중첩된 Flow에서 `flattenConcat`을 직접 사용하던 방식과 달리, flatMapConcat Flow <A>의 내부 블록에서 Flow <B>를 반환하여 중첩 구조를 만든 후 이를 순차적으로(flattenConcat) 실행한다는 특징이 있습니다.

 

즉 Flow에서 map을 사용해 중첩 구조를 만들고 이를 순차적으로 실행합니다.

 

아래의 예시에서 어떤 식으로 사용되는지 알 수 있습니다.

<code />
fun main() = runBlocking { val flow = flowOf(1, 2) val flatMapConcatFlow = flow.flatMapConcat { value -> flow { emit("$value: A") delay(100) emit("$value: B") } } flatMapConcatFlow.collect { println(it) } } // 결과 // 1: A // 1: B // 2: A // 2: B

 

 

4.5. flatMapLatest

`flatMapLatest`는 Flow <T>에서 각 요소를 Flow <R>로 변환한 후, 가장 최신의 변환된 Flow만 활성 상태로 유지하는 연산자입니다. 위에서 봤던 `transformLastest` 연산자를 활용합니다.

<kotlin />
@ExperimentalCoroutinesApi public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> = transformLatest { emitAll(transform(it)) }

 

`flatMapLatest`는 `flatMapConcat`와 동일하게 순차적으로 실행합니다. `flatMapConcat`는 이전에 시작한 Flow가 완료될 때까지 대기한 후, 그다음 Flow를 실행하지만 `flatMapLatest`는 이전의 작업을 취소하고 최신 값만 반영한다는 차이가 있습니다.

 

 

4.6. flatMapMerge

`flatMapMerge`도 크게 다르지 않습니다. 중첩 구조를 만든 후 이를 병렬적으로(flattenMerge) 실행한다는 특징이 있습니다.

<code />
@ExperimentalCoroutinesApi public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R> ): Flow<R> = map(transform).flattenMerge(concurrency)

 

 

4.7. combine

`combine` 연산자는 여러 개의 Flow를 결합하여 변환하는 연산자입니다. 여러 개의 Flow가 반환하는 값을 받아서 변환하는 방식으로 동작합니다.

<code />
public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow.combine(flow2, transform)

 

여기까지 들었을 때 위에서 말했던 `merge` 연산자와 똑같다고 생각하실 수 있습니다. 다만 이 두 연산자의 동작 방식에는 중요한 차이가 있습니다.

 

`combine` 연산자의 중요한 점은 각 Flow가 방출할 때마다 모든 Flow에서 값을 결합해 새로운 값을 생성한다는 점입니다. 즉, 각 Flow에서 발생하는 값들을 서로 결합하는 방식으로 처리됩니다.

<kotlin />
fun main() = runBlocking { val flow1 = flowOf(1, 2, 3) val flow2 = flowOf("A", "B", "C") combine(flow1, flow2) { a, b -> "$a$b" }.collect { value -> println(value) } }

 

반면에 `merge` 연산자는 Flow의 값을 순서 없이 병합하는 연산자입니다. 이 연산자는 오직 값을 동시에 처리하는데만 중점을 두고 있습니다. 특정 작업을 하거나 새로운 값을 생성할 수는 없습니다.

<code />
fun main() = runBlocking { val flow1 = flowOf(1, 2, 3) val flow2 = flowOf("A", "B", "C") merge(flow1, flow2).collect { value -> println(value) } } // 결과 // 1 // 2 // 3 // A // B // C

 

 

4.8. combineTransform

`combineTransform` 연산자는 `combine`과 비슷하지만, 값 결합의 과정에서 좀 더 유연하게 변환할 수 있도록 허용하는 연산자입니다. 위에서 계속 사용했던 `transform` 연산자의 특성과 유사합니다.

<code />
public fun <T1, T2, R> combineTransform( flow: Flow<T1>, flow2: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit ): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> -> transform( args[0] as T1, args[1] as T2 ) }

 

아래의 예시에서 알 수 있듯 Flow 2개를 결합하고, origin과 reverse로 나눠서 자유롭게 emit 할 수 있습니다.

<code />
fun main() = runBlocking { val flow1 = flowOf(1, 2) val flow2 = flowOf("a", "b") val result = flow1.combineTransform(flow2) { num, str -> emit("origin: $num$str") emit("reverse: $str$num") } result.collect { value -> println(value) } } // 결과 // origin: 1a // reverse: a1 // origin: 2b // reverse: b2

 

 

4.9. zip

`zip` 연산자는 현재 Flow(this)의 값과 매개변수로 들어온 Flow(other)의 값을 순서대로 조합하여 주어진 transform 함수를 적용한 결과를 방출하는 연산자입니다.

<code />
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)

 

`zip` 연산자의 가장 큰 특징은 바로 1:1 이라는 것입니다. 두 Flow의 개수가 달라 하나의 Flow가 먼저 완료되었다면, 다른 Flow도 함께 취소됩니다.

 

아래와 같이 "d"는 출력되지 않습니다.

<code />
fun main() = runBlocking { val flow = flowOf(1, 2, 3) val flow2 = flowOf("a", "b", "c", "d") flow.zip(flow2) { i, s -> i.toString() + s }.collect { println(it) // "1a", "2b", "3c"가 출력됨 } }

 

이러한 특성은 버퍼링을 사용하는 `combine` 연산자와 매우 큰 차이를 나타냅니다. 

 

 

 

 

5. Side-Effect 연산자


5.1. onStart

`onStart` 연산자는 Flow가 collect 되기 전에 주어진 action을 실행하도록 하는 연산자입니다. 이 연산자는 Flow가 시작되기 전에 일부 작업을 수행하거나 초기화하는데 유용합니다. 또한, action은 FlowCllector의 확장함수로 되어 있어 내부에서 emit을 수행할 수 있습니다.

<code />
public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit ): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action val safeCollector = SafeCollector<T>(this, currentCoroutineContext()) try { safeCollector.action() } finally { safeCollector.releaseIntercepted() } collect(this) // directly delegate }

 

아래의 예시에서는 flow를 collect하기 전에 먼저 "start"를 emit 시키고 있습니다.

<code />
fun main() = runBlocking { flowOf("a", "b", "c") .onStart { emit("Start") } .collect { println(it) } } // 결과 // Start // a // b // c

 

 

다만, `onStart`는 collect 시작 전에 작업을 하기 때문에, SharedFlow처럼 이미 방출된 데이터가 있을 경우, 그 데이터는 `onStart`에서 설정한 액션 전에 emit되기 때문에 무조건적으로 먼저 시작된다는 것은 보장할 수는 없습니다.

<code />
fun main() = runBlocking { val sharedFlow = MutableSharedFlow<String>() sharedFlow.emit("첫 번째 데이터") sharedFlow .onStart { println("onStart: 수집 전 작업 수행") emit("onStart에서 방출한 데이터") } .collect { value -> println("수집한 값: $value") } } // 결과 // onStart: 수집 전 작업 수행 // 수집한 값: onStart에서 방출한 데이터

 

위의 예시에서 알 수 있듯이 먼저 방출한 "첫 번째 데이터"는 이미 emit된 다음 `onStart`의 액션이 실행됩니다.

 

 

5.2. onCompletion

`onCompletion` 연산자는 Flow의 collect가 완료되었을 때나 취소되었을 때 호출되는 작업을 수행하는 연산자입니다. 이 연산자는 finally 블록처럼, Flow가 완료된 후 실행되는 추가적인 작업을 정의하는 데 사용됩니다.

<code />
public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit ): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action try { collect(this) } catch (e: Throwable) { /* * Use throwing collector to prevent any emissions from the * completion sequence when downstream has failed, otherwise it may * lead to a non-sequential behaviour impossible with `finally` */ ThrowingCollector(e).invokeSafely(action, e) throw e } // Normal completion val sc = SafeCollector(this, currentCoroutineContext()) try { sc.action(null) } finally { sc.releaseIntercepted() } }

 

아래와 같이 finally를 사용할 때와 동일하게 사용할 수 있습니다.

<kotlin />
fun main() { runBlocking { val myFlow = flowOf("a", "b", "c") try { myFlow.collect { value -> println(value) } } finally { println("Done") } } } fun main() { runBlocking { flowOf("a", "b", "c") .onCompletion { emit("Done") }.collect { println(it) // prints a, b, c, Done } } }

 

다만 주의해야할 점이 있습니다. 바로 Cold Flow가 아닌 Hot Flow인 StateFlow나 SharedFlow를 사용할 때입니다.

이 때는 collect가 완료되지 않으므로 `onCompletion` 연산자는 실행되지가 않습니다. 

 

아래의 예시에서  `onCompletion` 연산자는 실행되지 않았습니다.

<code />
fun main() { runBlocking { val sharedFlow = MutableSharedFlow<String>(replay = 1) sharedFlow.emit("첫 번째 데이터") sharedFlow .onCompletion { emit("Done") } .collect { value -> println("수집한 값: $value") } } } // 결과 // 수집한 값: 첫 번째 데이터

 

 

5.3. onEmpty

`onEmpty` 연산자는 Flow가 빈 상태로 완료되었을 때, 즉 요소가 하나도 방출되지 않았을 때 주어진 작업을 실행하는 연산자입니다.

<code />
public fun <T> Flow<T>.onEmpty( action: suspend FlowCollector<T>.() -> Unit ): Flow<T> = unsafeFlow { var isEmpty = true collect { isEmpty = false emit(it) } if (isEmpty) { val collector = SafeCollector(this, currentCoroutineContext()) try { collector.action() } finally { collector.releaseIntercepted() } } }

 

아래의 예시에서는 Flow가 비었을 경우에 0을 emit합니다.

<kotlin />
fun main() { runBlocking { emptyFlow<Int>().onEmpty { emit(0) }.collect { println(it) // prints 0 } } }

 

 

5.4. `onEach` 

`onEach` 또한 `transform` 메서드를 활용합니다. 다만, 이 메서드는 주어진 `action`을 각 값에 대해 실행하며, 값을 변환하지 않고 그대로 반환합니다. 

<code />
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value -> action(value) return@transform emit(value) }

 

아래의 예시에서 확인할 수 있습니다.

<code />
fun main() = runBlocking { flowOf("a", "b", "c") .onEach { println("onEach: $it") } .collect { println(it) } } // 결과 // onEach: a // a // onEach: b // b // onEach: c // c

 

 

 

 

6. 에러 처리 및 복구 연산자


6.1. catch

`catch` 연산자는 Flow에서 발생한 예외를 포착하고, 지정된 action을 호출하여 발생한 예외를 처리하는 연산자입니다.

 

개념적으로 업스트림(이전 연산자들)의 코드에서 발생한 예외를 포착하고, 다운스트림(이후 연산자들)에 발생한 예외에는 영향을 미치지 않습니다.

<code />
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> = flow { val exception = catchImpl(this) if (exception != null) action(exception) }

 

예를 들면 아래와 같이 처리할 수 있습니다.

<code />
fun main() = runBlocking { flow { emit(1) emit(2) throw RuntimeException("예외 발생!") }.catch { e -> emit(-1) }.collect { println(it) } } // 결과 // 1 // 2 // -1

 

 

6.2. retry, retryWhen

`retry` 연산자는 업스트림에서 발생한 예외가 특정 조건과 일치할 경우, 최대 retries 횟수만큼 collect을 재시도합니다. 

<code />
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T> { require(retries > 0) { "Expected positive amount of retries, but had $retries" } return retryWhen { cause, attempt -> attempt < retries && predicate(cause) } }

 

이 연산자는 내부에서 또다른 `retryWhen` 연산자를 호출하는데, `retryWhen` 연산자는 업스트림에서 예외가 발생했을 때, true를 반환하면 flow collect를 다시 시도하는 연산자입니다. 

<kotlin />
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> = flow { var attempt = 0L var shallRetry: Boolean do { shallRetry = false val cause = catchImpl(this) if (cause != null) { if (predicate(cause, attempt)) { shallRetry = true attempt++ } else { throw cause } } } while (shallRetry) }

 

이 연산자에는 시도 횟수를 저장하는 attempt 변수가 있으며 해당 변수를 가지고 다양한 작업을 할 수 있습니다.

 

단순한 재시도 로직을 구현할꺼라면 `retry` 연산자를 사용하면 됩니다. 아래의 예시는 2번 재시도 한 다음, 에러가 발생합니다.

<kotlin />
fun main() = runBlocking { flow { emit("Start") delay(100) throw IOException("Network error") }.retry(2) { e -> if (e is IOException) { println("Retrying due to IOException...") true // 재시도 } else { false // 다른 예외는 재시도 안 함 } }.collect { value -> println(value) } } // 결과 // Start // Retrying due to IOException... // Start // Retrying due to IOException... // Start // Exception in thread "main" java.io.IOException

 

 

6.3. cancellable

`cancellable` 연산자는 각 emit마다 취소 상태를 확인하고, 취소원인에 맞는 예외를 발생시키는 Flow를 반환합니다.

<code />
public fun <T> Flow<T>.cancellable(): Flow<T> = when (this) { is CancellableFlow<*> -> this // Fast-path, already cancellable else -> CancellableFlowImpl(this) }

 

먼저 `cancellable` 연산자를 사용하지 않고 `cancel`을 호출해 Flow 도중에 취소를 시킵니다. 그럼 아래와 같이 1 ~5까지의 값이 모두 emit된 이후에 예외가 발생합니다. 

<kotlin />
fun main() = runBlocking { flowOf(1, 2, 3, 4, 5) .collect { value -> if (value == 3) cancel() print("$value ") // 1 2 3 4 5 Exception in thread "main" } }

 

`cancellable` 연산자를 사용하면 1 ~ 3까지만 호출되고 그 이후는 emit되지 않는 것을 확인할 수 있습니다.

<code />
fun main() = runBlocking { flowOf(1, 2, 3, 4, 5) .cancellable() // <--- This is the new operator .collect { value -> if (value == 3) cancel() print("$value ") // 1 2 3 Exception in thread "main" } }

 

SharedFlow에 `cancellable` 연산자를 사용하면 아래와 같은 에러가 발생합니다. SharedFlow는 기본적으로 취소 가능한 구조를 가지고 있어서 `cancellable` 를 추가로 호출해도 의미가 없습니다. 그렇기에 이를 사용하는 것은 오류라고 알려줍니다. 

 

 

 

 

7. Context를 전환하는 연산자


Flow는 기본적으로 순차적으로 실행됩니다. 이는 모든 연산자가 동일한 코루틴에서 실행된다는 것을 의미합니다. 

<kotlin />
flowOf("A", "B", "C") .onEach { println("1$it") } .collect { println("2$it") }

 

예를 들어, 위의 코드에서는 `onEach`와 `collect`가 같은 코루틴 Q에서 실행됩니다.

<bash />
Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--

 

그렇기 때문에 연산이 오래 걸릴 경우 전체 실행 시간이 연산들의 총합이 됩니다. 

 

 

7.1. buffer

`buffer` 연산자를 사용하면 Flow의 실행이 별도의 코루틴에서 수행됩니다.

<kotlin />
flowOf("A", "B", "C") .onEach { println("1$it") } .buffer() // <--------------- buffer가 추가됨 .collect { println("2$it") }

 

아래와 같이 작업이 2개의 코루틴에서 병렬적으로 실행됩니다.

<bash />
P : -->-- [1A] -- [1B] -- [1C] ---------->-- // flowOf(...).onEach { ... } | | channel // buffer() V Q : -->---------- [2A] -- [2B] -- [2C] -->-- // collect

 

만약 버퍼가 가득차게되면, 이벤트를 발생시키는 P 코루틴이 일시 정지되어 Q 코루틴이 따라잡을 수 있도록 합니다. 이러한 작업은 `onBufferOverflow` 매개변수를 통해 변경가능합니다.

 

`buffer` 연산자의 개념적인 구현은 아래와 같습니다. (실제 코드 구현과는 다릅니다) 내부에서 채널을 사용한다는 것을 알아두셔야 합니다.

<kotlin />
fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow { coroutineScope { val channel = produce(capacity = capacity) { collect { send(it) } } channel.consumeEach { emit(it) } } }

 

 

그리고 아래와 같이 동작합니다.

  • produce()를 사용하여 생산자 코루틴(P)를 새롭게 실행하여 Flow 데이터를 send(it)로 채널에 넣음.
  • channel.consumeEach {}를 사용하여 소비자 코루틴(Q)가 채널에서 데이터를 읽고 emit함.
  • 생산자 P가 데이터를 빠르게 생성하더라도, 소비자 Q가 처리할 준비가 될 때까지 버퍼(capacity)만큼 데이터를 보관함.

 

 

실제로 테스트를 해보면 시간이 단축된다는 것을 확인할 수 있습니다.

 

<code />
fun main() = runBlocking { val timeWithoutBuffer = measureTimeMillis { flowOf("A", "B", "C") .onEach { delay(100) // 생산 지연 (예: API 요청) } .collect { delay(300) // 소비 지연 (예: UI 업데이트) } } println("⏳ Without buffer: $timeWithoutBuffer ms") val timeWithBuffer = measureTimeMillis { flowOf("A", "B", "C") .onEach { delay(100) // 생산 지연 } .buffer() // 🔥 버퍼 추가 .collect { delay(300) // 소비 지연 } } println("🚀 With buffer: $timeWithBuffer ms") } // ⏳ Without buffer: 1222 ms // 🚀 With buffer: 1042 ms

 

 

7.2. conflate

`buffer` 연산자가 내부에서 채널을 사용한다고 말했습니다. `conflate` 연산자는 `buffer` 연산자가 사용하는 내부 채널을 CONFLATED로 사용하는 연산자입니다. CONFLATED를 간단하게 설명하면, 이미 생산된 데이터가 있는 경우 데이터를 override하는 방식입니다.

<code />
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

 

`conflate` 연산자는 collect 작업이 느리게 처리되는 경우에 이전 값을 건너 뛰고 최신값을 전달합니다. 아래의 예시에서는 가장 먼저 A가 collect되고 해당 작업을 처리하는 과정에 나머지 B, C, D, E가 emit되면서 B, C, D는 생략되고 이 중 가장 마지막 값인 E만 출력되게 됩니다.

<code />
fun main() = runBlocking { flowOf("A", "B", "C", "D", "E") .conflate() .collect { delay(1000) println(it) // A E } }

 

 

7.3. flowOn

`flowOn` 연산자는 주어진 context에서 Flow가 실행되도록 변경하는 연산자입니다. 이 연산자는 하위 Flow에는 전파되지 않습니다.

 
<code />
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } }

 

예를 들어, 다음과 같이 사용할 수 있습니다.

<kotlin />
withContext(Dispatchers.Main) { val singleValue = intFlow // 컨텍스트가 명시되지 않으면 IO에서 실행됨 .map { ... } // IO에서 실행됨 .flowOn(Dispatchers.IO) // IO에서 실행됨 .filter { ... } // Default에서 실행됨 .flowOn(Dispatchers.Default) // Default에서 실행됨 .single() // Main에서 실행됨 }

 

만약 여러개의 `flowOn` 연산자가 중첩된다면, 첫 번째 연산자가 우선권을 가집니다. 실제로 테스트해보면 아래의 결과를 얻을 수 있습니다.

<code />
fun main() = runBlocking { withContext(Dispatchers.Default) { flowOf("A") .map { println("In map, current context: ${currentCoroutineContext()}") it } .flowOn(Dispatchers.IO) .flowOn(Dispatchers.Default) .collect { println("In collect, current context: ${currentCoroutineContext()}") } } } // In map, current context: [ProducerCoroutine{Active}@3ae99a59, Dispatchers.IO] // In collect, current context: [ScopeCoroutine{Active}@70079648, Dispatchers.Default]

 

그리고 `flowOn` 연산자에 전달할 Context에는 Job을 포함할 수 없습니다.

 

 

 

 

8. 마무리하며


이렇게 해서 Flow의 중단 연산자에 대해 알아보았습니다. 제가 한 분류는 제 생각대로 해본것이기 때문에 틀리거나 이상한점도 있을 수 있습니다. 

 

만약 문제가 있거나 빠트린 부분이 있다면 댓글로 알려주시길 바라겠습니다.

profile

Developing Myself Everyday

@배준형

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!