이번 게시글에서는 SharedFlow에 `emit()`을 호출할 때 발생했던 의문점을 해결해보고자 합니다.
의문점: 왜 SharedFlow의 emit()은 suspend 함수일까?
저는 ViewModel에서 발생하는 Effect를 SharedFlow를 통해서 관리하고 있습니다.
예를 들어, 스낵바를 띄우는 행위를 하기 위해 SharedFlow에 `emit()`을 해야 하는데, 이를 위해 매번 viewModelScope.launch를 통해 별도의 코루틴을 생성해야 했습니다.
private fun showSnackBar(message: String) {
viewModelScope.launch {
_UiEffect.emit(UiEffect.ShowSnackBar(message))
}
}
그러다 이런 방식을 사용해야 하는 이유에 대해 궁금증이 생겼습니다. 질문에 답을 하기위해 이제부터 `emit()`이 suspend 함수로 정의된 이유에 대해 알아보겠습니다.
SharedFlow의 구조와 특성
이유에 대해 알기 위해선, 먼저 SharedFlow에 대해서 알아야 합니다.
SharedFlow는 이름에서도 알 수 있듯이, 여러 구독자가 동시에 공유(Shared)된 데이터를 소비할 수 있는 구조를 제공합니다.
하지만, 구독자들은 기본적으로 데이터를 처리하는 속도가 다릅니다. 그렇기 때문에 일반적인 구조에서는 모든 구독자들이 데이터를 수신하는 것을 보장하지 못했습니다.
그렇기 때문에 SharedFlow는 데이터를 일시적으로 저장할 수 있는 버퍼 구조로 구현되었습니다.
이러한 내용은 SharedFlow의 내부 구조에 대한 내용은 구현체 `SharedFlowImpl`의 주석에서 확인할 수 있습니다.
internal open class SharedFlowImpl<T>(
private val replay: Int,
private val bufferCapacity: Int,
private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
/*
Logical structure of the buffer
buffered values
/-----------------------\
replayCache queued emitters
/----------\/----------------------\
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E | | | |
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
^ ^ ^ ^
| | | |
head | head + bufferSize head + totalSize
| | |
index of the slowest | index of the fastest
possible collector | possible collector
| |
| replayIndex == new collector's index
\---------------------- /
range of possible minCollectorIndex
head == minOf(minCollectorIndex, replayIndex) // by definition
totalSize == bufferSize + queueSize // by definition
INVARIANTS:
minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
replayIndex <= head + bufferSize
*/
}
주석의 내용을 요약하자면 아래와 같습니다.
- SharedFlowImpl은 버퍼와 재생 캐시를 기반으로 작동하며, 구독자 간 동기화를 관리합니다.
- head, bufferSize, totalSize 같은 주요 포인터와 조건들이 SharedFlow의 일관성을 보장합니다.
- BufferOverflow와 replay 설정을 통해 다양한 동작을 커스터마이즈할 수 있습니다.
SharedFlow는 버퍼 구조이다.
SharedFlow가 기본적으로 값을 버퍼에 저장한다는 것을 이제 알았습니다. 하지만, 버퍼는 용량이 존재합니다.
SharedFlow의 버퍼 용량은 replay와 extraBuffectCapcity에 의해 결정됩니다.
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
만약 버퍼의 용량이 가득 차 있을 때, 추가적으로 값을 `emit()` 하려고 한다면 버퍼 오버플로우가 발생합니다.
MtableSharedFlow를 정의할 때, `BufferOverflow`를 결정하게 되는데 바로 설정에 따라 버퍼 오버플로우가 발생할 때에 따라 동작이 달라지게 됩니다.
`BufferOverflow`는 기본적으로 SUSPEND로 설정되어 있고, 그렇기 때문에 `emit()`은 버퍼 오버플로우가 발생할 때에는 중단될 수 있습니다.
emit()이 suspend 함수인 이유
이러한 내용들 때문에 `emit()`은 값을 방출하기 위해 대기될 수 있으며, 따라서 suspend 함수로 정의됩니다.
추가: tryEmit()
만약 SharedFlow에 값을 방출할 때, 중단 없이 시도하려면 `tryEmit()`을 사용하여 시도할 수 있습니다.
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
for (cont in resumes) cont?.resume(Unit)
return emitted
}
이 메서드는 값의 방출을 시도하고, 방출의 성공 여부를 반환합니다. 이 함수가 false를 반환한다는 것은 방출 작업이 버퍼 공간이 확보될 때까지 중단될 것을 의미합니다.
'Android > Kotlin' 카테고리의 다른 글
Flow의 collect은 언제 suspend 될까? (0) | 2025.01.21 |
---|---|
Kotlin에서의 함수 컬러링이란? suspend와 composable 함수 (1) | 2024.10.15 |
KAPT, KSP 그리고 어노테이션 (3) | 2024.09.11 |
Immutable 그리고 Persistent Collections에 대해서 (1) | 2024.02.26 |
Ktor에 대해 알아보고 HTTP 통신 해보기 (1) | 2023.12.04 |