Issue
Is the following code safe, and why?
val flow: Flow<String> = ...
val allStrings = mutableListOf<String>()
var sum = 0
flow.transform {
    allStrings += it
    emit(it.toInt())
}.collect {
    sum += it
}
The following test demonstrates the collect {} body being called from different threads:
val ctx = newFixedThreadPoolContext(32, "my-context")
runBlocking(ctx) {
    val f = flow<Int> {
        (1..1000).forEach {
            emit(it)
        }
    }
    var t: Thread? = null
    f.collect {
        delay(1)
        // this requirement will fail
        if (t == null) t = Thread.currentThread() else require(t == Thread.currentThread())
    }
}
And another one that tests publication:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(args: Array<String>) {
    val ctx = newFixedThreadPoolContext(32, "my-context")
    runBlocking(ctx) {
        val f = flow<Int> {
            (1..1000000).forEach {
                emit(it)
            }
        }
        var c = 0
        f.transform {
            c += 1
            boo()
            c += 1
            emit(it)
            c += 1
        }.collect {
            c += 1
            boo()
            c += 1
        }
        println(c) // prints 5_000_000
    }
}

// Suspends and switches dispatchers, so the caller may resume on another thread.
suspend fun boo() {
    withContext(Dispatchers.IO) {
    }
}
So it seems that Kotlin Flow ensures safe publication between coroutine invocations, but is this intentional (or even documented), or is it an implementation side effect?
Solution
Yes, the code is safe. Flows are sequential by default.
Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator (collect). No new coroutines are launched by default. Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator after.
Kotlin Flows are based on suspending functions and they are completely sequential, but not single-threaded.
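To make the sequential behaviour concrete, here is a minimal sketch that records the order in which the emitter, one intermediate operator, and the collector run. The function name traceSequentialCollection and the log format are made up for illustration; only flow, map, collect, and runBlocking come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order in which the emitter, an intermediate operator, and the
// collector run. The function name and log format are illustrative only.
fun traceSequentialCollection(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // suspends until downstream has finished with this value
        }
    }.map { value ->
        log += "map $value"
        value * 10
    }.collect { value ->
        log += "collect $value"
    }
    log
}

fun main() {
    traceSequentialCollection().forEach(::println)
}
```

The log reads emit 1, map 1, collect 1, then emit 2, and so on: each value travels through the whole chain before the next one is emitted.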
As for whether this is intentional or documented, a flow is described as "a type that can emit multiple values sequentially".
From this I understand that no new value is collected until the previous value has been processed, no matter how many threads are involved.
As Roman mentioned in his comment, there is a good article about sequential execution. A great quote from it:
Even though a coroutine in Kotlin can execute on multiple threads it is just like a thread from a standpoint of mutable state. No two actions in the same coroutine can be concurrent. And just like with threads you should avoid sharing your mutable state between coroutines or you’ll have to worry about synchronization yourself. Avoid sharing mutable state. Confine each mutable object to either a single thread or to a single coroutine and sleep well.
And this is applicable to Flows, because collection of a Flow works directly in the coroutine that calls a terminal operator. No new coroutines are launched by default.
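As a rough illustration of state confined to a single collecting coroutine, the sketch below sums a flow on the multi-threaded Dispatchers.Default and also records which threads the collector body ran on. The function name sumOnMultiThreadedDispatcher and the choice of delay(1) as the suspension point are assumptions made for this example, not something taken from the question.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Sums a flow inside one collecting coroutine that may resume on different
// threads of Dispatchers.Default. The function name is illustrative only.
fun sumOnMultiThreadedDispatcher(count: Int): Pair<Int, Set<String>> =
    runBlocking(Dispatchers.Default) {
        var sum = 0                           // touched only by this coroutine
        val threads = mutableSetOf<String>()  // names of threads the collector ran on
        flow {
            for (i in 1..count) emit(i)
        }.collect { value ->
            delay(1)                          // suspension point: may resume on another thread
            threads += Thread.currentThread().name
            sum += value
        }
        sum to threads
    }

fun main() {
    val (sum, threads) = sumOnMultiThreadedDispatcher(100)
    println("sum = $sum")                     // always 5050
    println("threads used: ${threads.size}")  // varies from run to run
}
```

The sum is the same on every run even when several threads are involved, because only one coroutine ever touches sum and threads. If a second coroutine were launched to update the same variables, synchronization would be needed, as the quoted article warns.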
Answered By - Sergey
Answer Checked By - David Marino (JavaFixing Volunteer)