https://kotlinlang.org/docs/flow.html#intermediate-flow-operators
์ ๋งํฌ๋ฅผ ๊ณต๋ถํ๋ฉฐ ๋จ๊ธฐ๋ ๊ธฐ๋ก์ด๋ค. Flow ์ค๊ฐ์ฐ์ฐ์๋ถํฐ ํ๊ฒ ๋ค. ์ด์ ์ ๊ณต๋ถ๊ธฐ๋ก์ ์์ฑ์ ๊น๋ฐํ๋๋ฐ ์ถํ ๊ธฐ๋กํด๋ ์์ ์ด๋ค.
Intermediate flow operators
ํ๋ก์ฐ ์ค๊ฐ ์ฐ์ฐ์๋ค. ํ๋ก์ฐ ์์ฒด๋ฅผ ์ค๊ฐ ์ฐ์ฐ์๋ก ์ฌ์ฉํ ์ ์์ผ๋ฉฐ ์ ์คํธ๋ฆผ ํ๋ก์ฐ์ ์ ์ฉ์ํจ๋ค์ ๋ค์ด์คํธ๋ฆผ ํ๋ก์ฐ๋ฅผ ๋ฐํ ํ๋ค. ์ด๋ ๋์์ ์ฝ๋ ํ๋ก์ฐ๋ค.
๊ธฐ๋ณธ์ฐ์ฐ์๋ก๋ .map, .filter๋ฑ์ด ์๋๋ฐ ์ํ์ค์ ์ฐจ์ด์ ์ suspend ํจ์๋ฅผ ํธ์ถํ ์ ์๋ค๋ ๊ฒ์ด๋ค.
inline fun <T, R> Flow<T>.map(crossinline transform: suspend (T) -> R): Flow<R>
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
map์์ 1..3์ธ ์ฝ๋ ํ๋ก์ฐ๋ฅผ ํ๋์ฉ ๋ฐ์์ performRequest์ ๋ฃ๋๋ค. ๊ทธ๋ฌ๋ฉด collect๋ return๋ response $request๋ฅผ ๊ตฌ๋ ํด์ ๊ฐ์ ธ๊ฐ๋ค.
Transform operator
๋ณํ ์ฐ์ฐ์๋ map๊ณผ filter์ ๊ฐ์ด ์๊ฒผ๋ค. transform ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ฉฐ ์์์ ๊ฐ์ ์์์ ์๊ฐ์ ๋ฐฉ์ถํ ์์๋ค.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
์ด ๊ฒฝ์ฐ ํ ํ๋ก์ฐ ๋น Making~ ์ ๋จผ์ ์ถ๋ ฅํ๊ณ performRequest๋ก ๋ฐ์ ๊ฐ์ ๋ค์ด์ด ์ถ๋ ฅํ๋ค.
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request ->
println("Making request $request")
println(performRequest(request))
}
.collect { response -> println(response)}
}
map๊ณผ ๋น์ทํ๋ค๊ณ ํ๋ ์ด๋ ๊ฒ ๋ฐ๊ฟ๋ดค๋ค. ์ถ๋ ฅ๊ฐ์ ์์ํด๋ดค์๋ map์์ ๋ฐํ๋๋ ๊ฐ์ด ์์์ ์ ์ ์๋๋ฐ, ๋ฐ๋ผ์ ์๋์ ๊ฐ์ ์ถ๋ ฅ๋ฌธ์ด ์ด์ด์ง๋ค. ์ด์ ๋ฐ์ emit์ผ๋ก ๋๊ฒจ์ฃผ๋ transform์ emit๋ ๊ฐ์ ์ฐจ๋ก๋๋ก response์ ๋ฃ์ด์ฃผ๋๊น ๊ฐ์ด ๋ฐํ๋จ์ ์์์๋ค.
Making request 1
response 1
kotlin.Unit
Making request 2
response 2
kotlin.Unit
Making request 3
response 3
kotlin.Unit
collect์ respose์ ๋ค์ด๊ฐ println์ ๊ฐ์ ๋ฐํํ์ง ์๊ธฐ ๋๋ฌธ์ kotlin.Unit์ด ์ถ๋ ฅ๋๋ค.
Size-limiting operators
cancel์ ์ผ์ผํค๋ takeํจ์๋ฅผ ์ด์ฉํด ๋ฐ์์ค๋ ๊ฐ ๊ฐ์๋ฅผ ์กฐ์ ํ๋ ๋ฐฉ์์ด๋ค.
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
์ฝ๋ฃจํด ์์์ cancel์ ์ผ์ผํค๊ธฐ ๋๋ฌธ์ ๊ผญ try-finally๋ก ์์ธ์ฒ๋ฆฌ๋ฅผ ํด์ค์ผํ๋ค. take(2)๋ emit(2)๊น์ง๋ง ๋ฐ๊ณ ๋์ด๋ฒ๋ฆฐ๋ค. ๋ฐ์ ๊ฐ์ collectํด์ ๊ฐ์ ธ์ฌ์์๋ค๋ ์ ์ด ๊ทธ๋ฅ cancel๊ณผ ๋ค๋ฅด๋ค๊ณ ํ ์ ์๊ฒ ๋ค.
Terminal flow operators(์ข ๋จ ์ฐ์ฐ์)
ํ๋ก์ฐ๋ฅผ ์์งํ๋ suspend ํจ์๋ค์ด๋ค. collect๊ฐ ๋ํ์ ์ด๊ณ toList, toSet๊ณผ ๊ฐ์ด ๋ฐ๋ก ์ปฌ๋ ์ ์ผ๋ก ๋ฐ๊ฟ ์๋ ์๋ค.
val sum = (1..3).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
reduce, fold๋ ํด๋นํ๋ค. ์ฝ๋๋ฅผ ๋ณด๋ฉด a์ 1์ด ๋ค์ด๊ฐ๊ณ b์ 2*2๊ฐ ๋ค์ด๊ฐ๋ค. ํฉ์น๊ฒ a๋ก ๋ค์ ๋ค์ด๊ฐ๊ณ b์ 3*3์ด ๋ค์ด์ค๋ ๋์ ํฉ ์ปฌ๋ ์ ๋ด์ฅํจ์์ด๋ค.
Flows are sequential
ํ๋ก์ฐ์ ์์ง์ ๊ธฐ๋ณธ์ ์ผ๋ก ์์ฐจ์ ์ผ๋ก ์ํ๋๋ค. ๊ฐ ์ฐ์ฐ์๋ง๋ค emit๋ ๊ฐ๋ค์ ์ ์คํธ๋ฆผ์ ๋ชจ๋ ์ค๊ฐ์ฐ์ฐ์์ ์ํด ์ฒ๋ฆฌ๋๊ณ ์ต์ข ์ ์ผ๋ก ๋ค์ด์คํธ๋ฆผ์ ์ ๋ฌ๋์ด ์ข ๋จ์ฐ์ฐ์์์ ์์งํ๋๊ฒ ๊ณผ์ ์ด๋ค.
Flow Context
ํ๋ก์ฐ ์์ง์ ํญ์ ์์ง์ ํธ์ถํ ์ฝ๋ฃจํด์ context์์ ์ด๋ฃจ์ด์ง๋ค. -> context ๋ณด์กด์ด๋ผ๊ณ ๋ถ๋ฅธ๋ค.
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
val job = launch{
simple().collect { value -> log("Collected $value") }
}
}
job์ผ๋ก ๋ฌถ์ ์ฝ๋ฃจํด๊ณผ runBlocking ๋ฐ๋ก ๋ฐ์์๋ ์ฝ๋ฃจํด์์ ๊ฐ๊ฐ ์คํํ ํ๋ก์ฐ๋ ๊ฐ๊ฐ์ ์ฝ๋ฃจํด ์ปจํ ์คํธ์์ ๋์๊ฐ๋ค. simple()๋ก ๋ง๋ ํ๋ก์ฐ ํจ์๊ฐ ์์ง์ ํธ์ถํ ์ฝ๋ฃจํด ์ปจํ ์คํธ๋ฅผ ๋ฐ๋ฅธ๋ค๋ ๊ฑธ ๊ธฐ์ตํ๋ฉด ๋๋ค.
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
[main @coroutine#2] Started simple flow
[main @coroutine#2] Collected 1
[main @coroutine#2] Collected 2
[main @coroutine#2] Collected 3
๋ฌธ์ ๋ ์ฅ๊ธฐ์ ์ธ ์ฌ์ฉ์ด ํ์ํ ์์ ์ ๋ฐ๋ก ๋ณ๋์ ์ค๋ ๋์์ ๋๋ ค์ผ๋๋๋ฐ ์ด๋ ์ฝ๋ฃจํด ์ปจํ ์คํธ๋ฅผ ์ ํํด์ ์ฌ์ฉํ๋ withContext๋ฅผ flow๋น๋ ๋ด์์ ํ๋ฉด ์ปจํ ์คํธ ๋ณด์กด์ ์งํค์ง ๋ชปํ๊ธฐ ๋๋ฌธ์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ค. ์ด๋ด ๋ ํ์ํ ์ฐ์ฐ์๊ฐ flowOn์ฐ์ฐ์์ด๋ค.
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
์ด๋ ๊ฒ flowOn์ ์ฌ์ฉํด์ฃผ๋ฉด flow๋น๋๋ถ๋ถ์ DefaultDispatcher์์ ๋๊ณ collect๋ ๋ฉ์ธ ์ค๋ ๋ ์ฝ๋ฃจํด์์ ๋๊ฒ๋๋ค. ์ปจํ ์คํธ ์ ํ์ด ์์ ํ๊ฒ ์ด๋ฃจ์ด์ก๋ค. ์ฆ flowOn์ ๋ชฉ์ ์ ๋ฐฉ์ถ-์์ง์ ์์๋๋ก ์ฒ๋ฆฌํ๋ ํน์ฑ์ธ ์์ฐจ์ฑ์ ๋ชป์งํค๋ ๋์ ์ปจํ ์คํธ ๋ณด์กด์ ์งํค๋ฉด์ ์ฝ๋ฃจํด ๋์คํจ์ฒ๋ฅผ ๋ฐ๊พธ๋ ๋ฐ ์๋ค.
Buffering
delay๋์ ์ buffer์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ด๋ค.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
์ฌ๊ธฐ์ ๋ฒํผ๋ฅผ ๋นผ๋ฒ๋ฆฌ๋ฉด ์๊ฐ์ด ๊ฐ ์์ง๋น 400์ฉ ์ด 1200ms์ด ๊ฑธ๋ฆฌ๋๋ฐ ๋ฒํผ๋ฅผ ์ฌ์ฉํ๋ฉด ์ฒ์ 100๋ง ๊ฑธ๋ ค์ 100+300+300+300์ธ 1000ms์ด ๋์ถฉ ๊ฑธ๋ฆฐ๋ค. buffer์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ฉด ์์ง๋ ๋ฐ์ํ๋ delay(300) ๋ ๋ฏธ๋ฆฌ ๋ฐฉ์ถ์ delay๋ฅผ ์ฒ๋ฆฌํด์ ๋ ํจ์จ์ ์ธ ์คํ์ด ์ผ์ด๋๋ค. ๊ทธ๋ ๋ค๋ฉด ์์ง์ด ๋ ๋น ๋ฅด๋ฉด ์ด๋ป๊ฒ ๋ ๊น?
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(50) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
์ฒ์ 100+50+100+100 ์ ์ค์ฐจ๋ฒ์ ์ฝ 50ms์ด๋ฏ๋ก 350~400ms์ด ๋์จ๋ค. ์์ง๋๊ฐ ๋ ๋น ๋ฅด๋ค๋ฉด ๋ฒํผ๋ ํฌ๊ฒ ํจ๊ณผ๊ฐ ์๋ค๋ ๊ฒ์ ์ ์ ์๋ค.
Conflation & Processing the latest value
conflate์ฐ์ฐ์๋ฅผ ์ฌ์ฉํด์ ์์ง์ด ๋๋ฌด ๋๋ฆด๊ฒฝ์ฐ ๊ฐ์ ํ๋ํ๋ ๋ค ๋ฐฉ์ถํ์ง์๊ณ ์ค๊ฐ๊ฐ์ ๊ฑด๋๋ฐ๊ณ ์ ์ผ ์ต์ ๊ฐ์ ๊ฐ์ ธ์จ๋ค.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(190) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
2๊น์ง ์์ง๋๋๋ฐ 200์ด ๊ฑธ๋ฆฌ๋๋ฐ, 1์ ์์งํ๋ฉด์ 200์ ๋์ด๋ฒ๋ฆฌ๋ฉด conflate๊ฐ ๋๊ฒจ๋ฒ๋ฆฐ๋ค. ํ์ง๋ง 190๊น์ง ์์งํ๋ค๋ฉด 2๋ ๊ฐ์ด ์์งํ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
์ต์ ๊ฐ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ ์ค์ conflation(๋ณํฉ)๋ ์๊ณ , ์ฐ์ฐ์๋ค์ Latest๊ฐ ๋ถ์ ์ฐ์ฐ์๊ฐ ์กด์ฌํ๋ค. ์ด ์ฐ์ฐ์๋ค์ ์๊ฐ์ด ๋ฐฉ์ถ๋๋ฉด ์์งํ๋ ์ฝ๋๋ธ๋ก์ ์ทจ์ํ๊ณ ์๋ก ๋ฐฉ์ถ๋ ๊ฐ์ ๊ฐ์ ธ์จ๋ค.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
}
์ค๋ช ์ด ๊ฒน์น๋ ๊ทธ๋ฅ ๋์ด๊ฐ๋๋ก ํ๊ฒ ๋ค.
Composing multiple flows
๋ค์ค ํ๋ก์ฐ๋ฅผ ํ์ฉํ๋ ๋ฐฉ๋ฒ๋ค์ด๋ค.
Zip
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
nums ํ๋ก์ฐ ๋ฐฉ์ถ๊ฐ๊ณผ strsํ๋ก์ฐ ๋ฐฉ์ถ๊ฐ์ zip์ผ๋ก ๋ฌถ์ด์ return์ผ๋ก String์ ๋ฐํํ๋ ์ฝ๋๋ค. ์ํ์ค์์ ์ฐ๋ zip๊ณผ ๊ฑฐ์ ๋์ผํ๋ค.
Combine
์ด๋ฏธ ๋ฐฉ์ถ๋ ๊ฐ์ผ๋ก ๋ ์ถ๊ฐ์ฐ์ฐ์ ์ํํด์ผํ๋ ๊ฒฝ์ฐ combine์ฐ์ฐ์๋ฅผ ์ด์ฉํ๋ค.
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
์ด๋ฌ๋ฉด ์์์ฑ์ ์ ์งํ์ฑ๋ก ๊ฐ๋๋ฐ strs๊ฐ ๊ฐ ๋ฐฉ์ถ์ด ๋๋๋ ์์ , nums ๊ฐ ๋ฐฉ์ถ์ด ๋๋๋ ์์ ์ด ๊ณ์ ๋ง๋ฌผ๋ ค์ ์๋ ์ถ๋ ฅ๊ณผ ๊ฐ์ด ๋์จ๋ค. ์ฒ์์๋ strs๊น์ง ๊ธฐ๋ค๋ ค์ผํ๋ 400ms, nums๋ ๋ค์ ๋ฐฉ์ถ์ด ์์๋์ผ๋ nums๋ ๊ฐฑ์ ๋๊ณ str๋ ์ข ๋ ๊ฑธ๋ฆฌ๊ณ ํ๋ ๊ณผ์ ์ด ์ง์๋๋ค.
1 -> one at 444 ms from start
2 -> one at 648 ms from start
2 -> two at 846 ms from start
3 -> two at 949 ms from start
3 -> three at 1246 ms from start
์ฆ ๋ฐฉ์ถ์ด ์ผ์ด๋ ๋๋ง๋ค ๋ค๋ฅธ ํ๋ก์ฐ์ ์ต์ ๊ฐ์ ๊ฐ์ง๊ณ ๋ณํฉํด์ ์ถ๋ ฅํ๋๊ฒ combine ์ฐ์ฐ์๋ค.
Flattening Flows
ํ๋ก์ฐ์ ๋ฐฉ์ถ๊ฐ์ ๋ ํ๋ก์ฐ์ ๋ฃ์ ๋ ๋์ค์ ์ฒ๋ฆฌ๋ฅผ ์ํด์ ํํํ๊ฐ ํ์ํ๋ค. flatten, flatMap ์ฐ์ฐ์๊ฐ ์ด๋ ์ฌ์ฉ๋๋ค.
flatMapConcat๋ ๊ทธ๋ฅ ์ด์ค ํ๋ก์ฐ๊ฐ ์์ฐจ์ ์ผ๋ก ๋๋๋๊ฑธ ๊ธฐ๋ค๋ ค์ค๋ค.
flatMapMerge๋ ๊ฐ ํ๋ก์ฐ๋ฅผ ๋์์ ๋๋ ค๋ฒ๋ฆฐ๋ค(๋์ ์์งํด์ ๋จ์ผ ํ๋ก์ฐ๋ก ๊ฐ๋ฅํ ๋นจ๋ฆฌ ๋ฐฉ์ถ์ํจ๋ค).
flatMapLatest๋ collectLatest์ ๋น์ทํ๋ฐ, ์์ง์ด ๋๋ฆฌ๋ฉด ๊ฑด๋ ๋ฐ์ด๋ฒ๋ฆฐ๋ค.
Flow exceptions
ํ๋ก์ฐ๋ฅผ take์ ๊ฐ์ ๊ฑธ๋ก cancel์ด ๋๊ฒํ๋ฉด emit์์ ์์ธ๋ฅผ ๋์ง๋๋ฐ ์ด ์์ธ๋ ์์ง์ชฝ์์ ๋ฐ๋๋ค. ์ด ๋ง์ ์์ธ์ฒ๋ฆฌ๋ฅผ ์์ง์ชฝ์์ ํด์ค์ผํ๋ค๋ ๋ป์ด๋ค.
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
value๊ฐ 2๊ฐ๋๋ฉด catch๋ก ๋์ด๊ฐ๋ ์ฝ๋๋ค. ์ด ๊ฒฝ์ฐ ์ค๊ฐ/์ข ๋จ/๋ฐฉ์ถ์ฐ์ฐ์ ์ค ์ด๋๋ถ๋ถ์์ ์์ธ๊ฐ ๋ฐ์ํ๋์ง ์ ์ ์๋๋ฐ, flow์ map์ผ๋ก ์์ธ๋ฅผ ๋๊ฒจ์ฃผ๋ฉด ์ข ๋จ์ฐ์ฐ์์์ ๋ฐ๋๊ฑธ ํ์คํ๊ฒ ์ธ์งํ ์ ์๋ค.
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
Exception transparency ์์ธ ํฌ๋ช ์ฑ
ํ๋ก์ฐ๋ฅผ ์ฌ์ฉํ ๋๋ ์์ธ ์ฒ๋ฆฌ๋ฅผ ์ํด์ผํ๋ค. flow{}๋น๋ ์์์ try~catch๋ก ์์ธ์ฒ๋ฆฌํด์ ์์ธ ์๋ ๊ฐ๋ง ๋ฐฉ์ถํ๋ ๊ฒ์ ์์ธ ํฌ๋ช ์ฑ์ ์๋ฐํ๋ ๊ฒ์ด๋ผ๊ณ ํ ์ ์๋ค.
emit์ด catch์ฐ์ฐ์ผ๋ก ์ด๋ฐ ์์ธ ํฌ๋ช ์ฑ์ ์งํฌ ์ ์๋ค. catch์์ emit์ ์ฌ์ฉํด ๋ฐฉ์ถํ๊ฑฐ๋, throw๋ฅผ ๋ ์ฌ์ฉํด์ ๋ค์ ์์ธ๋ฅผ ๋์ง ์ ์๋ค.
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
}
์์ธ๊ฐ ๋ฐ์ํ๋ 9๋ฒ ๋ผ์ธ์์ ๋์ ธ์ง e๋ฅผ catch๊ฐ ๋ฐ์์ emitํ๊ณ ์ด๊ฑธ collect๊ฐ ๋ฐ๋๋ค. try~catch๋ฅผ ์ฐ์ง์์๋ ๋์ผํ ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ฌ์ค์ ์ ์ ์๋ค.
catch ์ฐ์ฐ์๋ ์ ์คํธ๋ฆผ์์ ๋ฐ์ํ๋ ์์ธ๋ง ์ก์์ค๋ค. ์ ์ฝ๋ ๋ธ๋ญ์์ check๊ฐ mainํจ์ ์ collect์ ์์นํ๋ค๋ฉด ์์ธ์ฒ๋ฆฌ๋ฅผ ํ์ง๋ชปํ๊ณ ์ค๋ฅ๋ฅผ ๋ด๋ฟ์์ ๊ฒ์ด๋ค.
Flow completion
์ ์์ด๋ ์์ธ๋ ํ๋ก์ฐ๊ฐ ์ข ๋ฃ๋๋ฉด ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ด 2๊ฐ์ง๊ฐ ์๋ค. imperative์ declarative๋ฐฉ์์ด๋ค.
imperativeํ ๋ฐฉ์์ ๋ฌด์กฐ๊ฑด try~finally๋ฅผ ๊ฑฐ์น๊ฒ ๋ง๋ค์ด์ ์์ง์ด ์ข ๋ฃ๋๊ณ ์คํํ ์ฝ๋๋ฅผ ์ ์ํด๋๋ ๊ฒ์ด๋ค.
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
declarativeํ ๋ฐฉ์์ผ๋ก๋ ํ๋ก์ฐ๋น๋์ onCompletion์ ๋ถ์ฌ์ ์ข ๋ฃ์ ์ํํ ํ๋์ ์ ์ธํด๋๋ ๊ฒ์ด๋ค.
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
๊ทธ๋ฌ๋ onCompletion์ ์์ธ๋ฅผ ์ก์ง ๋ชปํ๊ธฐ๋๋ฌธ์ catch์ ๊ฐ์ด ์ฌ์ฉํด์ค์ผํ๋ค. ๋ ํ๊ฐ์ง catch์ onCompletion์ ์ฐจ์ด๋ onCompletion์ด ์์ธ๋ฅผ ์ข ๋ฃ์์ ์ผ๋ก ํด์ ๋ฐ๊ฒฌํ ์๋ ์์ด๋ ํ๋ก์ฐ ์ทจ์๋ ์คํจ๋ฅผ ๊ฐ์ด ํธ์ถํ๋ ๊ฒ ์๋๊ธฐ ๋๋ฌธ์ ์คํ ์ค ์ค๋ฅ๊ฐ ๋๋ฒ๋ฆฐ๋ค.
๋ค์ ์ฝ๋๊ฐ ๊ทธ ์์์ธ๋ฐ, ์์ธ๊ฒฐ๊ณผ๋ฅผ ๋ฐ์์ ์ถ๋ ฅ์ ํ์ง๋ง ๋ค์ด์คํธ๋ฆผ์์ ๋ฐ์ํ๋ ์์ธ ํ์ ์คํ์ค๋ฅ๊ฐ ๋๋ค.
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
Imperative(๋ช ๋ นํ)์ declarative(์ ์ธํ)๊ฐ ๊ฐ๊ฐ ํน์ง์ด ์๋๋ฐ ํ์์ ๋ฐ๋ผ ๋ง์ถฐ์ฌ์ฉํ๋๊ฒ ์ข๋ค.
Launching flow
onEach๋ addEventListener์ ๊ฐ์ ํ๋์ ๋ณด์ฌ์ค๋ค. onEach๋ ์ค๊ฐ์ฐ์ฐ์์ด๊ธฐ ๋๋ฌธ์ ๊ฐ์ ๋ฐ์ ๋ฌด์กฐ๊ฑด ์์งํด์ค์ผํ๋ค.
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
onEach๋ฅผ ์ฌ์ฉํ ๋ ํด๋น ํ๋ก์ฐ๋ฅผ ๋ถ๋ฆฌ๋ ์ฝ๋ฃจํด์ผ๋ก ๋ณด๋ด๋ฒ๋ฆด ์ ์๋ ์ข ๋จ์ฐ์ฐ์๊ฐ ์๋๋ฐ launchin์ด ๊ทธ๊ฒ์ด๋ค.
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
launchIn์ ๋ถ์ฌ ๋ฒ๋ ค์ events ํ๋ก์ฐ๋ ๋ถ๋ฆฌ๋ ์ฝ๋ฃจํด์ด ๋์ด, ์๋ ๋ผ์ธ์ println์ด ๋จผ์ ์คํ๋๋ค. addEventListener์ ๋ ์ ์ฌํ๊ฑด launchIn์ด๋ค. ์์งํด๋ฒ๋ฆฌ๊ธฐ ๋๋ฌธ์ eventํ์๋ ๋๋ค๊ณ ๋ณผ ์ ์์ด removeEventListener ์ญํ ๋ ๊ฒธํ๋ค.
launchIn์ ์ฝ๋ฃจํด์ ์์ฑํ๊ธฐ ๋๋ฌธ์ Job๊ฐ์ฒด๋ฅผ ๋ฐํํ๋ค. ๋ฐ๋ผ์ job์ ๊ฑธ์์๋ ์ฐ์ฐ๋ ๋ค ๊ฐ๋ฅํ๋ค.
ํ๋ก์ฐ๋ฅผ ์ค๊ฐ์ cancelํ๋ ค๋ฉด cancellable()์ฐ์ฐ์๋ฅผ ๋ถ์ฌ์ฃผ๋ฉด ๋๋ค. ์ด๋ฌ๋ฉด cancel์ด ์ผ์ด๋ ๋ ์์ธ๋ฅผ ๋ฐํํ๋ค.
"๋๊ธ, ๊ณต๊ฐ ๋ฒํผ ํ ๋ฒ์ฉ ๋๋ฅด๊ณ ๊ฐ์ฃผ์๋ฉด ํฐ ํ์ด ๋ฉ๋๋ค"
'Android ๐ฅ๏ธ > Coroutine๐' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Callback ๊ธฐ๋ฐ์ ๋น๋๊ธฐ์์ ์ ์ฌ์ฉ๋๋ CompletableDeferred, suspendCancellableCoroutine (0) | 2024.05.05 |
---|---|
Coroutine exceptions handling (0) | 2024.03.29 |
Coroutine context and dispatchers (0) | 2024.03.29 |
Coroutine Composing suspending functions (0) | 2024.03.29 |
Coroutine Cancellation and timeouts (0) | 2024.03.29 |