시퀀스(Sequences)와 Flow
시퀀스는 Iterable과 동일한 기능을 제공하지만 다른 접근 방식을 구현합니다.
바로 가능한 한 지연 실행(Lazy Evaluation) 한다는 것입니다. 시퀀스를 사용하면 실제 계산은 전체 처리 체인의 결과가 요청될 때만 발생합니다.
시퀀스는 각 요소에 대해 모든 처리 단계를 하나씩 순차적으로 수행합니다.
따라서 시퀀스를 사용하면 중간 단계의 결과를 만들지 않고 전체 컬렉션 처리 체인의 성능을 향상시킬 수 있습니다.
시퀀스와 중단 함수
아래와 같은 예시를 보겠습니다.
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
위의 코드를 실행하면 `simple()` 함수는 이 함수를 실행하고 있는 메인 스레드를 정지시키게 됩니다.
이를 방지하는 방법은 함수에 `suspend` 키워드를 붙여 함수를 중단 함수로 바꾸는 것입니다.
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
다만 이것으로는 우리가 원하는 동작을 구현하지 못합니다. 우리는 메인 스레드를 블록하지 않고 시퀀스처럼 중간 단계의 결과를 만들고 싶지 않습니다. 이때 우리가 사용하는것이 바로 Flow입니다.
시퀀스와 Flow
Flow를 사용하면 각각의 연산을 출력할 때 메인 스레드를 정지하지 않고, 한번에 모든 값을 반환하지 않아도 됩니다. Flow는 시퀀스와 유사하게 Flow가 Collect 되기 전까지는 내부의 코드 블록이 실행되지 않습니다.
Flow를 사용하는 함수는 `suspend` 키워드를 붙이지 않아도 됩니다.
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
Flow란?
그럼 Flow에 대해 자세히 더 알아보겠습니다. Flow는 단일 값만 반환하는 Suspend 함수와 달리 여러 값을 순차적으로 내보낼 수 있는 유형입니다.
Flow는 비동기식으로 계산할 수 있는 데이터 스트림의 개념입니다.
데이터 스트림에는 스트림에 추가되는 데이터를 생성하는 생산자(Producer), 스트림에 내보내는 각각의 값이나 스트림 자체를 수정하는 중개자(Intermediary), 그리고 이 값을 사용하는 소비자(Consumer)가 있습니다/
Flow 생산자(Producer)
Flow를 만들려면 `flow()` 함수를 사용하고 `emit()` 함수를 사용해 새로운 값을 수정으로 데이터 스트림에 내보냅니다.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow 빌더에서는 생산자가 다른 CoroutineContext의 값을 emit할 수 없습니다. 그러므로 새 코루틴을 만들거나 코드의 withContext 블록을 사용하여 다른 CoroutineContext에서 emit를 호출하지 마세요. 이런 경우 callbackFlow 같은 다른 흐름 빌더를 사용할 수 있습니다.
Flow 중개자(Intermediary)
중개자는 `map` 이나 `filter` 같은 연산자를 사용해 데이터를 수정할 수 있습니다. 이는 Flow가 Collect 되기 전까지 실행되지 않는 작업 체인을 설정합니다.
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
Flow 소비자(Consumer)
마지막으로 데이터 스트림의 모든 값을 가져오기 위해서는 `collect()` 함수를 사용하면 됩니다. `collect()` 함수는 정지 함수이므로 코루틴 내에서 실행해야 합니다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Jetpack 라이브러리의 Flow
Flow는 많이 사용되는 Android 서드 파티 라이브러리인 다수의 Jetpack 라이브러리에 통합됩니다. Flow는 실시간 데이터 업데이트 및 무제한 데이터 스트림에 아주 적합합니다.
아래와 같이 Room을 사용해 데이터 베이스에 정보를 받을 때 Flow를 사용해 실시간으로 데이터를 수집할 수 있습니다.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Reference
'Android > Kotlin' 카테고리의 다른 글
코틀린 리플렉션과 애노테이션 그리고 함수 참조 (0) | 2023.11.05 |
---|---|
StateFlow, SharedFlow에 대해 알아보기 (0) | 2023.10.01 |
[Deep Dives into Coroutines on JVM] (2) - 코루틴의 Context (0) | 2023.09.26 |
[Deep Dives into Coroutines on JVM] (1) - 코루틴과 Callback (0) | 2023.09.13 |
Sealed Class와 Enum Class 뭘 사용해야 하나요? (0) | 2023.09.04 |