深入 Kotlin Coroutines 测试全链路:从 TestDispatcher 调度控制到 Turbine Flow 断言的协程单元测试工程实践
上个月给一个网络层模块补单元测试,遇到了一个让人头大的问题:一个 viewModelScope.launch 里调 delay(2000) 后更新 UI 的逻辑,测试要么跑 2 秒,要么 assertion 永远拿不到更新的值。runBlocking 能解决延迟但不支持结构化并发,advanceTimeBy 看文档能用但不知道怎么注入。
协程测试的痛点不在于用法复杂,而在于时间控制和调度注入这两个东西藏在源码里,文档浅尝辄止。解决问题的关键在于理解两件事:虚拟时钟怎么工作,以及怎么把测试调度器塞进被测代码的控制流里。
runTest 虚拟时钟:测试里的时间不走了
kotlinx-coroutines-test 提供了 runTest,它的核心能力是虚拟时钟:所有基于协程调度器的延时操作都不会真实等待,而是记录在未来时间点上,等待被手动推进。
@Test
fun `delay should not really wait`() = runTest {
val start = System.currentTimeMillis()
delay(5000) // 虚拟时钟上跳 5 秒
val elapsed = System.currentTimeMillis() - start
assertTrue(elapsed < 100) // 实际耗时不到 100ms
}
runTest 内部使用 TestCoroutineScheduler 维护一个虚拟时间轴。当协程调用 delay(n),调度器不会阻塞线程,而是在事件队列里记一条”当前时间 + n 毫秒后恢复”。只有调用 runCurrent() 或让测试协程自然挂起时,调度器才会跳到最近一个待处理事件的时刻去执行。
这里有个坑:runTest 默认只在顶层测试协程空闲时才推进时间。如果被测代码用 launch 起了子协程,且没有 join 或 cancelAndJoin,时间不会自动跳到子协程完成。
@Test
fun `child coroutine needs explicit join`() = runTest {
var result = ""
launch {
delay(1000)
result = "done"
}
// 此时子协程还没跑完,result 仍为 ""
assertEquals("", result) // 通过,但这不是你想要的
}
解决方式是在 launch 后加 currentCoroutineContext().job.children.forEach { it.join() },或者更直接:用 StandardTestDispatcher 并手动 runCurrent()。
TestDispatcher 调度注入:把时间控制权交给测试
runTest 默认用的是 StandardTestDispatcher,它不会主动推进时间——所有延迟任务都堆积在调度器里,等测试代码显式推进。这才是单元测试需要的控制粒度。
注入 TestDispatcher 有两种场景。
场景一:被测函数接受 CoroutineContext
最直接的方案是让被测方接受 CoroutineContext 参数(默认 Dispatchers.Main),测试时传入 TestDispatcher:
class UserRepository(
private val api: Api,
private val dispatcher: CoroutineContext = Dispatchers.IO
) {
suspend fun fetchUser(id: String): User = withContext(dispatcher) {
delay(300) // 模拟网络延迟
api.getUser(id)
}
}
@Test
fun `fetchUser with injected dispatcher`() = runTest {
val repo = UserRepository(fakeApi, coroutineContext)
val user = repo.fetchUser("1")
assertEquals("test_user", user.name)
}
这里直接把 runTest 的 coroutineContext(也就是 TestDispatcher)传入,300ms 延迟瞬时完成。
场景二:ViewModel 里硬编码了 Dispatchers.Main
Android 开发里最常见的场景:viewModelScope.launch 开在 Dispatchers.Main 上,你没法直接传参注入。这时候要用 Dispatchers.setMain:
@Before
fun setup() {
Dispatchers.setMain(StandardTestDispatcher())
}
@After
fun tearDown() {
Dispatchers.resetMain()
}
@Test
fun `viewModel fetches and updates state`() = runTest {
val viewModel = MyViewModel(fakeRepo)
advanceUntilIdle() // 推进所有待处理任务
assertEquals(Loaded("data"), viewModel.state.value)
}
advanceUntilIdle() 会持续推进虚拟时间,直到调度器里没有任何待执行任务。对于 ViewModel 里一连串 launch → delay → 更新 StateFlow 的逻辑,这一步就全搞定了。
需要精细控制时用 advanceTimeBy(duration),比如测试超时逻辑:
@Test
fun `timeout triggers fallback`() = runTest {
val viewModel = MyViewModel(fakeSlowRepo)
advanceTimeBy(5100) // 跳过 5s 超时阈值
assertEquals(Fallback("timeout"), viewModel.state.value)
}
我踩过一个坑:如果 @Before 里设了 Dispatchers.setMain(testDispatcher) 但 runTest 内部又创建了新的 TestDispatcher,两者不是同一个调度器实例。advanceUntilIdle() 推进的是 runTest 的调度器,而 ViewModel 里 viewModelScope.launch 跑在你设的 testDispatcher 上——两者脱钩,测试会一直挂起。解决方案是让 runTest 使用同一个实例:
val testDispatcher = StandardTestDispatcher()
@Before
fun setup() {
Dispatchers.setMain(testDispatcher)
}
@Test
fun `shared dispatcher works`() = runTest(testDispatcher) { ... }
Turbine:Flow 事件断言不再手写 Collector
StateFlow 的测试直接用 state.value 取值就行。但普通 Flow 没有热值,你得手动 collect。最早我是这么写的:
@Test
fun `manual collect is tedious`() = runTest {
val flow = repo.observeEvents()
val events = mutableListOf<Event>()
val job = launch { flow.toList(events) }
advanceUntilIdle()
job.cancel()
assertEquals(listOf(Event.Start, Event.Data), events)
}
写了两三个 case 就觉得不对劲:每次都要 launch、toList、cancel,出错了堆栈还不清晰。后来换了 Turbine,这是一个专为 Flow 测试设计的轻量库,核心就两个 API:test() 和 awaitItem()。
@Test
fun `flow emits loading then data`() = runTest {
repo.observeData().test {
assertEquals(Loading, awaitItem())
assertEquals(Data("result"), awaitItem())
awaitComplete() // 断言 Flow 正常完结
}
}
test { } 会在内部起一个 collect 协程,awaitItem() 挂起等待下一个 emit 的值,awaitComplete() 断言 Flow 已经关闭。所有操作都在 runTest 的虚拟时钟下,delay 一样不需要真等。
如果被测 Flow 只会 emit 有限次然后完结,awaitComplete() 必须调——不然 Turbine 会一直等下一个值,测试挂起超时。
需要测试 Flow 抛异常时用 awaitError():
repo.errorFlow().test {
assertEquals(Start, awaitItem())
assertEquals("Network error", (awaitError() as IOException).message)
}
Turbine 的 cancelAndIgnoreRemainingEvents() 在只需要验证前几个事件的场景很好用:
flow.test {
assertEquals("first", awaitItem())
cancelAndIgnoreRemainingEvents()
}
不调这个的话,Turbine 在 test {} 结束时发现 Flow 还没完结,会抛 CancellationException,导致测试标记为失败。
工程实践:那些文档不会告诉你的东西
flowOn 的调度问题。如果被测 Flow 链里用了 flowOn(Dispatchers.IO),在 runTest 下主调度器是 TestDispatcher,但 flowOn 创建的内部调度器不受你的 Dispatchers.setMain 影响。这个 Flow 确实不会被真阻塞——runTest 会把所有协程全部纳入虚拟时钟,不管它们的调度器是什么。我验证过,flowOn(Dispatchers.IO).delay(5000) 在 runTest 里同样瞬间完成。原理是 runTest 拦截了底层的 DefaultDelay,只要代码跑在协程上下文里,delay 就走虚拟时间。
测试倒计时逻辑。比如一个 60 秒倒计时的 tick Flow:
fun countdownFlow(seconds: Int): Flow<Int> = flow {
repeat(seconds) { i ->
emit(seconds - i)
delay(1000)
}
}
@Test
fun `countdown emits all ticks instantly`() = runTest {
countdownFlow(60).test {
(60 downTo 1).forEach { assertEquals(it, awaitItem()) }
awaitComplete()
}
}
60 次 awaitItem() + delay(1000),在虚拟时钟下全部瞬间完成。真要在真实环境测这个,每个 case 等一分钟,CI 早就爆了。
复杂状态的完整性断言。不要只断言最后一个值,每条事件链都要覆盖:
viewModel.events.test {
// 前两个事件与初始化相关
assertEquals(Init, awaitItem())
assertEquals(Loading, awaitItem())
// 触发动作后
viewModel.onAction(Refresh)
assertEquals(Data(emptyList()), awaitItem())
assertEquals(Idle, awaitItem())
cancelAndIgnoreRemainingEvents()
}
按事件序列逐一断言,比只测 state.value 更能发现中间态的 bug——比如 Loading 和 Data 之间有没有多 emit 一次空数据导致 UI 闪烁,这种问题只在序列测试里暴露。
几点实践建议。第一,新项目从一开始就让 Repository / UseCase 接受 CoroutineContext 参数,别把 Dispatchers.Main 写死在类里,这是最低成本的测试友好设计。第二,runTest 的 testTimeLimit 参数设一个合理值(默认 10s),超过这个时间虚拟时钟没被推进完,测试直接失败——这能帮你快速发现”忘了 advanceUntilIdle”的问题。第三,Turbine 适合事件序列验证,StateFlow 就用 state.value,不要用 Turbine 去测 StateFlow,两种场景两种工具。