言いたいことはそれだけか

KotlinとかAndroidとかが好きです。調べたことをメモします。٩( 'ω' )و

複数のsubscribersに対応したFlowとテストの書き方

MVVMアーキテクチャにおいて、一つのModelを複数のView Modelで共有している場合、あるイベントを複数のView Modelで同時に受け取りたいことがある。これをKotlin Coroutine Flowを使って書くとどうなるのかなぁと思ったのでやってみたメモ。テストも書いた。

前回のエントリでPaging Library 3の内部実装を読んだ際に同じようなことをやっているようなコード があったので参考にした。

muumuutech.hatenablog.com

実装例

例として「requestがあったらremote source (api serverとか)から何か値を取ってきてflowにemitする」ようなservice classを実装する。コード例はこちらのgistにまとめてある。

Example of Kotlin Coroutine Flow with multiple subscribers · GitHub

まずはクラス定義。コンストラクタ引数にCoroutineDispatcherを渡せるようにしておく。これはテストの時にTestCoroutineDispatcherでcoroutineを動かすために必要。デフォルト引数でDispatchers.IOを指定し、テスト以外では引数を指定しなくて良いようにしておいた。

class FooService(private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {

privateなchannelを定義する。複数のsubscribersに対応できるようにConflatedBroadcastChannelを使う。

    @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
    private val fooChannel = ConflatedBroadcastChannel<Unit>()

上記のchannelをasFlow()を使ってflowに変換し、class外にexposeする。

    @OptIn(
        kotlinx.coroutines.FlowPreview::class,
        kotlinx.coroutines.ExperimentalCoroutinesApi::class
    )
    val fooFlow = fooChannel.asFlow().map { getFooFromRemoteSource() }.flowOn(dispatcher)

    private fun getFooFromRemoteSource(): String {
        val result = "result" // ここでremote sourceから値をfetchすることを想定
        return result
    }

これでchannelにeventが流れてきた時にmap節の中で変換した値をflowに流すことができる。今回はuserのclick動作など、引数がない場合を想定したのでUnitを流しているが、引数が必要であればUnitの代わりにそのtypeを定義する。mapで変換した後にflowOn()でどのdispatcherを使うか指定している。(この理由は上記で書いた通りtestの時に必要になるから。)

ここでmap節のなかでwithContext()などで動作するthreadを指定していると、testが動いているthreadと切り替えが発生し、処理が並列で動いた結果期待した挙動にならないことがある。(私はこれに気がつかずかなり時間を消費してしまった😩)

最後に、channelがprivateなので外からtriggerできるような関数を定義する。

    @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
    fun requestFoo() {
        fooChannel.offer(Unit)
    }

テスト例

上記のservice classのunit test codeを書いていく。

Coroutineのtestなので、test scopeとtest dispatcherを用意する。

    @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
    private val testScope = TestCoroutineScope()

    @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
    private val dispatcher = TestCoroutineDispatcher()

@Beforeなfunctionでテスト対象を初期化。

    private lateinit var fooService: FooService

    @Before
    fun setup() {
        fooService = FooService(dispatcher)
    }

requestFoo()がtriggerとなり、fooFlowに値が流れてくることを確認するテスト。

   @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
    @Test
    fun `requestFoo() should trigger getting foo and fooFlow emit foo`() {
        testScope.runBlockingTest {
            val foo = mutableListOf<String>()
            val job = launch {
                fooService.fooFlow.collect { foo.add(it) }
            }

            // ここではまだfooは空
            assertThat(foo).isEmpty()

            fooService.requestFoo()

            assertThat(foo).isEqualTo(listOf("result"))

            job.cancel()
        }
    }

TestCoroutineScope.runBlockingTestはまだまだ不安定という記事をどこかで見かけた気がしたが、自分の環境(1.3.7)では上記のコードは問題なく動いていた。ただ、この例だと特にdelayが発生しないのでrunBlockingTestを使う理由も特にない。気になる人はrunBlockingを使えば良いと思う。
また、fooをlistにするべきかは議論があるかもしれないが、個人的には「一度のtriggerで一回しか値が流れてこないこと」を確認できるのでlistで良いと思っている。