Makes task configurable to allow choosing threads

This commit is contained in:
ganfra 2018-12-17 16:42:22 +01:00
parent 765a34bcf4
commit d9e24558ec
18 changed files with 144 additions and 76 deletions

View File

@ -6,7 +6,7 @@ import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.InstrumentedTest
import im.vector.matrix.android.LiveDataTestObserver
import im.vector.matrix.android.api.thread.MainThreadExecutor
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.session.room.timeline.DefaultTimelineHolder
import im.vector.matrix.android.internal.session.room.timeline.TimelineBoundaryCallback
import im.vector.matrix.android.internal.session.room.timeline.TokenChunkEventPersistor

View File

@ -1,9 +0,0 @@
package im.vector.matrix.android.internal

import arrow.core.Try

interface Task<in PARAMS, out RESULT> {

fun execute(params: PARAMS): Try<RESULT>

}

View File

@ -1,27 +0,0 @@
package im.vector.matrix.android.internal

import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import timber.log.Timber

internal class TaskExecutor(private val coroutineDispatchers: MatrixCoroutineDispatchers) {

fun <PARAMS, RESULT> executeTask(task: Task<PARAMS, RESULT>,
params: PARAMS,
callback: MatrixCallback<RESULT>): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val resultOrFailure = withContext(coroutineDispatchers.io) {
Timber.v("Executing ${task.javaClass} on ${Thread.currentThread().name}")
task.execute(params)
}
resultOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}

}

View File

@ -1,7 +1,7 @@
package im.vector.matrix.android.internal.di

import im.vector.matrix.android.api.MatrixOptions
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.Dispatchers

View File

@ -5,7 +5,7 @@ import arrow.core.fix
import arrow.instances.`try`.monad.monad
import arrow.typeclasses.binding
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.network.executeRequest

View File

@ -14,7 +14,8 @@ import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.api.session.room.model.MyMembership
import im.vector.matrix.android.api.session.room.model.RoomSummary
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
@ -59,7 +60,7 @@ internal data class DefaultRoom(
} else {
val token = syncTokenStore.getLastToken()
val params = LoadRoomMembersTask.Params(roomId, token, Membership.LEAVE)
taskExecutor.executeTask(loadRoomMembersTask, params, object : MatrixCallback<Boolean> {})
loadRoomMembersTask.configureWith(params).executeBy(taskExecutor)
}
}


View File

@ -3,7 +3,7 @@ package im.vector.matrix.android.internal.session.room.members
import arrow.core.Try
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.database.helper.addStateEvents
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.where

View File

@ -1,7 +1,7 @@
package im.vector.matrix.android.internal.session.room.timeline

import arrow.core.Try
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.FilterUtil

View File

@ -2,7 +2,7 @@ package im.vector.matrix.android.internal.session.room.timeline

import arrow.core.Try
import arrow.core.failure
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.FilterUtil

View File

@ -4,11 +4,11 @@ import android.arch.lifecycle.LiveData
import android.arch.paging.LivePagedListBuilder
import android.arch.paging.PagedList
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.events.interceptor.EnrichedEventInterceptor
import im.vector.matrix.android.api.session.events.model.EnrichedEvent
import im.vector.matrix.android.api.session.room.TimelineHolder
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
@ -78,7 +78,7 @@ internal class DefaultTimelineHolder(private val roomId: String,
private fun fetchEventIfNeeded(eventId: String) {
if (!isEventPersisted(eventId)) {
val params = GetContextOfEventTask.Params(roomId, eventId)
taskExecutor.executeTask(contextOfEventTask, params, object : MatrixCallback<TokenChunkEvent> {})
contextOfEventTask.configureWith(params).executeBy(taskExecutor)
}
}


View File

@ -2,7 +2,7 @@ package im.vector.matrix.android.internal.session.room.timeline

import arrow.core.Try
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI


View File

@ -4,7 +4,8 @@ import android.arch.paging.PagedList
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.events.model.EnrichedEvent
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.query.findAllIncludingEvents
import im.vector.matrix.android.internal.util.PagingRequestHelper
@ -51,17 +52,17 @@ internal class TimelineBoundaryCallback(private val roomId: String,
direction = direction,
limit = limit)

taskExecutor.executeTask(paginationTask, params, createCallback(requestCallback))
paginationTask.configureWith(params)
.dispatchTo(object : MatrixCallback<TokenChunkEvent> {
override fun onSuccess(data: TokenChunkEvent) {
requestCallback.recordSuccess()
}

override fun onFailure(failure: Throwable) {
requestCallback.recordFailure(failure)
}
})
.executeBy(taskExecutor)
}


