深入 Kotlin Coroutines 测试全链路:从 TestDispatcher 调度控制到 Turbine Flow 断言的协程单元测试工程实践

上个月给一个网络层模块补单元测试,遇到了一个让人头大的问题:一个 viewModelScope.launch 里调 delay(2000) 后更新 UI 的逻辑,测试要么跑 2 秒,要么 assertion 永远拿不到更新的值。runBlocking 能解决延迟但不支持结构化并发,advanceTimeBy 看文档能用但不知道怎么注入。

协程测试的痛点不在于用法复杂,而在于时间控制和调度注入这两个东西藏在源码里,文档浅尝辄止。解决问题的关键在于理解两件事:虚拟时钟怎么工作,以及怎么把测试调度器塞进被测代码的控制流里。

runTest 虚拟时钟:测试里的时间不走了

kotlinx-coroutines-test 提供了 runTest,它的核心能力是虚拟时钟:所有基于协程调度器的延时操作都不会真实等待,而是记录在未来时间点上,等待被手动推进。

@Test
fun `delay should not really wait`() = runTest {
    val start = System.currentTimeMillis()
    delay(5000) // 虚拟时钟上跳 5 秒
    val elapsed = System.currentTimeMillis() - start
    assertTrue(elapsed < 100) // 实际耗时不到 100ms
}

runTest 内部使用 TestCoroutineScheduler 维护一个虚拟时间轴。当协程调用 delay(n),调度器不会阻塞线程,而是在事件队列里记一条”当前时间 + n 毫秒后恢复”。只有调用 runCurrent() 或让测试协程自然挂起时,调度器才会跳到最近一个待处理事件的时刻去执行。

这里有个坑:runTest 默认只在顶层测试协程空闲时才推进时间。如果被测代码用 launch 起了子协程,且没有 joincancelAndJoin,时间不会自动跳到子协程完成。

@Test
fun `child coroutine needs explicit join`() = runTest {
    var result = ""
    launch {
        delay(1000)
        result = "done"
    }
    // 此时子协程还没跑完,result 仍为 ""
    assertEquals("", result) // 通过,但这不是你想要的
}

解决方式是在 launch 后加 currentCoroutineContext().job.children.forEach { it.join() },或者更直接:StandardTestDispatcher 并手动 runCurrent()

TestDispatcher 调度注入:把时间控制权交给测试

runTest 默认用的是 StandardTestDispatcher,它不会主动推进时间——所有延迟任务都堆积在调度器里,等测试代码显式推进。这才是单元测试需要的控制粒度。

注入 TestDispatcher 有两种场景。

场景一:被测函数接受 CoroutineContext

最直接的方案是让被测方接受 CoroutineContext 参数(默认 Dispatchers.Main),测试时传入 TestDispatcher

class UserRepository(
    private val api: Api,
    private val dispatcher: CoroutineContext = Dispatchers.IO
) {
    suspend fun fetchUser(id: String): User = withContext(dispatcher) {
        delay(300) // 模拟网络延迟
        api.getUser(id)
    }
}

@Test
fun `fetchUser with injected dispatcher`() = runTest {
    val repo = UserRepository(fakeApi, coroutineContext)
    val user = repo.fetchUser("1")
    assertEquals("test_user", user.name)
}

这里直接把 runTestcoroutineContext(也就是 TestDispatcher)传入,300ms 延迟瞬时完成。

场景二:ViewModel 里硬编码了 Dispatchers.Main

Android 开发里最常见的场景:viewModelScope.launch 开在 Dispatchers.Main 上,你没法直接传参注入。这时候要用 Dispatchers.setMain

@Before
fun setup() {
    Dispatchers.setMain(StandardTestDispatcher())
}

@After
fun tearDown() {
    Dispatchers.resetMain()
}

@Test
fun `viewModel fetches and updates state`() = runTest {
    val viewModel = MyViewModel(fakeRepo)
    advanceUntilIdle() // 推进所有待处理任务
    assertEquals(Loaded("data"), viewModel.state.value)
}

advanceUntilIdle() 会持续推进虚拟时间,直到调度器里没有任何待执行任务。对于 ViewModel 里一连串 launchdelay → 更新 StateFlow 的逻辑,这一步就全搞定了。

需要精细控制时用 advanceTimeBy(duration),比如测试超时逻辑:

@Test
fun `timeout triggers fallback`() = runTest {
    val viewModel = MyViewModel(fakeSlowRepo)
    advanceTimeBy(5100) // 跳过 5s 超时阈值
    assertEquals(Fallback("timeout"), viewModel.state.value)
}

我踩过一个坑:如果 @Before 里设了 Dispatchers.setMain(testDispatcher)runTest 内部又创建了新的 TestDispatcher,两者不是同一个调度器实例。advanceUntilIdle() 推进的是 runTest 的调度器,而 ViewModel 里 viewModelScope.launch 跑在你设的 testDispatcher 上——两者脱钩,测试会一直挂起。解决方案是让 runTest 使用同一个实例:

