Developing Myself Everyday
article thumbnail

사진: UnsplashErfan

 

 

 

이번 게시글에서는 SharedFlow에 `emit()`을 호출할 때 발생했던 의문점을 해결해보고자 합니다.


 

 

의문점: 왜 SharedFlow의 emit()은 suspend 함수일까?


저는 ViewModel에서 발생하는 Effect를 SharedFlow를 통해서 관리하고 있습니다.

 

예를 들어, 스낵바를 띄우는 행위를 하기 위해 SharedFlow에 `emit()`을 해야 하는데, 이를 위해 매번 viewModelScope.launch를 통해 별도의 코루틴을 생성해야 했습니다. 

private fun showSnackBar(message: String) {
    viewModelScope.launch {
        _UiEffect.emit(UiEffect.ShowSnackBar(message))
    }
}

 

그러다 이런 방식을 사용해야 하는 이유에 대해 궁금증이 생겼습니다. 질문에 답을 하기위해 이제부터 `emit()`이 suspend 함수로 정의된 이유에 대해 알아보겠습니다.

 

 

 

SharedFlow의 구조와 특성


이유에 대해 알기 위해선, 먼저 SharedFlow에 대해서 알아야 합니다.

 

SharedFlow는 이름에서도 알 수 있듯이, 여러 구독자가 동시에 공유(Shared)된 데이터를 소비할 수 있는 구조를 제공합니다.

 

하지만, 구독자들은 기본적으로 데이터를 처리하는 속도가 다릅니다. 그렇기 때문에 일반적인 구조에서는 모든 구독자들이 데이터를 수신하는 것을 보장하지 못했습니다. 

 

그렇기 때문에 SharedFlow는 데이터를 일시적으로 저장할 수 있는 버퍼 구조로 구현되었습니다.

 

이러한 내용은 SharedFlow의 내부 구조에 대한 내용은 구현체 `SharedFlowImpl`의 주석에서 확인할 수 있습니다.

internal open class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    /*
        Logical structure of the buffer

                  buffered values
             /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
               ^           ^           ^                      ^
               |           |           |                      |
              head         |      head + bufferSize     head + totalSize
               |           |           |
     index of the slowest  |    index of the fastest
      possible collector   |     possible collector
               |           |
               |     replayIndex == new collector's index
               \---------------------- /
          range of possible minCollectorIndex

          head == minOf(minCollectorIndex, replayIndex) // by definition
          totalSize == bufferSize + queueSize // by definition

       INVARIANTS:
          minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
          replayIndex <= head + bufferSize
     */
     
 }

 

주석의 내용을 요약하자면 아래와 같습니다.

  • SharedFlowImpl은 버퍼와 재생 캐시를 기반으로 작동하며, 구독자 간 동기화를 관리합니다.
  • head, bufferSize, totalSize 같은 주요 포인터와 조건들이 SharedFlow의 일관성을 보장합니다.
  • BufferOverflow와 replay 설정을 통해 다양한 동작을 커스터마이즈할 수 있습니다.

 

 

 

SharedFlow는 버퍼 구조이다.


SharedFlow가 기본적으로 값을 버퍼에 저장한다는 것을 이제 알았습니다. 하지만, 버퍼는 용량이 존재합니다. 

 

SharedFlow의 버퍼 용량은 replay와 extraBuffectCapcity에 의해 결정됩니다.

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    require(replay >= 0) { "replay cannot be negative, but was $replay" }
    require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
    require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
        "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
    }
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

 

 

만약 버퍼의 용량이 가득 차 있을 때, 추가적으로 값을 `emit()` 려고 한다면 버퍼 오버플로우가 발생합니다.

 

MtableSharedFlow를 정의할 때, `BufferOverflow`를 결정하게 되는데 바로 설정에 따라 버퍼 오버플로우가 발생할 때에 따라 동작이 달라지게 됩니다. 

 

`BufferOverflow`는 기본적으로 SUSPEND로 설정되어 있고, 그렇기 때문에 `emit()` 버퍼 오버플로우가 발생할 때에는 중단될 수 있습니다.

 

 

 

emit()이 suspend 함수인 이유


이러한 내용들 때문에 `emit()` 값을 방출하기 위해 대기될 수 있으며, 따라서 suspend 함수로 정의됩니다.

 

 

 

추가: tryEmit()


만약 SharedFlow에 값을 방출할 때, 중단 없이 시도하려면 `tryEmit()`을 사용하여 시도할 수 있습니다.

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

 

 

이 메서드는 값의 방출을 시도하고, 방출의 성공 여부를 반환합니다. 이 함수가 false를 반환한다는 것은 방출 작업이 버퍼 공간이 확보될 때까지 중단될 것을 의미합니다.

profile

Developing Myself Everyday

@배준형

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!