03
29

https://kotlinlang.org/docs/flow.html#intermediate-flow-operators

 

Asynchronous Flow | Kotlin

 

kotlinlang.org

์œ„ ๋งํฌ๋ฅผ ๊ณต๋ถ€ํ•˜๋ฉฐ ๋‚จ๊ธฐ๋Š” ๊ธฐ๋ก์ด๋‹ค. Flow ์ค‘๊ฐ„์—ฐ์‚ฐ์ž๋ถ€ํ„ฐ ํ•˜๊ฒ ๋‹ค. ์ด์ „์˜ ๊ณต๋ถ€๊ธฐ๋ก์€ ์ž‘์„ฑ์„ ๊นœ๋ฐ•ํ–ˆ๋Š”๋ฐ ์ถ”ํ›„ ๊ธฐ๋กํ•ด๋‘˜ ์˜ˆ์ •์ด๋‹ค.

Intermediate flow operators

ํ”Œ๋กœ์šฐ ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž๋‹ค. ํ”Œ๋กœ์šฐ ์ž์ฒด๋ฅผ ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ ์—…์ŠคํŠธ๋ฆผ ํ”Œ๋กœ์šฐ์— ์ ์šฉ์‹œํ‚จ๋‹ค์Œ ๋‹ค์šด์ŠคํŠธ๋ฆผ ํ”Œ๋กœ์šฐ๋ฅผ ๋ฐ˜ํ™˜ ํ•œ๋‹ค. ์ด๋•Œ ๋™์ž‘์€ ์ฝœ๋“œ ํ”Œ๋กœ์šฐ๋‹ค. 

๊ธฐ๋ณธ์—ฐ์‚ฐ์ž๋กœ๋Š” .map, .filter๋“ฑ์ด ์žˆ๋Š”๋ฐ ์‹œํ€€์Šค์™€ ์ฐจ์ด์ ์€ suspend ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

inline fun <T, R> Flow<T>.map(crossinline transform: suspend (T) -> R): Flow<R>
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

map์—์„œ 1..3์ธ ์ฝœ๋“œ ํ”Œ๋กœ์šฐ๋ฅผ ํ•˜๋‚˜์”ฉ ๋ฐ›์•„์„œ  performRequest์— ๋„ฃ๋Š”๋‹ค. ๊ทธ๋Ÿฌ๋ฉด collect๋Š” return๋œ response $request๋ฅผ ๊ตฌ๋…ํ•ด์„œ ๊ฐ€์ ธ๊ฐ„๋‹ค.

Transform operator

๋ณ€ํ™˜ ์—ฐ์‚ฐ์ž๋Š” map๊ณผ filter์™€ ๊ฐ™์ด ์ƒ๊ฒผ๋‹ค. transform ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋ฉฐ ์ž„์˜์˜ ๊ฐ’์„ ์ž„์˜์˜ ์‹œ๊ฐ„์— ๋ฐฉ์ถœํ•  ์ˆ˜์žˆ๋‹ค.

(1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }

์ด ๊ฒฝ์šฐ ํ•œ ํ”Œ๋กœ์šฐ ๋‹น Making~ ์„ ๋จผ์ € ์ถœ๋ ฅํ•˜๊ณ  performRequest๋กœ ๋ฐ›์€ ๊ฐ’์„ ๋’ค์ด์–ด ์ถœ๋ ฅํ•œ๋‹ค.

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request ->
            println("Making request $request") 
            println(performRequest(request)) 
        }
        .collect {  response -> println(response)}
}

map๊ณผ ๋น„์Šทํ•˜๋‹ค๊ณ  ํ•˜๋‹ˆ ์ด๋ ‡๊ฒŒ ๋ฐ”๊ฟ”๋ดค๋‹ค. ์ถœ๋ ฅ๊ฐ’์„ ์˜ˆ์ƒํ•ด๋ดค์„๋•Œ map์—์„œ ๋ฐ˜ํ™˜๋˜๋Š” ๊ฐ’์ด ์—†์Œ์„ ์•Œ ์ˆ˜ ์žˆ๋Š”๋ฐ, ๋”ฐ๋ผ์„œ ์•„๋ž˜์™€ ๊ฐ™์€ ์ถœ๋ ฅ๋ฌธ์ด ์ด์–ด์ง„๋‹ค. ์ด์— ๋ฐ˜์— emit์œผ๋กœ ๋„˜๊ฒจ์ฃผ๋Š” transform์€ emit๋œ ๊ฐ’์„ ์ฐจ๋ก€๋Œ€๋กœ response์— ๋„ฃ์–ด์ฃผ๋‹ˆ๊นŒ ๊ฐ’์ด ๋ฐ˜ํ™˜๋จ์„ ์•Œ์ˆ˜์žˆ๋‹ค.

