Developing Myself Everyday
article thumbnail

코루틴


코틀린에서는 동시성 기능을 처리할 수 있도록 다양한 클래스와 함수를 코루틴으로 지원합니다.

 

코루틴이란?

코루틴은 이러한 동시성 작업을 간편하게 처리할 수 있게 해줍니다.

 

 

코루틴과 서브루틴의 차이점

서브루틴은(subroutine)은 함수를 호출했을 때 결과를 반환하는 순수 함수입니다. 이런 서브루틴 함수는 순차적으로 처리됩니다.

 

코루틴(Corutine)은 상태를 관리하고 일시 중단했다가 다시 시작할 수 있는 일시중단 함수로 구성합니다.

 

 

동시성 프로그래밍

동시성 프로그래밍은 기존의 순서대로 명령들이 실행되는 구조에서 벗어나 명령들이 불규칙한 순서대로 실행될 수 있게 해주는 프로그래밍 기법입니다. 멀티쓰레딩은 동시성 프로그래밍의 한 방법이지만, 동시성 프로그래밍이 꼭 병렬처리 및 멀티쓰레딩을 의미하지는 않습니다.

 

 

non - blocking

저번주에 배웠던 스레드는 여러 스레드에 나눠서 작업을 수행할 때, 스레드를 일시 중지하면 해당 스레드가 blocking 되어 다른 스레드에서 작업을 완료한 뒤에 blocking된 스레드를 디시 실행했습니다.

 

코루틴도 마찬가지로 스레드를 사용하지만 no -blocking입니다.

 

그 이유는 코루틴은 스레드 내에서 실행할 때 중단 지점을 만난다면 코루틴은 스레드를 떠나 대기중인 다른 코루틴을 선택할 수 있도록 놓아주기 때문입니다.

 

 

 

 

코루틴 환경 구성


코루틴이 실행되기 위해서는 별도의 영역인 코루틴 스코프가 정의되어야 합니다. 이를 정의하는 방법으로는 코루틴 빌더 함수를 사용하는 방법이 있습니다.

 

 

코루틴 스코프

코루틴 스코프의 종류로는 크게 CorountineScope와 GloabalScope가 있습니다.

 

CorountineScope는 인터페이스로 CoroutineContext를 가지고 있습니다.

public interface CoroutineScope {
    public abstract val coroutineContext: kotlin.coroutines.CoroutineContext
}

 

 

GlobalScopeCoroutineScope 인터페이스를 구현한, CoroutineScope의 한 종류입니다. GlobalScope는 object로 선언된 싱글톤 객체로 애플리케이션의 생명주기와 연동됩니다. 

@kotlinx.coroutines.DelicateCoroutinesApi public object GlobalScope : kotlinx.coroutines.CoroutineScope {
    public open val coroutineContext: kotlin.coroutines.CoroutineContext /* compiled code */
        public open get
}

 

그렇기 때문에 어디에서든 접근 가능하며, 애플리케이션이 종료될 때까지 유지되기에 신중히 사용해야 합니다.

 

 

코루틴 빌더

코루틴 빌더는 위의 코루틴 스코프를 구성하는 함수를 말합니다. 코루틴 빌더 함수로는 `runblocking()`, `launch()`, `async()`가 있습니다.

 

 

runBlocking()


runBlocking() 빌더는 현재 스레드에서 실행되는 코루틴을 만들고 코루틴이 완료될 때까지 현재 스레드의 실행을 Block시킵니다.

 

즉 runBlocking을 사용해서 코루틴을 사용했다면 해당 코루틴이 끝날 때까지 대기하게 됩니다.

 

아래와 같은 예시를 보겠습니다.

fun main() = runBlocking {
    delay(100)
    println("메인 스레드 작업: ${Thread.currentThread().name}")

    val c = CoroutineScope(Dispatchers.Default).launch {
        delay(200)
        println("백그라운드 스레드 작업: ${Thread.currentThread().name}")
    }
}
// 출력 - 메인 스레드 작업: main

 

