Developing Myself Everyday
article thumbnail

채널이란?


 

채널은 2개의 코루틴 사이를 연결할 수 있는 파이프 같은 것이다. 임의의 데이터 스트림을 코루틴 사이에 공유할 수 있다. 만약 채널 내부의 용량이 다 찼는데 데이터를 채널에 보내려고 하면, 채널은 현재 코루틴을 일시 중단시키고 나중에 재개하게 된다. 이 부분이 자바의 동시성 API에서의 Blocking Queue와 채널의 가장 큰 차이이다.

 

 

 

채널의 타입


채널의 타입은 크게 4가지로 나눌 수 있다.

 

 1. Buffered

 2. Rendezvous (Unbuffered)

 3. Unlimited

 4. Conflated

 

이제부터 이 4가지의 타입을 어떻게 사용하는지 알아보겠다.

 

 1. Buffered

 

 

고정된 크기의 버퍼를 생성하는 타입이다. 아래의 예제를 보자

 

fun main() {
    runBlocking {
        val channel = Channel<Int>(3)

        launch {
            for (n in 1..5) {
                delay(50)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

위의 예제에서는 send()를 사용해 n를 보내고 receive()를 사용해 n를 받고 있다는 것을 알 수 있다. 지금 Channal의 용량은 3개인 상황에서 송신쪽에서는 delay를 50을 주고 수신쪽에서는 delay를 100을 줬다. 이런 상황이라면 결과는 어떻게 될까?

 

Sending Num 1
Receiving Num 1
Sending Num 2
Sending Num 3
Receiving Num 2
Sending Num 4
Sending Num 5
Receiving Num 3
Receiving Num 4
Receiving Num 5

 

송신쪽에 수신쪽에 비해 뒤져쳐 채널 버퍼가 꽉 차서 이런 결과가 생긴다. 

 

 

 

2. Rendezvous (Unbuffered)

 

 

부제에서 알 수 있듯이 아무 버퍼가 없는 랑데부 채널 형태이다. 이 경우 send() 호출은 receive()가 호출되기 전까지 항상 일시 중단되며 receive() 호출은 send()를 호출할 때까지 일시중단되게 된다. 채널 생성이 용량을 지정하지 않은 Default 채널이 바로 랑데부 채널이다.

 

랑데부 채널은 딜레이 시간과 관계없이 안정적인 동작 순서를 볼 수 있다.

 

입력

fun main() {
    runBlocking {
    	// 용량을 정의하지 않음
        val channel = Channel<Int>()

        launch {
            for (n in 1..5) {
                delay(50)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

결과

Sending Num 1
Receiving Num 1
Sending Num 2
Receiving Num 2
Sending Num 3
Receiving Num 3
Sending Num 4
Receiving Num 4
Sending Num 5
Receiving Num 5

 

 

 

3. Unlimited

 

 

이 채널은 버퍼의 용량 제한이 없고 버퍼의 용량은 필요에 따라 증가한다. 이 경우 send()시에 일시 중단되는 일이 없지만 receive()시 버퍼가 비어있다면 일시 중단된다.

 

극단적인 예를 위해 송신측의 delay를 10으로 줘 보겠다.

 

입력

fun main() {
    runBlocking {
        val channel = Channel<Int>(Channel.UNLIMITED)

        launch {
            for (n in 1..5) {
                delay(10)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

결과

Sending Num 1
Sending Num 2
Sending Num 3
Sending Num 4
Sending Num 5
Receiving Num 1
Receiving Num 2
Receiving Num 3
Receiving Num 4
Receiving Num 5

 

 

 

4. Conflated

 

 

이름 그대로 융합되는, 즉 송신된 값이 합쳐지는 채널이다. 이렇게 하면 크기가 1인 고정 버퍼가 있는 채널이 생성되고 send()로 보낸 원소가 누군가에 의해 수신되기 전에 다른 값이 send() 된다면 기존의 값을 덮어버린다는 것이다. 이렇게 되면 기존의 원소 값은 사라지게 된다. 이 채널의 경우 send() 메서드는 일시중단되지 않는다.

 

아래의 예를 보자

 

 

입력

fun main() {
    runBlocking {
        val channel = Channel<Int>(Channel.CONFLATED)

        launch {
            for (n in 1..5) {
                delay(50)
                println("Sending Num $n")
                channel.send(n)
            }
        }

        launch {
            for (i in 1..5) {
                delay(100)
                val n = channel.receive()
                println("Receiving Num $n")
            }
        }
    }
}

 

결과

Sending Num 1
Receiving Num 1
Sending Num 2
Sending Num 3
Receiving Num 3
Sending Num 4
Sending Num 5
Receiving Num 5

 

결과를 보게 되면 수신 값이 송신 값과 다르다는 것을 알 수 있다. 수신측과 송신측의 속도가 차이나기 때문에 그 속도 차이의 값 만큼 데이터가 유실된 것이다.

 

 

 

Reference

 

Kotlin: Diving in to Coroutines and Channels

An illustrated guide to coroutines and channels in Kotlin

proandroiddev.com

profile

Developing Myself Everyday

@배준형

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!