Making request 1
response 1
kotlin.Unit
Making request 2
response 2
kotlin.Unit
Making request 3
response 3
kotlin.Unit

collect์˜ respose์— ๋“ค์–ด๊ฐˆ println์€ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— kotlin.Unit์ด ์ถœ๋ ฅ๋œ๋‹ค.

Size-limiting operators

cancel์„ ์ผ์œผํ‚ค๋Š” takeํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•ด ๋ฐ›์•„์˜ค๋Š” ๊ฐ’ ๊ฐœ์ˆ˜๋ฅผ ์กฐ์ ˆํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค.

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

์ฝ”๋ฃจํ‹ด ์•ˆ์—์„œ cancel์„ ์ผ์œผํ‚ค๊ธฐ ๋•Œ๋ฌธ์— ๊ผญ try-finally๋กœ ์˜ˆ์™ธ์ฒ˜๋ฆฌ๋ฅผ ํ•ด์ค˜์•ผํ•œ๋‹ค. take(2)๋Š” emit(2)๊นŒ์ง€๋งŒ ๋ฐ›๊ณ  ๋Š์–ด๋ฒ„๋ฆฐ๋‹ค. ๋ฐ›์€ ๊ฐ’์„ collectํ•ด์„œ ๊ฐ€์ ธ์˜ฌ์ˆ˜์žˆ๋‹ค๋Š” ์ ์ด ๊ทธ๋ƒฅ cancel๊ณผ ๋‹ค๋ฅด๋‹ค๊ณ  ํ•  ์ˆ˜ ์žˆ๊ฒ ๋‹ค.

Terminal flow operators(์ข…๋‹จ ์—ฐ์‚ฐ์ž)

ํ”Œ๋กœ์šฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๋Š” suspend ํ•จ์ˆ˜๋“ค์ด๋‹ค. collect๊ฐ€ ๋Œ€ํ‘œ์ ์ด๊ณ  toList, toSet๊ณผ ๊ฐ™์ด ๋ฐ”๋กœ ์ปฌ๋ Œ์…˜์œผ๋กœ ๋ฐ”๊ฟ€ ์ˆ˜๋„ ์žˆ๋‹ค.

val sum = (1..3).asFlow()
        .map { it * it } // squares of numbers from 1 to 5                           
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)

reduce, fold๋„ ํ•ด๋‹นํ•œ๋‹ค. ์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด a์— 1์ด ๋“ค์–ด๊ฐ€๊ณ  b์— 2*2๊ฐ€ ๋“ค์–ด๊ฐ„๋‹ค. ํ•ฉ์นœ๊ฒŒ a๋กœ ๋‹ค์‹œ ๋“ค์–ด๊ฐ€๊ณ  b์— 3*3์ด ๋“ค์–ด์˜ค๋Š” ๋ˆ„์ ํ•ฉ ์ปฌ๋ ‰์…˜ ๋‚ด์žฅํ•จ์ˆ˜์ด๋‹ค.

Flows are sequential

ํ”Œ๋กœ์šฐ์˜ ์ˆ˜์ง‘์€ ๊ธฐ๋ณธ์ ์œผ๋กœ ์ˆœ์ฐจ์ ์œผ๋กœ ์ˆ˜ํ–‰๋œ๋‹ค. ๊ฐ ์—ฐ์‚ฐ์ž๋งˆ๋‹ค emit๋œ ๊ฐ’๋“ค์€ ์—…์ŠคํŠธ๋ฆผ์˜ ๋ชจ๋“  ์ค‘๊ฐ„์—ฐ์‚ฐ์ž์— ์˜ํ•ด ์ฒ˜๋ฆฌ๋˜๊ณ  ์ตœ์ข…์ ์œผ๋กœ ๋‹ค์šด์ŠคํŠธ๋ฆผ์— ์ „๋‹ฌ๋˜์–ด ์ข…๋‹จ์—ฐ์‚ฐ์ž์—์„œ ์ˆ˜์ง‘ํ•˜๋Š”๊ฒŒ ๊ณผ์ •์ด๋‹ค.