runBlocking으로 메인 스레드에서 실행되는 코루틴을 만들고 그 내부에 백그라운드 스레드에서 실행되는 코루틴을 정의하였습니다.

 

runBlocking은 메인 스레드에서 실행되는 코루틴의 실행이 종료되면 종료됩니다. 그렇기 때문에, 지금 위의 예시에서 내부 백그라운드 코루틴의 delay가 200이기 때문에 해당 코루틴의 실행을 보장할 수는 없습니다.

 

 

join()

join() 메서드를 사용한다면, 해당 코루틴의 실행이 종료된 후에 메인 스레드 코루틴도 종료됩니다.

fun main() = runBlocking {
    delay(100)
    println("메인 스레드 작업: ${Thread.currentThread().name}")

    val c = CoroutineScope(Dispatchers.Default).launch {
        delay(200)
        println("백그라운드 스레드 작업: ${Thread.currentThread().name}")
    }

    c.join()
}
// 출력 - 메인 스레드 작업: main
//       백그라운드 스레드 작업: DefaultDispatcher-worker-1

 

 

 

 

 

launch()


launch 빌더는 CoroutineScope의 확장 함수로, 전역 스코프나 코루틴 스코프에 코루틴을 추가할 수 있습니다.

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> Unit
): Job

 

 

launch는 위처럼 Context, 코루틴 실행 옵션, 코루틴의 본문을 정의하는 Block이 있습니다. 코루틴 빌더를 사용해 코루틴을 시작할 때, 아래와 같이 Context을 지정해주면 UI 스레드에서 작업이 수행됩니다.

 

 

 

 

async()


async의 구조는 launch과 매우 유사합니다. 이 둘의 차이는 반환값에 있습니다.

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> T
): Deferred<T>

 

 

Job

먼저 launch가 반환하는 Job을 보겠습니다.

interface Job : CoroutineContext.Element(source)

 

 

 

Job은 인터페이스로 완료되는 생명주기를 가지며 취소 가능한 객체입니다.

 

 

개념적으로 Job은 결과 값을 생성하지 않으며, 부수 효과만을 위해 시작됩니다.

 

 

Deferred

async가 반환하는 Deferred의 생명주기는 Job과 동일하고 결과 값을 가지는 Job입니다.

interface Deferred<out T> : Job

 

 

Deferred의 결과는 작업이 완료될 때 사용 가능하며, await 메서드를 통해 이를 조회할 수 있습니다.

 public suspend fun await(): T

 

await 메서드는 스레드를 블로킹하지 않고 Job이 완료될 때까지 대기한 다음 결과값을 반환하거나, Job이 취소된 경우 해당 예외를 throw하여 재개합니다.

 

 

start(), await()

LAZY하게 시작하는 코루틴에 start, await 메서드를 사용하면 Deferred를 반환할 때까지 대기할 수 있습니다.

fun main() = runBlocking {
    delay(100)
    println("메인 스레드 작업: ${Thread.currentThread().name}")

    val deferred = CoroutineScope(Dispatchers.Default).async(start = CoroutineStart.LAZY) {
        delay(200)
        println("백그라운드 스레드 작업: ${Thread.currentThread().name}")
    }

    deferred.start()
    deferred.await()
}
// 출력 - 메인 스레드 작업: main
//       백그라운드 스레드 작업: DefaultDispatcher-worker-1

 

 

 

 

withContext()


withContext는 현재 실행되고 있는 코루틴의 Dispatcher를 다른 Dispatcher로 바꾸는 역할을 합니다.

suspend fun <T> withContext(
    context: CoroutineContext, 
    block: suspend CoroutineScope.() -> T
): T

Dispatcher는 스레드나 스레드 풀에서 코루틴이 어디에 실행될 것인지 정의하는 역할을 수행합니다.

 

 

withContext는 블로킹 작업을 수행하기에 매우 적절합니다. 만약 기본 코루틴 실행 Context에서 네트워크 요청이나 디스크 I/O와 같은 작업을 수행하기에는 매우 부적절합니다. 따라서 withContext를 사용하여 Dispatcher를 변경한 다음 해당 Context에서 작업을 수행할 수 있습니다.

 

