Introduce retry on task executor and use it for pagination

This commit is contained in:
ganfra 2019-01-07 12:12:22 +01:00
parent d288fb7c9c
commit 7669a94a64
4 changed files with 38 additions and 28 deletions

View File

@ -4,10 +4,10 @@ import android.arch.paging.PagedList
import com.zhuinden.monarchy.Monarchy import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.events.model.EnrichedEvent import im.vector.matrix.android.api.session.events.model.EnrichedEvent
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.database.model.ChunkEntity import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.query.findAllIncludingEvents import im.vector.matrix.android.internal.database.query.findAllIncludingEvents
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.PagingRequestHelper import im.vector.matrix.android.internal.util.PagingRequestHelper
import java.util.* import java.util.*


@ -53,6 +53,7 @@ internal class TimelineBoundaryCallback(private val roomId: String,
limit = limit) limit = limit)


paginationTask.configureWith(params) paginationTask.configureWith(params)
.enableRetry()
.dispatchTo(object : MatrixCallback<TokenChunkEvent> { .dispatchTo(object : MatrixCallback<TokenChunkEvent> {
override fun onSuccess(data: TokenChunkEvent) { override fun onSuccess(data: TokenChunkEvent) {
requestCallback.recordSuccess() requestCallback.recordSuccess()

View File

@ -13,6 +13,7 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
val params: PARAMS, val params: PARAMS,
val callbackThread: TaskThread = TaskThread.MAIN, val callbackThread: TaskThread = TaskThread.MAIN,
val executionThread: TaskThread = TaskThread.IO, val executionThread: TaskThread = TaskThread.IO,
val retryCount: Int = 0,
val callback: MatrixCallback<RESULT> = object : MatrixCallback<RESULT> {} val callback: MatrixCallback<RESULT> = object : MatrixCallback<RESULT> {}
) : Task<PARAMS, RESULT> { ) : Task<PARAMS, RESULT> {


@ -33,10 +34,18 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
return copy(callback = matrixCallback) return copy(callback = matrixCallback)
} }


fun enableRetry(retryCount: Int = Int.MAX_VALUE): ConfigurableTask<PARAMS, RESULT> {
return copy(retryCount = retryCount)
}

fun executeBy(taskExecutor: TaskExecutor): Cancelable { fun executeBy(taskExecutor: TaskExecutor): Cancelable {
return taskExecutor.execute(this) return taskExecutor.execute(this)
} }


override fun toString(): String {
return task.javaClass.name
}

} }





View File

@ -1,9 +1,11 @@
package im.vector.matrix.android.internal.task package im.vector.matrix.android.internal.task


import arrow.core.Try
import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.util.CancelableCoroutine import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import timber.log.Timber import timber.log.Timber
@ -15,14 +17,36 @@ internal class TaskExecutor(private val coroutineDispatchers: MatrixCoroutineDis


val job = GlobalScope.launch(task.callbackThread.toDispatcher()) { val job = GlobalScope.launch(task.callbackThread.toDispatcher()) {
val resultOrFailure = withContext(task.executionThread.toDispatcher()) { val resultOrFailure = withContext(task.executionThread.toDispatcher()) {
Timber.v("Executing ${task.javaClass} on ${Thread.currentThread().name}") Timber.v("Executing $task on ${Thread.currentThread().name}")
task.execute(task.params) retry(task.retryCount) {
task.execute(task.params)
}
} }
resultOrFailure.fold({ task.callback.onFailure(it) }, { task.callback.onSuccess(it) }) resultOrFailure.fold({ task.callback.onFailure(it) }, { task.callback.onSuccess(it) })
} }
return CancelableCoroutine(job) return CancelableCoroutine(job)
} }


private suspend fun <T> retry(
times: Int = Int.MAX_VALUE,
initialDelay: Long = 100, // 0.1 second
maxDelay: Long = 10_000, // 10 second
factor: Double = 2.0,
block: suspend () -> Try<T>): Try<T> {

var currentDelay = initialDelay
repeat(times - 1) {
val blockResult = block()
if (blockResult.isSuccess()) {
return blockResult
} else {
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
}
return block()
}

private fun TaskThread.toDispatcher() = when (this) { private fun TaskThread.toDispatcher() = when (this) {
TaskThread.MAIN -> coroutineDispatchers.main TaskThread.MAIN -> coroutineDispatchers.main
TaskThread.COMPUTATION -> coroutineDispatchers.computation TaskThread.COMPUTATION -> coroutineDispatchers.computation

View File

@ -1,24 +0,0 @@
package im.vector.matrix.android.internal.util

import arrow.core.Try
import kotlinx.coroutines.delay

suspend fun <T> retry(
times: Int = Int.MAX_VALUE,
initialDelay: Long = 100, // 0.1 second
maxDelay: Long = 10_000, // 10 second
factor: Double = 2.0,
block: suspend () -> Try<T>): Try<T> {

var currentDelay = initialDelay
repeat(times - 1) {
val blockResult = block()
if (blockResult.isSuccess()) {
return blockResult
} else {
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
}
return block()
}