Flow Context

ํ”Œ๋กœ์šฐ ์ˆ˜์ง‘์€ ํ•ญ์ƒ ์ˆ˜์ง‘์„ ํ˜ธ์ถœํ•œ ์ฝ”๋ฃจํ‹ด์˜ context์—์„œ ์ด๋ฃจ์–ด์ง„๋‹ค. -> context ๋ณด์กด์ด๋ผ๊ณ  ๋ถ€๋ฅธ๋‹ค.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
    val job = launch{
        simple().collect { value -> log("Collected $value") } 
    }
}

job์œผ๋กœ ๋ฌถ์€ ์ฝ”๋ฃจํ‹ด๊ณผ runBlocking ๋ฐ”๋กœ ๋ฐ‘์—์žˆ๋Š” ์ฝ”๋ฃจํ‹ด์—์„œ ๊ฐ๊ฐ ์‹คํ–‰ํ•œ ํ”Œ๋กœ์šฐ๋Š” ๊ฐ๊ฐ์˜ ์ฝ”๋ฃจํ‹ด ์ปจํ…์ŠคํŠธ์—์„œ ๋Œ์•„๊ฐ„๋‹ค. simple()๋กœ ๋งŒ๋“  ํ”Œ๋กœ์šฐ ํ•จ์ˆ˜๊ฐ€ ์ˆ˜์ง‘์„ ํ˜ธ์ถœํ•œ ์ฝ”๋ฃจํ‹ด ์ปจํ…์ŠคํŠธ๋ฅผ ๋”ฐ๋ฅธ๋‹ค๋Š” ๊ฑธ ๊ธฐ์–ตํ•˜๋ฉด ๋œ๋‹ค.

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
[main @coroutine#2] Started simple flow
[main @coroutine#2] Collected 1
[main @coroutine#2] Collected 2
[main @coroutine#2] Collected 3

๋ฌธ์ œ๋Š” ์žฅ๊ธฐ์ ์ธ ์‚ฌ์šฉ์ด ํ•„์š”ํ•œ ์ž‘์—…์€ ๋”ฐ๋กœ ๋ณ„๋„์˜ ์Šค๋ ˆ๋“œ์—์„œ ๋Œ๋ ค์•ผ๋˜๋Š”๋ฐ ์ด๋•Œ ์ฝ”๋ฃจํ‹ด ์ปจํ…์ŠคํŠธ๋ฅผ ์ „ํ™˜ํ•ด์„œ ์‚ฌ์šฉํ•˜๋Š” withContext๋ฅผ flow๋นŒ๋” ๋‚ด์—์„œ ํ•˜๋ฉด ์ปจํ…์ŠคํŠธ ๋ณด์กด์„ ์ง€ํ‚ค์ง€ ๋ชปํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค. ์ด๋Ÿด ๋•Œ ํ•„์š”ํ•œ ์—ฐ์‚ฐ์ž๊ฐ€ flowOn์—ฐ์‚ฐ์ž์ด๋‹ค.

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value") 
    } 
}

์ด๋ ‡๊ฒŒ flowOn์„ ์‚ฌ์šฉํ•ด์ฃผ๋ฉด flow๋นŒ๋”๋ถ€๋ถ„์€ DefaultDispatcher์—์„œ ๋Œ๊ณ  collect๋Š” ๋ฉ”์ธ ์Šค๋ ˆ๋“œ ์ฝ”๋ฃจํ‹ด์—์„œ ๋Œ๊ฒŒ๋œ๋‹ค. ์ปจํ…์ŠคํŠธ ์ „ํ™˜์ด ์•ˆ์ „ํ•˜๊ฒŒ ์ด๋ฃจ์–ด์กŒ๋‹ค. ์ฆ‰ flowOn์˜ ๋ชฉ์ ์€ ๋ฐฉ์ถœ-์ˆ˜์ง‘์„ ์ˆœ์„œ๋Œ€๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ํŠน์„ฑ์ธ ์ˆœ์ฐจ์„ฑ์„ ๋ชป์ง€ํ‚ค๋Š” ๋Œ€์‹  ์ปจํ…์ŠคํŠธ ๋ณด์กด์„ ์ง€ํ‚ค๋ฉด์„œ ์ฝ”๋ฃจํ‹ด ๋””์ŠคํŒจ์ฒ˜๋ฅผ ๋ฐ”๊พธ๋Š” ๋ฐ ์žˆ๋‹ค.

Buffering

delay๋Œ€์‹ ์— buffer์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. 

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

์—ฌ๊ธฐ์„œ ๋ฒ„ํผ๋ฅผ ๋นผ๋ฒ„๋ฆฌ๋ฉด ์‹œ๊ฐ„์ด ๊ฐ ์ˆ˜์ง‘๋‹น 400์”ฉ ์ด 1200ms์ด ๊ฑธ๋ฆฌ๋Š”๋ฐ ๋ฒ„ํผ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์ฒ˜์Œ 100๋งŒ ๊ฑธ๋ ค์„œ 100+300+300+300์ธ 1000ms์ด ๋Œ€์ถฉ ๊ฑธ๋ฆฐ๋‹ค. buffer์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์ˆ˜์ง‘๋•Œ ๋ฐœ์ƒํ•˜๋Š” delay(300) ๋•Œ ๋ฏธ๋ฆฌ ๋ฐฉ์ถœ์‹œ delay๋ฅผ ์ฒ˜๋ฆฌํ•ด์„œ ๋” ํšจ์œจ์ ์ธ ์‹คํ–‰์ด ์ผ์–ด๋‚œ๋‹ค. ๊ทธ๋ ‡๋‹ค๋ฉด ์ˆ˜์ง‘์ด ๋” ๋น ๋ฅด๋ฉด ์–ด๋–ป๊ฒŒ ๋ ๊นŒ?

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(50) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

์ฒ˜์Œ 100+50+100+100 ์— ์˜ค์ฐจ๋ฒ”์œ„ ์•ฝ 50ms์ด๋ฏ€๋กœ 350~400ms์ด ๋‚˜์˜จ๋‹ค. ์ˆ˜์ง‘๋•Œ๊ฐ€ ๋” ๋น ๋ฅด๋‹ค๋ฉด ๋ฒ„ํผ๋„ ํฌ๊ฒŒ ํšจ๊ณผ๊ฐ€ ์—†๋‹ค๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

Conflation & Processing the latest value

conflate์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ˆ˜์ง‘์ด ๋„ˆ๋ฌด ๋Š๋ฆด๊ฒฝ์šฐ ๊ฐ’์„ ํ•˜๋‚˜ํ•˜๋‚˜ ๋‹ค ๋ฐฉ์ถœํ•˜์ง€์•Š๊ณ  ์ค‘๊ฐ„๊ฐ’์„ ๊ฑด๋„ˆ๋›ฐ๊ณ  ์ œ์ผ ์ตœ์‹ ๊ฐ’์„ ๊ฐ€์ ธ์˜จ๋‹ค. 

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(190) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

2๊นŒ์ง€ ์ˆ˜์ง‘๋˜๋Š”๋ฐ 200์ด ๊ฑธ๋ฆฌ๋Š”๋ฐ, 1์„ ์ˆ˜์ง‘ํ•˜๋ฉด์„œ 200์„ ๋„˜์–ด๋ฒ„๋ฆฌ๋ฉด conflate๊ฐ€ ๋„˜๊ฒจ๋ฒ„๋ฆฐ๋‹ค. ํ•˜์ง€๋งŒ 190๊นŒ์ง€ ์ˆ˜์ง‘ํ•œ๋‹ค๋ฉด 2๋„ ๊ฐ™์ด ์ˆ˜์ง‘ํ•˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

 

์ตœ์‹ ๊ฐ’ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ• ์ค‘์— conflation(๋ณ‘ํ•ฉ)๋„ ์žˆ๊ณ , ์—ฐ์‚ฐ์ž๋’ค์— Latest๊ฐ€ ๋ถ™์€ ์—ฐ์‚ฐ์ž๊ฐ€ ์กด์žฌํ•œ๋‹ค. ์ด ์—ฐ์‚ฐ์ž๋“ค์€ ์ƒˆ๊ฐ’์ด ๋ฐฉ์ถœ๋˜๋ฉด ์ˆ˜์ง‘ํ•˜๋˜ ์ฝ”๋“œ๋ธ”๋ก์„ ์ทจ์†Œํ•˜๊ณ  ์ƒˆ๋กœ ๋ฐฉ์ถœ๋œ ๊ฐ’์„ ๊ฐ€์ ธ์˜จ๋‹ค.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value") 
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value") 
            } 
    }   
    println("Collected in $time ms")
}

