Developing Myself Everyday

ConsumeEach()


그럼 명시적인 이터레이션을 사용하지 않고 채널에 있는 내용을 수신할 수는 없을까?? 이를 위한 메서드가 바로 consumeEach()이다. 

 

channel.consumeEach {
    println("Receiving Num $it")
    delay(100)
}

 

위의 함수를 사용하면 송신측의 receive를 했을 때와 똑같은 결과를 얻을 수 있다.

 

 

또한 한 채널에서 생성한 데이터를 여러 개의 수신측이 가져갈 수도 있다. 이를 통해 데이터를 분류해서 처리할 수 있게 될 것이다. 

 

 

 

Produce()


sequence() 함수와 비슷한 동시성 스트림을 생성할 수 있는 produce()라는 코루틴 빌더가 있다. 이 빌더는 ProducerScope 영역을 도입한다.

 

fun main() {
    runBlocking {
        val channel = produce {
            for (n in 1..5) {
                delay(50)
                println("Sending Num $n")
                send(n)
            }
        }

        channel.consumeEach {
            println("Receiving Num $it")
            delay(100)
        }
    }
}

 

produce() 안에서 예외가 발생하면 예외를 저장했다가 해당 채널에 대해 수신을 가장 처음 호출한 코루틴 쪽에 예외를 던지게 된다.

 

 

Ticker()


티커 채널은 이 채널의 마지막 소비 이후 주어진 지연만큼 스트림을 만든다. 말이 어려운데 복잡한 파이프라인과 윈도우 설정 및 기타 시간 단축 처리를 수행하는 연산자를 만드는 데 유용하고 생각하면 된다.

 

 

fun main() = runBlocking {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // ticker 채널 생성
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // 최초 딜레이는 없다.

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 이후 모든 요소에 100ms 딜레이가 있다.
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } // 100ms가 지났으므로 Unit이 출력됨.
    println("Next element is ready in 100 ms: $nextElement")

    // 150ms 딜레이
    println("Consumer pauses for 150ms")
    delay(150)
    // 다음 요소를 즉시 사용할 수 있음.
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // 'receive' 호출 사이의 일시 중지가 고려되어 다음 요소가 더 빨리 도착한다는 점에 유의.
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // 더 이상 요소가 필요없음을 알림.
}

 

 

 

 

profile

Developing Myself Everyday

@배준형

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!