val testDispatcher = StandardTestDispatcher()

@Before
fun setup() {
    Dispatchers.setMain(testDispatcher)
}

@Test
fun `shared dispatcher works`() = runTest(testDispatcher) { ... }

Turbine:Flow 事件断言不再手写 Collector

StateFlow 的测试直接用 state.value 取值就行。但普通 Flow 没有热值,你得手动 collect。最早我是这么写的:

@Test
fun `manual collect is tedious`() = runTest {
    val flow = repo.observeEvents()
    val events = mutableListOf<Event>()
    val job = launch { flow.toList(events) }
    advanceUntilIdle()
    job.cancel()
    assertEquals(listOf(Event.Start, Event.Data), events)
}

写了两三个 case 就觉得不对劲:每次都要 launchtoListcancel,出错了堆栈还不清晰。后来换了 Turbine,这是一个专为 Flow 测试设计的轻量库,核心就两个 API:test()awaitItem()

@Test
fun `flow emits loading then data`() = runTest {
    repo.observeData().test {
        assertEquals(Loading, awaitItem())
        assertEquals(Data("result"), awaitItem())
        awaitComplete() // 断言 Flow 正常完结
    }
}

test { } 会在内部起一个 collect 协程,awaitItem() 挂起等待下一个 emit 的值,awaitComplete() 断言 Flow 已经关闭。所有操作都在 runTest 的虚拟时钟下,delay 一样不需要真等。

如果被测 Flow 只会 emit 有限次然后完结,awaitComplete() 必须调——不然 Turbine 会一直等下一个值,测试挂起超时。

需要测试 Flow 抛异常时用 awaitError()

repo.errorFlow().test {
    assertEquals(Start, awaitItem())
    assertEquals("Network error", (awaitError() as IOException).message)
}

Turbine 的 cancelAndIgnoreRemainingEvents() 在只需要验证前几个事件的场景很好用:

flow.test {
    assertEquals("first", awaitItem())
    cancelAndIgnoreRemainingEvents()
}

不调这个的话,Turbine 在 test {} 结束时发现 Flow 还没完结,会抛 CancellationException,导致测试标记为失败。

工程实践:那些文档不会告诉你的东西

flowOn 的调度问题。如果被测 Flow 链里用了 flowOn(Dispatchers.IO),在 runTest 下主调度器是 TestDispatcher,但 flowOn 创建的内部调度器不受你的 Dispatchers.setMain 影响。这个 Flow 确实不会被真阻塞——runTest 会把所有协程全部纳入虚拟时钟,不管它们的调度器是什么。我验证过,flowOn(Dispatchers.IO).delay(5000)runTest 里同样瞬间完成。原理是 runTest 拦截了底层的 DefaultDelay,只要代码跑在协程上下文里,delay 就走虚拟时间。

测试倒计时逻辑。比如一个 60 秒倒计时的 tick Flow:

fun countdownFlow(seconds: Int): Flow<Int> = flow {
    repeat(seconds) { i ->
        emit(seconds - i)
        delay(1000)
    }
}

@Test
fun `countdown emits all ticks instantly`() = runTest {
    countdownFlow(60).test {
        (60 downTo 1).forEach { assertEquals(it, awaitItem()) }
        awaitComplete()
    }
}

60 次 awaitItem() + delay(1000),在虚拟时钟下全部瞬间完成。真要在真实环境测这个,每个 case 等一分钟,CI 早就爆了。

复杂状态的完整性断言。不要只断言最后一个值,每条事件链都要覆盖:

viewModel.events.test {
    // 前两个事件与初始化相关
    assertEquals(Init, awaitItem())
    assertEquals(Loading, awaitItem())
    
    // 触发动作后
    viewModel.onAction(Refresh)
    assertEquals(Data(emptyList()), awaitItem())
    assertEquals(Idle, awaitItem())
    
    cancelAndIgnoreRemainingEvents()
}

按事件序列逐一断言,比只测 state.value 更能发现中间态的 bug——比如 Loading 和 Data 之间有没有多 emit 一次空数据导致 UI 闪烁,这种问题只在序列测试里暴露。

几点实践建议。第一,新项目从一开始就让 Repository / UseCase 接受 CoroutineContext 参数,别把 Dispatchers.Main 写死在类里,这是最低成本的测试友好设计。第二,runTesttestTimeLimit 参数设一个合理值(默认 10s),超过这个时间虚拟时钟没被推进完,测试直接失败——这能帮你快速发现”忘了 advanceUntilIdle”的问题。第三,Turbine 适合事件序列验证,StateFlow 就用 state.value,不要用 Turbine 去测 StateFlow,两种场景两种工具。