์„ค๋ช…์ด ๊ฒน์น˜๋‹ˆ ๊ทธ๋ƒฅ ๋„˜์–ด๊ฐ€๋„๋ก ํ•˜๊ฒ ๋‹ค.

Composing multiple flows

๋‹ค์ค‘ ํ”Œ๋กœ์šฐ๋ฅผ ํ™œ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•๋“ค์ด๋‹ค.

Zip

val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print

nums ํ”Œ๋กœ์šฐ ๋ฐฉ์ถœ๊ฐ’๊ณผ strsํ”Œ๋กœ์šฐ ๋ฐฉ์ถœ๊ฐ’์„ zip์œผ๋กœ ๋ฌถ์–ด์„œ return์œผ๋กœ String์„ ๋ฐ˜ํ™˜ํ•˜๋Š” ์ฝ”๋“œ๋‹ค.  ์‹œํ€€์Šค์—์„œ ์“ฐ๋Š” zip๊ณผ ๊ฑฐ์˜ ๋™์ผํ•˜๋‹ค.

Combine

์ด๋ฏธ ๋ฐฉ์ถœ๋œ ๊ฐ’์œผ๋กœ ๋˜ ์ถ”๊ฐ€์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•ด์•ผํ•˜๋Š” ๊ฒฝ์šฐ combine์—ฐ์‚ฐ์ž๋ฅผ ์ด์šฉํ•œ๋‹ค.

