이번 게시글에서는 Flow의 생산자에 대해서 알아봅니다.
1. Flow 생산자 (Producer)
1.1. Flow란?
우리가 사용하는 Flow는 아래와 같은 인터페이스입니다. 이 인터페이스는 `collect`라는 중단 함수를 가지고 있습니다.
<kotlin />
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
collect 함수는 `FlowCollector`를 인자로 받아 Flow가 발생한 데이터를 수집합니다. 즉, Flow가 데이터를 발행하고, `FlowCollector`가 그것을 처리하는 구조입니다.
`FlowCollector`는 Flow가 발행한 데이터를 실제로 처리하는 객체입니다. `FlowCollector`의 핵심 메서드인 `emit`을 통해 데이터를 수집하고 처리합니다.
<kotlin />
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
1.2. flow { }
Flow를 생성하는 여러 방법 중 가장 기본이 되는 것은 `flow { }`입니다.
<code />
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
예를 들어:
<kotlin />
flow {
emit(1)
emit(2)
emit(3)
}.collect {
println(it)
}
1.3. SafeFlow는 무엇인가?
`flow { }`의 반환 타입은 SafeFlow입니다.
<code />
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
SafeFlow는 이름에서 알 수 있듯이 안전한 Flow에 사용된다고 생각할 수 있습니다. 하지만 사실은 조금 다릅니다. SafeFlow는 반대로 안전하지 않는 Flow를 감싸서 안전하게 만드는 역할을 합니다.
1.4. flow의 "안전성"이란
Flow는 기본적으로 단일 흐름(single flow)을 유지해야 합니다.
데이터들이 순차적으로 흘러가야 하며, 이러한 규칙은 반드시 지켜져야 합니다. 만약 순차성이 깨지면 예기치 못한 결과나 동시성 문제가 발생할 수 있습니다.
이러한 개념을 이해하고 다시 `flow { }` 예제를 보겠습니다. 이 예제에서 flow {}는 순차적으로 값을 발행합니다. emit(1) 후에 emit(2)가 실행되고, 그 후에 emit(3)이 실행됩니다. 각 emit은 suspend 함수이므로, 비동기적으로 값을 발행하고, 이전 값이 처리된 후에야 다음 값으로 넘어갑니다. 이 흐름은 안전하고, 순차적인 처리가 보장됩니다.
<kotlin />
flow {
emit(1)
emit(2)
emit(3)
}.collect {
println(it)
}
만약 내부에서 별도의 코루틴을 생성한다면 어떻게 될까요. launch를 통해 새로운 코루틴을 실행하고 emit을 하게 되면, 순차적으로 값을 처리한다는 규칙을 지키지 못하게 됩니다.
<kotlin />
flow {
emit(1)
emit(2)
launch {
emit(3)
}
}.collect {
println(it)
}
`flow { }`는 다른 빌더들과는 다르게 자율성이 존재합니다. flow 안에서 새롭게 코루틴을 생성할 수 있고 다른 빌더들이 할 수 없는 것들을 많이 할 수 있습니다.
그런데 자율성이 높아지면 자연스럽게 안전성은 떨어지게 됩니다.
SafeFlow는 바로 이 지점에서 사용됩니다. 너무 깊게 다루지는 않겠지만, SafeFlow는 내부에서 규칙을 해치는 경우가 발생하면 에러를 반환해 안전하지 못한 flow를 안전하게 사용하도록 보장합니다.
<kotlin />
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Emission from another coroutine is detected.
1.5. UnsafeFlow - ex) flowOf
반대로 `flowOf`처럼 내부 구조가 안전성이 보장되는 경우는 SafeFlow가 필요하지 않습니다.
그렇기 때문에 불필요한 내부 체크 로직을 사용하지 않는 unsafeFlow가 사용됩니다.
<kotlin />
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
즉, 동시성이 개입할 여지가 없는 경우에는 안전성 검증 로직을 생략해도 됩니다.
1.6. asFlow()
`asFlow()`는 다양한 값들을 flow로 쉽게 변환할 수 있는 유틸리티 확장함수입니다. 여러 타입에 대응할 수 있게 다양하게 정의되어 있습니다.
<kotlin />
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
1.7. channelFlow
`flow { }`는 emit을 순차적으로 보장하지만, 병렬적으로 데이터를 emit 하고 싶을 때는 `channelFlow`를 사용합니다.
<kotlin />
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
ChannelFlowBuilder(block)
`channelFlow`는 채널을 활용해서 여러 작업이 동시적으로 이뤄질 수 있게 하고, emit도 순차적으로 이뤄지게 보장합니다. 즉 생산은 병렬, 소비는 순차적이라는 구조입니다.
즉,
- 여러 coroutine에서 send()를 통해 데이터를 "채널"에 넣을 수 있고 (병렬 가능)
- 이 채널에 들어간 데이터는 하나씩, 순차적으로 downstream으로 전달됩니다 (collect 시에는 순차적)
아래의 예시를 보면 쉽게 이해가 됩니다. `channelFlow`는 emit 대신에 send라는 별도의 메서드를 사용합니다.
<kotlin />
fun main() = runBlocking {
channelFlow {
send(1)
launch {
send(2)
}
}.collect {
println(it)
}
}
// 1
// 2
`flow { }`와 달리 `channelFlow`는 내부에 별도의 코루틴을 생성해도 에러가 발생하지 않습니다.
다만, 채널에 send 되는 값들의 순서는 보장하지 못한다는 단점이 존재합니다.
1.8. callbackFlow
`callbackFlow`는 이름 그대로 콜백 기반을 Flow로 처리하기 위해 사용합니다.
사실 `callbackFlow`와 `channelFlow`의 구현 클래스를 보게 되면 이 둘은 실질적으로 큰 차이가 존재하지 않습니다. 차이가 있다고 한다면, 하나의 조건이 추가되었다는 것입니다.
<kotlin />
private open class ChannelFlowBuilder<T>(
private val block: suspend ProducerScope<T>.() -> Unit,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowBuilder(block, context, capacity, onBufferOverflow)
override suspend fun collectTo(scope: ProducerScope<T>) =
block(scope)
override fun toString(): String =
"block[$block] -> ${super.toString()}"
}
private class CallbackFlowBuilder<T>(
private val block: suspend ProducerScope<T>.() -> Unit,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {
override suspend fun collectTo(scope: ProducerScope<T>) {
if (!scope.isClosedForSend) {
throw IllegalStateException(
"""
'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
Otherwise, a callback/listener may leak in case of external cancellation.
See callbackFlow API documentation for the details.
""".trimIndent()
)
}
}
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
}
주석은 "awaitClose { ... }를 블록의 마지막에 사용해 콜백 또는 리스너를 명확하게 해제하여 메모리 누수가 일어나지 않도록 하세요" 라고 말합니다.
즉, `callbackFlow`와 `channelFlow`는 기능상 차이가 전혀 없고 사용 사례에 맞춰서 이름만 다르게 지정한 것입니다.
이는 누군가가 작성된 코드를 보았을 때, 어떤 Flow 빌더를 사용했는지에 따라 무엇을 하려는지 바로 파악할 수 있게 합니다.
`callbackFlow`는 아래와 같이 콜백 기반을 Flow로 변환할 때 사용됩니다.
<kotlin />
fun locationFlow(): Flow<Location> = callbackFlow {
val listener = object : LocationListener {
override fun onLocationChanged(loc: Location) {
trySendBlocking(loc)
}
}
locationManager.requestUpdates(listener)
awaitClose {
locationManager.removeUpdates(listener)
}
}
2. 🔍 마무리 정리
flow {} | 기본 빌더, 단일 흐름을 생성 (순차 emit 필수) |
channelFlow {} | 병렬 데이터 생산 가능, send로 emit 대체 |
callbackFlow {} | 콜백 기반 API와 통합할 때 명시적 사용 |
flowOf(...) | 정적인 요소를 Flow로 간단히 생성 |
asFlow() | 기존 타입 (Iterable, Function 등)을 Flow로 변환 |
'Android > Kotlin' 카테고리의 다른 글
안드로이드의 MessageQueue 이해하기 (0) | 2025.04.08 |
---|---|
코루틴의 구조적 동시성(Structured Concurrency) (0) | 2025.03.25 |
[Flow 연산자] 중간 연산자 총정리 (0) | 2025.02.12 |
Flow의 collect은 언제 suspend 될까? (0) | 2025.01.21 |
왜 SharedFlow의 emit()은 suspend 함수일까? (2) | 2025.01.01 |