그리고 block에서 바로 값을 리턴해주고 있기 때문에 작업을 수행한 다음 결과를 얻을 때에도 매우 유용합니다.

suspend fun performNetworkRequest(): String = withContext(Dispatchers.IO) {
    // Network request code goes here
    // ...
    // Return the result
}

 

 

 

withTimeout()

withTimeout 메서드는 단순하게 시간을 지정합니다.

suspend fun <T> withTimeout(
    timeMillis: Long, 
    block: suspend CoroutineScope.() -> T
): T

 

 

만약 지정신 시간을 초과한다면 `TimeoutCancellationException`을 던지고 해당 시간이 초과하지 않은 경우 block의 코드를 실행합니다.

 

그렇기 때문에 withTimeout 메서드를 사용하면 특정 시간까지만 block을 처리할 수 있습니다.

 

 

 

 

Dispatcher


Dispatcher는 이전에 말했듯 스레드나 스레드 풀에서 코루틴이 어디에 실행될 것인지 정의하는 역할을 수행합니다.

 

Dispatcher의 종류는 아래와 같습니다.

package kotlinx.coroutines

public object Dispatchers {
    @JvmStatic 
    public final val Default: CoroutineDispatcher

    @JvmStatic 
    public final val IO: CoroutineDispatcher

    @JvmStatic 
    public final val Main: MainCoroutineDispatcher
        public final get

    @JvmStatic 
    public final val Unconfined:CoroutineDispatcher

    @DelicateCoroutinesApi
    public final fun shutdown()
}

 

 

Dispatcher.Default

기본 값으로 JVM의 기본 스레드 풀을 사용합니다. 스레드는 기본적으로 2개가 사용됩니다. Dispatcher를 정의하지 않았다면 Default입니다.

launch(Dispatchers.Default) {
    println("스레드 작업: ${Thread.currentThread().name}")
} // 스레드 작업: DefaultDispatcher-worker-1

 

 

Dispatcher.Main

UI 객체와 작동하는 메인 스레드에 제한된 Dispatcher입니다. 메인 스레드가 하나이기에 이 Dispatcher 또한 단일 스레드를 사용합니다.

 

 

Dispatcher.IO

필요에 따라 스레드를 더 생성하거나 줄일 수 있는 Dispatcher입니다. Default와 같은 스레드를 공유합니다.

suspend fun main() = coroutineScope<Unit> {

    launch(Dispatchers.Default) {
        println("스레드 작업: ${Thread.currentThread().name}")
    } // 스레드 작업: DefaultDispatcher-worker-1

    launch(Dispatchers.IO) {
        println("스레드 작업: ${Thread.currentThread().name}")
    }
}

//    스레드 작업: DefaultDispatcher-worker-1
//    스레드 작업: DefaultDispatcher-worker-1

 

그렇기 때문에 같은 스레드에서 작업이 일어나고 있는 것을 알 수 있습니다.

 

 

Dispatcher.Unconfined

특정 스레드에 자한되지 않는 Dispatcher입니다. 이는 현재 코루틴을 호출하고 있는 스레드에서 코루틴을 재개합니다.

 

다만 이러한 특성은 첫번째 중단점을 만날때 까지만 그렇습니다. 중단점 이후에 코루틴이 재개된다면, 중단 함수를 재개한 스레드에서 코루틴을 수행합니다.

 

이러한 특성 떄문에 특정 스레드에 국한된 작업이 아닌 경우에 적절한 Dispatcher입니다.

 

 

 

 

Suspend 일시중단 함수


코루틴 빌더 메서드들을 사용하면 Block이 일시 중단 함수로 정의됩니다.

suspend CoroutineScope.() -> T

 

 

 

이러한 일시 중단 함수를 다른 곳에서 정의하고 CoroutineScope 내에서 실행할 수 있습니다.

fun main() = runBlocking {
    helloWorld()
}