fun main() = runBlocking<Unit> { 

    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

์ด๋Ÿฌ๋ฉด ์ˆœ์„œ์„ฑ์„ ์œ ์ง€ํ•œ์ฑ„๋กœ ๊ฐ€๋Š”๋ฐ strs๊ฐ€ ๊ฐ ๋ฐฉ์ถœ์ด ๋๋‚˜๋Š” ์‹œ์ , nums ๊ฐ ๋ฐฉ์ถœ์ด ๋๋‚˜๋Š” ์‹œ์ ์ด ๊ณ„์† ๋งž๋ฌผ๋ ค์„œ ์•„๋ž˜ ์ถœ๋ ฅ๊ณผ ๊ฐ™์ด ๋‚˜์˜จ๋‹ค. ์ฒ˜์Œ์—๋Š” strs๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์•ผํ•˜๋‹ˆ 400ms, nums๋Š” ๋‹ค์Œ ๋ฐฉ์ถœ์ด ์‹œ์ž‘๋์œผ๋‹ˆ nums๋Š” ๊ฐฑ์‹ ๋˜๊ณ  str๋Š” ์ข€ ๋” ๊ฑธ๋ฆฌ๊ณ  ํ•˜๋Š” ๊ณผ์ •์ด ์ง€์†๋œ๋‹ค.

1 -> one at 444 ms from start
2 -> one at 648 ms from start
2 -> two at 846 ms from start
3 -> two at 949 ms from start
3 -> three at 1246 ms from start

์ฆ‰ ๋ฐฉ์ถœ์ด ์ผ์–ด๋‚  ๋•Œ๋งˆ๋‹ค ๋‹ค๋ฅธ ํ”Œ๋กœ์šฐ์˜ ์ตœ์‹ ๊ฐ’์„ ๊ฐ€์ง€๊ณ  ๋ณ‘ํ•ฉํ•ด์„œ ์ถœ๋ ฅํ•˜๋Š”๊ฒŒ combine ์—ฐ์‚ฐ์ž๋‹ค.

Flattening Flows

ํ”Œ๋กœ์šฐ์˜ ๋ฐฉ์ถœ๊ฐ’์„ ๋˜ ํ”Œ๋กœ์šฐ์— ๋„ฃ์„ ๋•Œ ๋‚˜์ค‘์˜ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด์„œ ํ‰ํƒ„ํ™”๊ฐ€ ํ•„์š”ํ•˜๋‹ค. flatten, flatMap ์—ฐ์‚ฐ์ž๊ฐ€ ์ด๋•Œ ์‚ฌ์šฉ๋œ๋‹ค. 

flatMapConcat๋Š” ๊ทธ๋ƒฅ ์ด์ค‘ ํ”Œ๋กœ์šฐ๊ฐ€ ์ˆœ์ฐจ์ ์œผ๋กœ ๋๋‚˜๋Š”๊ฑธ ๊ธฐ๋‹ค๋ ค์ค€๋‹ค.

flatMapMerge๋Š” ๊ฐ ํ”Œ๋กœ์šฐ๋ฅผ ๋™์‹œ์— ๋Œ๋ ค๋ฒ„๋ฆฐ๋‹ค(๋™์‹œ ์ˆ˜์ง‘ํ•ด์„œ ๋‹จ์ผ ํ”Œ๋กœ์šฐ๋กœ ๊ฐ€๋Šฅํ•œ ๋นจ๋ฆฌ ๋ฐฉ์ถœ์‹œํ‚จ๋‹ค). 

flatMapLatest๋Š” collectLatest์™€ ๋น„์Šทํ•œ๋ฐ, ์ˆ˜์ง‘์ด ๋Š๋ฆฌ๋ฉด ๊ฑด๋„ˆ ๋›ฐ์–ด๋ฒ„๋ฆฐ๋‹ค.

 

Flow exceptions

ํ”Œ๋กœ์šฐ๋ฅผ take์™€ ๊ฐ™์€ ๊ฑธ๋กœ cancel์ด ๋‚˜๊ฒŒํ•˜๋ฉด emit์—์„œ ์˜ˆ์™ธ๋ฅผ ๋˜์ง€๋Š”๋ฐ ์ด ์˜ˆ์™ธ๋Š” ์ˆ˜์ง‘์ชฝ์—์„œ ๋ฐ›๋Š”๋‹ค. ์ด ๋ง์€ ์˜ˆ์™ธ์ฒ˜๋ฆฌ๋ฅผ ์ˆ˜์ง‘์ชฝ์—์„œ ํ•ด์ค˜์•ผํ•œ๋‹ค๋Š” ๋œป์ด๋‹ค.

    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }

