Datastoreのエラーハンドリングの仕組みを調べたメモ
Android JetpackのDatastoreがリリースされた。1
非同期処理でread/writeができるshared preferenceみたいなものかというぼんやりした理解だったが、このつぶやきを拝見して
SharedPreference 、ファイル IO がサイレントに失敗してて保存できてなくて次回起動時?に未保存状態になるらしいんだけど、まぁ気がつけないので問題を問題として認識できないという問題がある
— wada811 (@wada811) 2020年9月24日
この問題を認識しないと DataStore は意味不明だよね
実際どれくらい発生してるのかも謎なので難しい
Datastoreはどうやってエラーハンドリングしているのか興味を持ったので、内部実装を調べてみた。(ver. 1.0.0-alpha01
)
Datastore内部実装
まず、writeに失敗してIOExceptionが出るのはここ
internal class SingleProcessDataStore<T>( internal fun writeData(newData: T) { file.createParentDirectories() val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX) try { FileOutputStream(scratchFile).use { stream -> serializer.writeTo(newData, stream) stream.fd.sync() // TODO(b/151635324): fsync the directory, otherwise a badly timed crash could // result in reverting to a previous state. } if (!scratchFile.renameTo(file)) { throw IOException("$scratchFile could not be renamed to $file") } } catch (ex: IOException) { if (scratchFile.exists()) { scratchFile.delete() } throw ex } }
ここで投げられたexceptionはdownstream channelに流れる。
private val actor: SendChannel<Message<T>> = scope.actor( capacity = UNLIMITED ) { try { messageConsumer@ for (msg in channel) { if (msg.dataChannel.isClosedForSend) { // The message was sent with an old, now closed, dataChannel. This means that // our read failed. continue@messageConsumer } try { readAndInitOnce(msg.dataChannel) } catch (ex: Throwable) { resetDataChannel(ex) continue@messageConsumer } // We have successfully read data and sent it to downstreamChannel. if (msg is Message.Update) { msg.ack.completeWith( runCatching { // この関数の中で上記のwriteData()がcallされている transformAndWrite(msg.transform, downstreamChannel()) } ) } } } finally { // The scope has been cancelled. Cancel downstream in case there are any collectors // still active. downstreamChannel().cancel() } }
runCatching{}
はblock内で発生したexceptionをResultでwrapして返却するので、exceptionはcompleteWith()
の引数に渡される。
Message.Update
のackはupdateData()
でawaitされてreturnしている。
override suspend fun updateData(transform: suspend (t: T) -> T): T { val ack = CompletableDeferred<T>() val dataChannel = downstreamChannel() val updateMsg = Message.Update<T>(transform, ack, dataChannel) actor.send(updateMsg) // If no read has succeeded yet, we need to wait on the result of the next read so we can // bubble exceptions up to the caller. Read exceptions are not bubbled up through ack. if (dataChannel.valueOrNull == null) { dataChannel.asFlow().first() } // Wait with same scope as the actor, so we're not waiting on a cancelled actor. return withContext(scope.coroutineContext) { ack.await() } }
で、このupdateData()
はDataStore#edit()
でcallされる。
suspend fun DataStore<Preferences>.edit( transform: suspend (MutablePreferences) -> Unit ): Preferences { return this.updateData { // It's safe to return MutablePreferences since we make a defensive copy in // PreferencesDataStore.updateData() it.toMutablePreferences().apply { transform(this) } } }
こうやって書けばエラーをハンドリングできる
Datastoreの使い方の概要ページ見たらeditをコールする時にexceptionのcatchとかしてないけど、disk書き込みエラーがたまーに発生するのであればちゃんとcatchした方がいいと思った。
下記は既存の値を読み込んで1足して保存する場合の例。
try { dataStore.edit { settings -> val currentValue = settings[KEY_FOO]?.toMutableSet() ?: mutableSetOf() settings[KEY_FOO] = currentValue + 1 } } catch (e: IOException) { // do something if need (e.g. retry) }
ちなみにshared preferenceのapply)は特にexceptionを投げてくれないのでそもそもエラーを検出できないっぽい。