suspend fun helloWorld() {
    delay(1000L)
    println("hello World")
}

 

 

suspend 키워드를 함수 앞에 붙이는 것으로 일시 중단 함수를 정의할 수 있고, 이 함수는 CoroutineScope 내에서 실행되기에 CoroutineScope 내에서만 사용할 수 있는 여러 메서드를 사용할 수 있습니다.

 

 

 

 

코루틴 예외처리


부모 코루틴 안에 자식 코루틴이 있을때 예외 처리를 할 때, 코루틴 스코프를 지정해서 자식 코루틴에서 발생한 예외가 부모 코루틴에게 전파되지 않게 할 수 있습니다.

 

이렇게 할 수 있는 방법은 2가지가 있습니다.

 

  • 슈퍼바이저 스코프(supervisorScope): 코루틴 스코프를 지정해서 모든 자식에게 슈퍼바이저 잡을 적용할 때 사용합니다.
  • 슈퍼바이저 잡(supervisorJop): 특정 코루틴에 예외를 처리할 때, 자식 코루틴에 한정해서 처리하는 방식을 슈퍼바이저 잡이라고 합니다. 코루틴을 빌더할 때, 컨텍스트 내부에 정의합니다.

 

아래는 cancel된 job에 join을 하려고 해서 발생한 예외를 처리하는 예시입니다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1000) {
                println("코루틴 job 실행 : $it")
                delay(500L)
            }
        } catch (e: Exception) {
            println("중단에 따른 예외:" + e.message)
        }
    }
    delay(1300L)
    println("메인 처리 후 자식 종료")
    job.cancel()
    job.join()
    println("메인 종료")
}

//    출력 결과
//    코루틴 job 실행 : 0
//    코루틴 job 실행 : 1
//    코루틴 job 실행 : 2
//    메인 처리 후 자식 종료
//    중단에 따른 예외:StandaloneCoroutine was cancelled
//    메인 종료

 

 

핸들러를 사용한 예외 처리

아래와 같이 예외처리를 하는 핸들러를 따로 정의해서 이를 CoroutineContext에 추가해 핸들러에서 예외가 처리하게 할 수 있습니다.

val handler = CoroutineExceptionHandler { _, exception ->
    println("예외 처리: $exception")
}

val job = launch(handler) {
	...
}

 

만약 부모 코루틴에 핸들러를 등록한다면 자식 코루틴에서 발생시킨 예외를 부모 코루틴의 핸들러가 처리합니다.

 

 

Async 빌더 예외 처리

Async 빌더는 내부적으로 예외가 발생하면 예외를 잡아서 반환값으로 전달합니다. 이 반환값은 await 메서드를 통해 전파되며, 이런 예외가 전파되는 것을 막기 위해서는 위에서 설명했던 슈퍼바이저 스코프로 해당 빌더를 지정하면 됩니다.

 

아래의 코드는 슈퍼바이저 스코프를 사용해 부모 코루틴 까지 예외가 전달되지 않는 예시를 보여줍니다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    try {
        supervisorScope {
            val job = async {
                println("코루틴 job 실행")
                delay(500L)
                throw Exception("첫번째 코루틴 내에서 예외 발생")
            }

            println("메인 처리 후 자식 종료")
            try {
                job.await()
            } catch (e: Exception) {
                println("예외를 다시 전달")
                println(e.message)
            }
            println("메인 종료")
        }
    } catch (e: Exception) {
        println("부모 코루틴까지 예외 전달")
        println(e.message)
    }
}

//    출력 결과
//    메인 처리 후 자식 종료
//    코루틴 job 실행
//    예외를 다시 전달
//    첫번째 코루틴 내에서 예외 발생
//    메인 종료

 

 

 

 

 

코루틴 채널


여러 개의 코루틴을 처리할 때, 코루틴 간의 데이터 교환을 처리하려면, 코루틴 간의 정보를 공유할 수 있는 채널(Channel)을 지정해서 정보를 전달할 수 있습니다.

 

 

 