value๊ฐ€ 2๊ฐ€๋˜๋ฉด catch๋กœ ๋„˜์–ด๊ฐ€๋Š” ์ฝ”๋“œ๋‹ค. ์ด ๊ฒฝ์šฐ ์ค‘๊ฐ„/์ข…๋‹จ/๋ฐฉ์ถœ์—ฐ์‚ฐ์ž ์ค‘ ์–ด๋Š๋ถ€๋ถ„์—์„œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ–ˆ๋Š”์ง€ ์•Œ ์ˆ˜ ์—†๋Š”๋ฐ, flow์— map์œผ๋กœ ์˜ˆ์™ธ๋ฅผ ๋„˜๊ฒจ์ฃผ๋ฉด ์ข…๋‹จ์—ฐ์‚ฐ์ž์—์„œ ๋ฐ›๋Š”๊ฑธ ํ™•์‹คํ•˜๊ฒŒ ์ธ์ง€ํ•  ์ˆ˜ ์žˆ๋‹ค. 

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

Exception transparency ์˜ˆ์™ธ ํˆฌ๋ช…์„ฑ

ํ”Œ๋กœ์šฐ๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋ฅผ ์ž˜ํ•ด์•ผํ•œ๋‹ค. flow{}๋นŒ๋” ์•ˆ์—์„œ try~catch๋กœ ์˜ˆ์™ธ์ฒ˜๋ฆฌํ•ด์„œ ์˜ˆ์™ธ ์—†๋Š” ๊ฐ’๋งŒ ๋ฐฉ์ถœํ•˜๋Š” ๊ฒƒ์€ ์˜ˆ์™ธ ํˆฌ๋ช…์„ฑ์„ ์œ„๋ฐ˜ํ•˜๋Š” ๊ฒƒ์ด๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค. 

 

emit์ด catch์—ฐ์‚ฐ์œผ๋กœ ์ด๋Ÿฐ ์˜ˆ์™ธ ํˆฌ๋ช…์„ฑ์„ ์ง€ํ‚ฌ ์ˆ˜ ์žˆ๋‹ค. catch์—์„œ emit์„ ์‚ฌ์šฉํ•ด ๋ฐฉ์ถœํ•˜๊ฑฐ๋‚˜, throw๋ฅผ ๋˜ ์‚ฌ์šฉํ•ด์„œ ๋‹ค์‹œ ์˜ˆ์™ธ๋ฅผ ๋˜์งˆ ์ˆ˜ ์žˆ๋‹ค.

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}