private fun createCallback(pagingRequestCallback: PagingRequestHelper.Request.Callback) = object : MatrixCallback<TokenChunkEvent> {
override fun onSuccess(data: TokenChunkEvent) {
pagingRequestCallback.recordSuccess()
}

override fun onFailure(failure: Throwable) {
pagingRequestCallback.recordFailure(failure)
}
}
}

View File

@ -1,7 +1,7 @@
package im.vector.matrix.android.internal.session.sync

import arrow.core.Try
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.filter.FilterBody
import im.vector.matrix.android.internal.session.sync.model.SyncResponse

View File

@ -3,7 +3,9 @@ package im.vector.matrix.android.internal.session.sync.job
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
@ -80,21 +82,27 @@ internal class SyncThread(private val syncTask: SyncTask,
Timber.v("Execute sync request...")
val latch = CountDownLatch(1)
val params = SyncTask.Params(nextBatch)
cancelableTask = taskExecutor.executeTask(syncTask, params, object : MatrixCallback<SyncResponse> {
override fun onSuccess(data: SyncResponse) {
nextBatch = data.nextBatch
syncTokenStore.saveToken(nextBatch)
latch.countDown()
}
cancelableTask = syncTask.configureWith(params)
.callbackOn(TaskThread.CALLER)
.executeOn(TaskThread.CALLER)
.dispatchTo(object : MatrixCallback<SyncResponse> {
override fun onSuccess(data: SyncResponse) {
nextBatch = data.nextBatch
syncTokenStore.saveToken(nextBatch)
latch.countDown()
}

override fun onFailure(failure: Throwable) {
if (failure !is Failure.NetworkConnection) {
// Wait 10s before retrying
sleep(RETRY_WAIT_TIME_MS)
}
latch.countDown()
}

})
.executeBy(taskExecutor)

override fun onFailure(failure: Throwable) {
if (failure !is Failure.NetworkConnection) {
// Wait 10s before retrying
sleep(RETRY_WAIT_TIME_MS)
}
latch.countDown()
}
})
latch.await()
}
}

View File

@ -0,0 +1,42 @@
package im.vector.matrix.android.internal.task

import arrow.core.Try
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable

internal fun <PARAMS, RESULT> Task<PARAMS, RESULT>.configureWith(params: PARAMS): ConfigurableTask<PARAMS, RESULT> {
return ConfigurableTask(this, params)
}

internal data class ConfigurableTask<PARAMS, RESULT>(
val task: Task<PARAMS, RESULT>,
val params: PARAMS,
val callbackThread: TaskThread = TaskThread.MAIN,
val executionThread: TaskThread = TaskThread.IO,
val callback: MatrixCallback<RESULT> = object : MatrixCallback<RESULT> {}
) : Task<PARAMS, RESULT> {


override fun execute(params: PARAMS): Try<RESULT> {
return task.execute(params)
}

fun callbackOn(thread: TaskThread): ConfigurableTask<PARAMS, RESULT> {
return copy(callbackThread = thread)
}

fun executeOn(thread: TaskThread): ConfigurableTask<PARAMS, RESULT> {
return copy(executionThread = thread)
}

fun dispatchTo(matrixCallback: MatrixCallback<RESULT>): ConfigurableTask<PARAMS, RESULT> {
return copy(callback = matrixCallback)
}

fun executeBy(taskExecutor: TaskExecutor): Cancelable {
return taskExecutor.execute(this)
}

}


View File

@ -0,0 +1,10 @@
package im.vector.matrix.android.internal.task

import arrow.core.Try

internal interface Task<PARAMS, RESULT> {

fun execute(params: PARAMS): Try<RESULT>

}

View File

@ -0,0 +1,34 @@
package im.vector.matrix.android.internal.task

import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import timber.log.Timber
import kotlin.coroutines.EmptyCoroutineContext

internal class TaskExecutor(private val coroutineDispatchers: MatrixCoroutineDispatchers) {

fun <PARAMS, RESULT> execute(task: ConfigurableTask<PARAMS, RESULT>): Cancelable {

val job = GlobalScope.launch(task.callbackThread.toDispatcher()) {
val resultOrFailure = withContext(task.executionThread.toDispatcher()) {
Timber.v("Executing ${task.javaClass} on ${Thread.currentThread().name}")
task.execute(task.params)
}
resultOrFailure.fold({ task.callback.onFailure(it) }, { task.callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}

private fun TaskThread.toDispatcher() = when (this) {
TaskThread.MAIN -> coroutineDispatchers.main
TaskThread.COMPUTATION -> coroutineDispatchers.computation
TaskThread.IO -> coroutineDispatchers.io
TaskThread.CALLER -> EmptyCoroutineContext
}


}

View File

@ -0,0 +1,8 @@
package im.vector.matrix.android.internal.task

internal enum class TaskThread {
MAIN,
COMPUTATION,
IO,
CALLER
}