체널의 타입은 capacity 속성을 지정해서 정의할 수 있는데, 이를 크게 4가지로 나눌 수 있습니다.

 

 1. Buffered

 2. Rendezvous (Unbuffered)

 3. Unlimited

 4. Conflated

 

Buffered

 

 

 

고정된 크기의 버퍼를 생성하는 타입입니다. 

fun main() {
    runBlocking {
        val channel = Channel<Int>(3)

        launch {
            for (n in 1..5) {
                delay(50)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

위의 예제에서는 send()를 사용해 n를 보내고 receive()를 사용해 n를 받고 있습니다.

 

지금 Channal의 용량의 3개인 상황에서 송신쪽에서는 delay를 50을 주고 수신쪽에서는 delay를 100을 지정했습니다.

 

이런 상황에서는 시간차가 발생하게 되어, send로 보낸 데이터를 한번에 다 처리하지 못하고 쌓여있게 됩니다.

Sending Num 1
Receiving Num 1
Sending Num 2
Sending Num 3
Receiving Num 2
Sending Num 4
Sending Num 5
Receiving Num 3
Receiving Num 4
Receiving Num 5

 

 

 

 

 

Rendezvous (Unbuffered)

 

 

부제에서 알 수 있듯이 아무 버퍼가 없는 랑데부 채널 형태입니다. 이 경우 send() 호출은 receive()가 호출되기 전까지 항상 일시 중단되며 receive() 호출은 send()를 호출할 때까지 일시중단 됩니다.

 

채널 생성이 용량을 지정하지 않은 Default 채널이 바로 랑데부 채널입니다.

 

랑데부 채널은 딜레이 시간과 관계없이 안정적인 동작 순서를 볼 수 있습니다.

 

입력

fun main() {
    runBlocking {
    	// 용량을 정의하지 않음
        val channel = Channel<Int>()

        launch {
            for (n in 1..5) {
                delay(50)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

출력

Sending Num 1
Receiving Num 1
Sending Num 2
Receiving Num 2
Sending Num 3
Receiving Num 3
Sending Num 4
Receiving Num 4
Sending Num 5
Receiving Num 5

 

 

 

Unlimited

 

 

이 채널은 버퍼의 용량 제한이 없고, 버퍼의 용량은 필요에 따라 증가할 수 있습니다.

 

이 경우 send()시에 일시 중단되는 일이 없지만 receive()시 버퍼가 비어있다면 일시 중단됩니다.

 

아래는 극단적인 예를 위해 송신측의 delay를 10으로 설정한 예시입니다.

 

 

입력

fun main() {
    runBlocking {
        val channel = Channel<Int>(Channel.UNLIMITED)

        launch {
            for (n in 1..5) {
                delay(10)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

결과

Sending Num 1
Sending Num 2
Sending Num 3
Sending Num 4
Sending Num 5
Receiving Num 1
Receiving Num 2
Receiving Num 3
Receiving Num 4
Receiving Num 5

 

 

 

Conflated

 

 

이름 그대로 융합되는, 즉 송신된 값이 합쳐지는 채널입니다.

 

이 체널에서는, 가장 먼저 크기가 1인 고정 버퍼가 있는 채널이 생성됩니다.

 

그 다음에 send()로 보낸 원소가 누군가에 의해 수신되기 전에 다른 값이 send() 된다면 기존의 값을 덮어버리게 됩니다. 이렇게 되면 기존의 원소 값은 사라지게 됩니다.

 

이 채널의 경우 send() 메서드는 일시중단되지 않는다.

 

 

입력

fun main() {
    runBlocking {
        val channel = Channel<Int>(Channel.CONFLATED)

        launch {
            for (n in 1..5) {
                delay(50)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

결과

Sending Num 1
Receiving Num 1
Sending Num 2
Sending Num 3
Receiving Num 3
Sending Num 4
Sending Num 5
Receiving Num 5

 

결과를 보게 되면 수신 값이 송신 값과 다르다는 것을 알 수 있습니다. 수신측과 송신측의 속도가 차이나기 때문에 그 속도 차이의 값 만큼 데이터가 유실된 것을 확인할 수 있습니다.

 

 

 

 

프로듀스와 컨슈머


이전에는 데이터를 채널로 주고 받았지만, 프로듀스와 컨슈머를 사용해서 처리할 수도 있습니다.

 

생산자(produce): 코루틴을 만들 때 별도의 확장함수를 지정해 생산자 코루틴을 만들어 사용합니다.

소비자(comsumeEach): 생산자의 메서드를 사용해서 전달된 메시지를 처리합니다.

 

 

아래는 코루틴스코프의 확장 함수로 생산자를 만든 마음 메시지를 전달하고, 이를 소비자에서 사용하는 예시입니다.

@ExperimentalCoroutinesApi
fun CoroutineScope.produceSquares(max: Int): ReceiveChannel<Int> = produce {
    for (x in 1..max) {
        send(x * x)
    }
}

@ExperimentalCoroutinesApi
fun main() = runBlocking {
    val squares = produceSquares(5)
    squares.consumeEach { println(it) }
    println("Done")
}


//    출력
//    1
//    4
//    9
//    16
//    25
//    Done

 

 

 

 

시퀀스(Sequences)와 Flow


시퀀스는 Iterable과 동일한 기능을 제공하지만 다른 접근 방식을 구현합니다.

 

바로 가능한 한 지연 실행(Lazy Evaluation) 한다는 것입니다. 시퀀스를 사용하면 실제 계산은 전체 처리 체인의 결과가 요청될 때만 발생합니다.

 

시퀀스는 각 요소에 대해 모든 처리 단계를 하나씩 순차적으로 수행합니다.

 

따라서 시퀀스를 사용하면 중간 단계의 결과를 만들지 않고 전체 컬렉션 처리 체인의 성능을 향상시킬 수 있습니다. 

 

 

 

시퀀스와 중단 함수

아래와 같은 예시를 보겠습니다.

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

 

위의 코드를 실행하면 `simple()` 함수는 이 함수를 실행하고 있는 메인 스레드를 정지시키게 됩니다.

 

이를 방지하는 방법은 함수에 `suspend` 키워드를 붙여 함수를 중단 함수로 바꾸는 것입니다.

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

 

 

다만 이것으로는 우리가 원하는 동작을 구현하지 못합니다. 우리는 메인 스레드를 블록하지 않고 시퀀스처럼 중간 단계의 결과를 만들고 싶지 않습니다. 이때 우리가 사용하는것이 바로 Flow입니다.

 

 

시퀀스와 Flow

Flow를 사용하면 각각의 연산을 출력할 때 메인 스레드를 정지하지 않고, 한번에 모든 값을 반환하지 않아도 됩니다. Flow는 시퀀스와 유사하게 Flow가 Collect 되기 전까지는 내부의 코드 블록이 실행되지 않습니다.

 

Flow를 사용하는 함수는 `suspend` 키워드를 붙이지 않아도 됩니다.

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}

 

 

 

Flow란?


그럼 Flow에 대해 자세히 더 알아보겠습니다. Flow는 단일 값만 반환하는 Suspend 함수와 달리 여러 값을 순차적으로 내보낼 수 있는 유형입니다. 

 

Flow는 비동기식으로 계산할 수 있는 데이터 스트림의 개념입니다.

 

 

데이터 스트림에는 스트림에 추가되는 데이터를 생성하는 생산자(Producer), 스트림에 내보내는 각각의 값이나 스트림 자체를 수정하는 중개자(Intermediary), 그리고 이 값을 사용하는 소비자(Consumer)가 있습니다/

 

 

Flow 생산자(Producer)

Flow를 만들려면 `flow()` 함수를 사용하고 `emit()` 함수를 사용해 새로운 값을 수정으로 데이터 스트림에 내보냅니다.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow 빌더에서는 생산자가 다른 CoroutineContext의 값을 emit할 수 없습니다. 그러므로 새 코루틴을 만들거나 코드의 withContext 블록을 사용하여 다른 CoroutineContext에서 emit를 호출하지 마세요. 이런 경우 callbackFlow 같은 다른 흐름 빌더를 사용할 수 있습니다.

 

 

Flow 중개자(Intermediary) 

중개자는 `map` 이나 `filter` 같은 연산자를 사용해 데이터를 수정할 수 있습니다. 이는 Flow가 Collect 되기 전까지 실행되지 않는 작업 체인을 설정합니다.

val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }

 

 

Flow 소비자(Consumer)

마지막으로 데이터 스트림의 모든 값을 가져오기 위해서는 `collect()` 함수를 사용하면 됩니다. `collect()` 함수는 정지 함수이므로 코루틴 내에서 실행해야 합니다. 

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

 

 

Flow 타임아웃 처리

Flow 이전에 코루틴에 했던 것처럼 withTimeoutOrNull 메서드를 사용해 특정 시간 내에서 처리하도록 할 수 있습니다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    withTimeoutOrNull(250) {
        simple().collect(::println)
    }
    println("Done")
}

// 출력
// 1
// 2
// Done

 

 

asFlow()

시퀀스나 배열을 asFlow() 메서드를 사용해서 Flow로 변환할 수 있습니다.

val foo : Flow<Int> = (1..3).asFlow()

 

 

 

zip()

zip() 메서드를 사용하면 여러 개의 flow를 하나로 결합시킬 수 있습니다.

 

아래의 예시는 2개의 flow를 하나로 결합시킨 예시를 보여줍니다. 

fun main() = runBlocking {
    val numberFlow = flowOf(1, 2, 3).onEach { delay(10) }
    val letterFlow = flowOf("a", "b", "c", "d").onEach { delay(15) }
    numberFlow.zip(letterFlow) { i, s -> i.toString() + s }.collect {
        println(it)
    }
}

// 출력
// 1a
// 2b
// 3c

 

2개의 Flow는 원소의 개수가 다릅니다. 하나의 Flow가 값을 다 방출해서 더 이상 방출할 값이 남아있지 않다면, 다른 Flow에 상관없이 zip이 종료됩니다. 그렇기에 위의 예시에서는 letterFlow의 `d`가 출력되지 않습니다.

 

 

Flow 취소

flow는 별도로 취소되지 않습니다. flow를 취소하려면 flow의 상위 코루틴을 취소해야 합니다.

 

 

 

 

액터(Actor)


액터는 내부에 mailbox 채널에서 코루틴을 시작하고, mailbox 채널의 참조를 SendChannel로 반환합니다.

@ObsoleteCoroutinesApi
fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext, 
    capacity: Int = 0, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    onCompletion: CompletionHandler? = null, 
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>

 

 

코루틴 스코프는 ActorScope inferface가 포함되어 있는데, 이는 CoroutineScopeReceiveChannel을 구현하는 인터페이스입니다.

@ObsoleteCoroutinesApi
interface ActorScope<E> : CoroutineScope, ReceiveChannel<E>

 

이를 통해서 receive를 호출할 수 있습니다. 

 

 

아래는 확장 함수로 액터를 생성하고 메시지 전송과 수신을 하는 예시입니다.

@ObsoleteCoroutinesApi
fun CoroutineScope.counterActor() = actor<Int> {
    var counter = 0 // actor state
    for (msg in channel) {
        counter++
        println("수신 받은 메시지: $msg")
    }
    println("result $counter")
}

@ObsoleteCoroutinesApi
fun main() = runBlocking {
    val counter = counterActor()

    val workA = async {
        repeat(2) { counter.send(it) }
    }

    val workB = async {
        repeat(3) { counter.send(it) }
    }

    workA.await()
    workB.await()
    counter.close()
    println()
}

수신 받은 메시지: 0
수신 받은 메시지: 1
수신 받은 메시지: 0
수신 받은 메시지: 1
수신 받은 메시지: 2

result 5
profile

Developing Myself Everyday

@배준형

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!