์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” 9๋ฒˆ ๋ผ์ธ์—์„œ ๋˜์ ธ์ง„ e๋ฅผ catch๊ฐ€ ๋ฐ›์•„์„œ emitํ•˜๊ณ  ์ด๊ฑธ collect๊ฐ€ ๋ฐ›๋Š”๋‹ค. try~catch๋ฅผ ์“ฐ์ง€์•Š์•„๋„ ๋™์ผํ•œ ๊ฒฐ๊ณผ๋ฅผ ๋ณด์—ฌ์คŒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

 

catch ์—ฐ์‚ฐ์ž๋Š” ์—…์ŠคํŠธ๋ฆผ์—์„œ ๋ฐœ์ƒํ•˜๋Š” ์˜ˆ์™ธ๋งŒ ์žก์•„์ค€๋‹ค. ์œ„ ์ฝ”๋“œ ๋ธ”๋Ÿญ์—์„œ check๊ฐ€ mainํ•จ์ˆ˜ ์•ˆ collect์— ์œ„์น˜ํ–ˆ๋‹ค๋ฉด ์˜ˆ์™ธ์ฒ˜๋ฆฌ๋ฅผ ํ•˜์ง€๋ชปํ•˜๊ณ  ์˜ค๋ฅ˜๋ฅผ ๋‚ด๋ฟœ์—ˆ์„ ๊ฒƒ์ด๋‹ค.

 

Flow completion

์ •์ƒ์ด๋˜ ์˜ˆ์™ธ๋˜ ํ”Œ๋กœ์šฐ๊ฐ€ ์ข…๋ฃŒ๋˜๋ฉด ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์ด 2๊ฐ€์ง€๊ฐ€ ์žˆ๋‹ค. imperative์™€ declarative๋ฐฉ์‹์ด๋‹ค.

imperativeํ•œ ๋ฐฉ์‹์€ ๋ฌด์กฐ๊ฑด try~finally๋ฅผ ๊ฑฐ์น˜๊ฒŒ ๋งŒ๋“ค์–ด์„œ ์ˆ˜์ง‘์ด ์ข…๋ฃŒ๋˜๊ณ  ์‹คํ–‰ํ•  ์ฝ”๋“œ๋ฅผ ์ •์˜ํ•ด๋‘๋Š” ๊ฒƒ์ด๋‹ค.

try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }

declarativeํ•œ ๋ฐฉ์‹์œผ๋กœ๋Š” ํ”Œ๋กœ์šฐ๋นŒ๋”์— onCompletion์„ ๋ถ™์—ฌ์„œ ์ข…๋ฃŒ์‹œ ์ˆ˜ํ–‰ํ•  ํ–‰๋™์„ ์„ ์–ธํ•ด๋‘๋Š” ๊ฒƒ์ด๋‹ค.

simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }

๊ทธ๋Ÿฌ๋‚˜ onCompletion์€ ์˜ˆ์™ธ๋ฅผ ์žก์ง€ ๋ชปํ•˜๊ธฐ๋•Œ๋ฌธ์— catch์™€ ๊ฐ™์ด ์‚ฌ์šฉํ•ด์ค˜์•ผํ•œ๋‹ค. ๋˜ ํ•œ๊ฐ€์ง€ catch์™€ onCompletion์˜ ์ฐจ์ด๋Š” onCompletion์ด ์˜ˆ์™ธ๋ฅผ ์ข…๋ฃŒ์‹œ์ ์œผ๋กœ ํ•ด์„œ ๋ฐœ๊ฒฌํ•  ์ˆ˜๋Š” ์žˆ์–ด๋„ ํ”Œ๋กœ์šฐ ์ทจ์†Œ๋‚˜ ์‹คํŒจ๋ฅผ ๊ฐ™์ด ํ˜ธ์ถœํ•˜๋Š” ๊ฒŒ ์•„๋‹ˆ๊ธฐ ๋•Œ๋ฌธ์— ์‹คํ–‰ ์ค‘ ์˜ค๋ฅ˜๊ฐ€ ๋‚˜๋ฒ„๋ฆฐ๋‹ค.

 

๋‹ค์Œ ์ฝ”๋“œ๊ฐ€ ๊ทธ ์˜ˆ์‹œ์ธ๋ฐ, ์˜ˆ์™ธ๊ฒฐ๊ณผ๋ฅผ ๋ฐ›์•„์„œ ์ถœ๋ ฅ์€ ํ•˜์ง€๋งŒ ๋‹ค์šด์ŠคํŠธ๋ฆผ์—์„œ ๋ฐœ์ƒํ•˜๋Š” ์˜ˆ์™ธ ํƒ“์— ์‹คํ–‰์˜ค๋ฅ˜๊ฐ€ ๋‚œ๋‹ค.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

Imperative(๋ช…๋ นํ˜•)์™€ declarative(์„ ์–ธํ˜•)๊ฐ€ ๊ฐ๊ฐ ํŠน์ง•์ด ์žˆ๋Š”๋ฐ ํ•„์š”์— ๋”ฐ๋ผ ๋งž์ถฐ์‚ฌ์šฉํ•˜๋Š”๊ฒŒ ์ข‹๋‹ค.

Launching flow

onEach๋Š” addEventListener์™€ ๊ฐ™์€ ํ–‰๋™์„ ๋ณด์—ฌ์ค€๋‹ค. onEach๋Š” ์ค‘๊ฐ„์—ฐ์‚ฐ์ž์ด๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ’์„ ๋ฐ›์•„ ๋ฌด์กฐ๊ฑด ์ˆ˜์ง‘ํ•ด์ค˜์•ผํ•œ๋‹ค.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}

onEach๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ ํ•ด๋‹น ํ”Œ๋กœ์šฐ๋ฅผ ๋ถ„๋ฆฌ๋œ ์ฝ”๋ฃจํ‹ด์œผ๋กœ ๋ณด๋‚ด๋ฒ„๋ฆด ์ˆ˜ ์žˆ๋Š” ์ข…๋‹จ์—ฐ์‚ฐ์ž๊ฐ€ ์žˆ๋Š”๋ฐ launchin์ด ๊ทธ๊ฒƒ์ด๋‹ค.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}

launchIn์„ ๋ถ™์—ฌ ๋ฒ„๋ ค์„œ  events ํ”Œ๋กœ์šฐ๋Š” ๋ถ„๋ฆฌ๋œ ์ฝ”๋ฃจํ‹ด์ด ๋˜์–ด, ์•„๋ž˜ ๋ผ์ธ์˜ println์ด ๋จผ์ € ์‹คํ–‰๋œ๋‹ค. addEventListener์™€ ๋” ์œ ์‚ฌํ•œ๊ฑด launchIn์ด๋‹ค. ์ˆ˜์ง‘ํ•ด๋ฒ„๋ฆฌ๊ธฐ ๋•Œ๋ฌธ์— eventํšŒ์ˆ˜๋„ ๋œ๋‹ค๊ณ  ๋ณผ ์ˆ˜ ์žˆ์–ด removeEventListener ์—ญํ• ๋„ ๊ฒธํ•œ๋‹ค.

 

launchIn์€ ์ฝ”๋ฃจํ‹ด์„ ์ƒ์„ฑํ•˜๊ธฐ ๋•Œ๋ฌธ์— Job๊ฐ์ฒด๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ๋”ฐ๋ผ์„œ job์— ๊ฑธ์ˆ˜์žˆ๋Š” ์—ฐ์‚ฐ๋„ ๋‹ค ๊ฐ€๋Šฅํ•˜๋‹ค.

 

ํ”Œ๋กœ์šฐ๋ฅผ ์ค‘๊ฐ„์— cancelํ•˜๋ ค๋ฉด cancellable()์—ฐ์‚ฐ์ž๋ฅผ ๋ถ™์—ฌ์ฃผ๋ฉด ๋œ๋‹ค. ์ด๋Ÿฌ๋ฉด cancel์ด ์ผ์–ด๋‚ ๋•Œ ์˜ˆ์™ธ๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

 

 

 

"๋Œ“๊ธ€, ๊ณต๊ฐ ๋ฒ„ํŠผ ํ•œ ๋ฒˆ์”ฉ ๋ˆ„๋ฅด๊ณ  ๊ฐ€์ฃผ์‹œ๋ฉด ํฐ ํž˜์ด ๋ฉ๋‹ˆ๋‹ค"
๋ฐ˜์‘ํ˜•
COMMENT