Introduce Task interface and TaskExecutor to centralize task processing and easier testing (replace Request naming)

This commit is contained in:
ganfra 2018-12-14 17:39:25 +01:00
parent 168814149b
commit 5cc128cbf3
23 changed files with 292 additions and 303 deletions

View File

@ -56,10 +56,12 @@ dependencies {
def support_version = '28.0.0'
def moshi_version = '1.8.0'
def lifecycle_version = "1.1.1"
def coroutines_version = "1.0.1"

implementation fileTree(dir: 'libs', include: ['*.aar'])
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$coroutines_version"

implementation "com.android.support:appcompat-v7:$support_version"
implementation "com.android.support:recyclerview-v7:$support_version"

View File

@ -0,0 +1,9 @@
package im.vector.matrix.android.internal

import arrow.core.Try

interface Task<in PARAMS, out RESULT> {

fun execute(params: PARAMS): Try<RESULT>

}

View File

@ -0,0 +1,27 @@
package im.vector.matrix.android.internal

import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import timber.log.Timber

internal class TaskExecutor(private val coroutineDispatchers: MatrixCoroutineDispatchers) {

fun <PARAMS, RESULT> executeTask(task: Task<PARAMS, RESULT>,
params: PARAMS,
callback: MatrixCallback<RESULT>): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val resultOrFailure = withContext(coroutineDispatchers.io) {
Timber.v("Executing ${task.javaClass} on ${Thread.currentThread().name}")
task.execute(params)
}
resultOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}

}

View File

@ -1,11 +1,10 @@
package im.vector.matrix.android.internal.di

import im.vector.matrix.android.api.MatrixOptions
import im.vector.matrix.android.api.thread.MainThreadExecutor
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import org.koin.dsl.module.module


@ -18,7 +17,11 @@ class MatrixModule(private val options: MatrixOptions) {
}

single {
MatrixCoroutineDispatchers(io = Dispatchers.IO, computation = Dispatchers.IO, main = MainThreadExecutor().asCoroutineDispatcher())
MatrixCoroutineDispatchers(io = Dispatchers.IO, computation = Dispatchers.IO, main = Dispatchers.Main)
}

single {
TaskExecutor(get())
}

single {

View File

@ -5,38 +5,30 @@ import arrow.core.fix
import arrow.instances.`try`.monad.monad
import arrow.typeclasses.binding
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.group.model.GroupRooms
import im.vector.matrix.android.internal.session.group.model.GroupSummaryResponse
import im.vector.matrix.android.internal.session.group.model.GroupUsers
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import im.vector.matrix.android.internal.util.tryTransactionSync
import io.realm.kotlin.createObject
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch

internal class GetGroupDataRequest(
internal interface GetGroupDataTask : Task<GetGroupDataTask.Params, Unit> {

data class Params(val groupId: String)

}


internal class DefaultGetGroupDataTask(
private val groupAPI: GroupAPI,
private val monarchy: Monarchy,
private val coroutineDispatchers: MatrixCoroutineDispatchers
) {
private val monarchy: Monarchy
) : GetGroupDataTask {

fun execute(groupId: String,
callback: MatrixCallback<Boolean>
): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val groupOrFailure = getGroupData(groupId)
groupOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(true) })
}
return CancelableCoroutine(job)
}

fun getGroupData(groupId: String): Try<Unit> {
override fun execute(params: GetGroupDataTask.Params): Try<Unit> {
val groupId = params.groupId
return Try.monad().binding {

val groupSummary = executeRequest<GroupSummaryResponse> {
@ -55,6 +47,7 @@ internal class GetGroupDataRequest(
}.fix()
}


private fun insertInDb(groupSummary: GroupSummaryResponse,
groupRooms: GroupRooms,
groupUsers: GroupUsers,
@ -77,7 +70,6 @@ internal class GetGroupDataRequest(
groupSummaryEntity.userIds.clear()
groupSummaryEntity.userIds.addAll(userIds)


}
}

View File

@ -20,7 +20,7 @@ internal class GetGroupDataWorker(context: Context,
val deletionIndexes: List<Int>
)

private val getGroupDataRequest by inject<GetGroupDataRequest>()
private val getGroupDataTask by inject<GetGroupDataTask>()

override fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
@ -35,7 +35,7 @@ internal class GetGroupDataWorker(context: Context,
}

private fun fetchGroupData(groupId: String): Try<Unit> {
return getGroupDataRequest.getGroupData(groupId)
return getGroupDataTask.execute(GetGroupDataTask.Params(groupId))
}

}

View File

@ -14,7 +14,7 @@ class GroupModule {
}

scope(DefaultSession.SCOPE) {
GetGroupDataRequest(get(), get(), get())
DefaultGetGroupDataTask(get(), get()) as GetGroupDataTask
}

}

View File

@ -14,12 +14,13 @@ import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.api.session.room.model.MyMembership
import im.vector.matrix.android.api.session.room.model.RoomSummary
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntityFields
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.room.members.LoadRoomMembersRequest
import im.vector.matrix.android.internal.session.room.members.LoadRoomMembersTask
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import org.koin.core.parameter.parametersOf
import org.koin.standalone.KoinComponent
@ -30,11 +31,12 @@ internal data class DefaultRoom(
override val myMembership: MyMembership
) : Room, KoinComponent {

private val loadRoomMembersRequest by inject<LoadRoomMembersRequest>()
private val loadRoomMembersTask by inject<LoadRoomMembersTask>()
private val syncTokenStore by inject<SyncTokenStore>()
private val monarchy by inject<Monarchy>()
private val timelineHolder by inject<TimelineHolder> { parametersOf(roomId) }
private val sendService by inject<SendService> { parametersOf(roomId) }
private val taskExecutor by inject<TaskExecutor>()

override val roomSummary: LiveData<RoomSummary> by lazy {
val liveData = monarchy
@ -56,7 +58,8 @@ internal data class DefaultRoom(
object : Cancelable {}
} else {
val token = syncTokenStore.getLastToken()
loadRoomMembersRequest.execute(roomId, token, Membership.LEAVE, object : MatrixCallback<Boolean> {})
val params = LoadRoomMembersTask.Params(roomId, token, Membership.LEAVE)
taskExecutor.executeTask(loadRoomMembersTask, params, object : MatrixCallback<Boolean> {})
}
}


View File

@ -5,9 +5,16 @@ import im.vector.matrix.android.api.session.room.SendService
import im.vector.matrix.android.api.session.room.TimelineHolder
import im.vector.matrix.android.api.session.room.send.EventFactory
import im.vector.matrix.android.internal.session.DefaultSession
import im.vector.matrix.android.internal.session.room.members.LoadRoomMembersRequest
import im.vector.matrix.android.internal.session.room.members.DefaultLoadRoomMembersTask
import im.vector.matrix.android.internal.session.room.members.LoadRoomMembersTask
import im.vector.matrix.android.internal.session.room.send.DefaultSendService
import im.vector.matrix.android.internal.session.room.timeline.*
import im.vector.matrix.android.internal.session.room.timeline.DefaultTimelineHolder
import im.vector.matrix.android.internal.session.room.timeline.DefaultGetContextOfEventTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultPaginationTask
import im.vector.matrix.android.internal.session.room.timeline.GetContextOfEventTask
import im.vector.matrix.android.internal.session.room.timeline.PaginationTask
import im.vector.matrix.android.internal.session.room.timeline.TimelineBoundaryCallback
import im.vector.matrix.android.internal.session.room.timeline.TokenChunkEventPersistor
import im.vector.matrix.android.internal.util.PagingRequestHelper
import org.koin.dsl.module.module
import retrofit2.Retrofit
@ -24,7 +31,7 @@ class RoomModule {
}

scope(DefaultSession.SCOPE) {
LoadRoomMembersRequest(get(), get(), get())
DefaultLoadRoomMembersTask(get(), get()) as LoadRoomMembersTask
}

scope(DefaultSession.SCOPE) {
@ -32,11 +39,11 @@ class RoomModule {
}

scope(DefaultSession.SCOPE) {
PaginationRequest(get(), get(), get())
DefaultPaginationTask(get(), get()) as PaginationTask
}

scope(DefaultSession.SCOPE) {
GetContextOfEventRequest(get(), get(), get())
DefaultGetContextOfEventTask(get(), get()) as GetContextOfEventTask
}

scope(DefaultSession.SCOPE) {
@ -46,8 +53,8 @@ class RoomModule {

factory { (roomId: String) ->
val helper = PagingRequestHelper(Executors.newSingleThreadExecutor())
val timelineBoundaryCallback = TimelineBoundaryCallback(roomId, get(), get(), helper)
DefaultTimelineHolder(roomId, get(), timelineBoundaryCallback, get()) as TimelineHolder
val timelineBoundaryCallback = TimelineBoundaryCallback(roomId, get(), get(), get(), helper)
DefaultTimelineHolder(roomId, get(), get() , timelineBoundaryCallback, get()) as TimelineHolder
}

factory { (roomId: String) ->

View File

@ -1,64 +0,0 @@
package im.vector.matrix.android.internal.session.room.members

import arrow.core.Try
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.database.helper.addStateEvents
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import im.vector.matrix.android.internal.util.tryTransactionSync
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

internal class LoadRoomMembersRequest(private val roomAPI: RoomAPI,
private val monarchy: Monarchy,
private val coroutineDispatchers: MatrixCoroutineDispatchers) {

fun execute(roomId: String,
streamToken: String?,
excludeMembership: Membership? = null,
callback: MatrixCallback<Boolean>
): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val responseOrFailure = execute(roomId, streamToken, excludeMembership)
responseOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(true) })
}
return CancelableCoroutine(job)
}

//TODO : manage stream token (we have 404 on some rooms actually)
private suspend fun execute(roomId: String,
streamToken: String?,
excludeMembership: Membership?) = withContext(coroutineDispatchers.io) {

executeRequest<RoomMembersResponse> {
apiCall = roomAPI.getMembers(roomId, null, null, excludeMembership?.value)
}.flatMap { response ->
insertInDb(response, roomId)
}
}

private fun insertInDb(response: RoomMembersResponse, roomId: String): Try<RoomMembersResponse> {
return monarchy
.tryTransactionSync { realm ->
// We ignore all the already known members
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
?: throw IllegalStateException("You shouldn't use this method without a room")

val roomMembers = RoomMembers(realm, roomId).getLoaded()
val eventsToInsert = response.roomMemberEvents.filter { !roomMembers.containsKey(it.stateKey) }

roomEntity.addStateEvents(eventsToInsert)
roomEntity.areAllMembersLoaded = true
}
.map { response }
}

}

View File

@ -0,0 +1,51 @@
package im.vector.matrix.android.internal.session.room.members

import arrow.core.Try
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.database.helper.addStateEvents
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.tryTransactionSync

internal interface LoadRoomMembersTask : Task<LoadRoomMembersTask.Params, Boolean> {

data class Params(
val roomId: String,
val streamToken: String?,
val excludeMembership: Membership? = null
)
}

internal class DefaultLoadRoomMembersTask(private val roomAPI: RoomAPI,
private val monarchy: Monarchy
) : LoadRoomMembersTask {

override fun execute(params: LoadRoomMembersTask.Params): Try<Boolean> {
return executeRequest<RoomMembersResponse> {
apiCall = roomAPI.getMembers(params.roomId, null, null, params.excludeMembership?.value)
}.flatMap { response ->
insertInDb(response, params.roomId)
}.map { true }
}

private fun insertInDb(response: RoomMembersResponse, roomId: String): Try<RoomMembersResponse> {
return monarchy
.tryTransactionSync { realm ->
// We ignore all the already known members
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
?: throw IllegalStateException("You shouldn't use this method without a room")

val roomMembers = RoomMembers(realm, roomId).getLoaded()
val eventsToInsert = response.roomMemberEvents.filter { !roomMembers.containsKey(it.stateKey) }

roomEntity.addStateEvents(eventsToInsert)
roomEntity.areAllMembersLoaded = true
}
.map { response }
}

}

View File

@ -0,0 +1,31 @@
package im.vector.matrix.android.internal.session.room.timeline

import arrow.core.Try
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.FilterUtil

internal interface GetContextOfEventTask : Task<GetContextOfEventTask.Params, TokenChunkEvent> {

data class Params(
val roomId: String,
val eventId: String
)

}

internal class DefaultGetContextOfEventTask(private val roomAPI: RoomAPI,
private val tokenChunkEventPersistor: TokenChunkEventPersistor
) : GetContextOfEventTask {

override fun execute(params: GetContextOfEventTask.Params): Try<EventContextResponse> {
val filter = FilterUtil.createRoomEventFilter(true)?.toJSONString()
return executeRequest<EventContextResponse> {
apiCall = roomAPI.getContextOfEvent(params.roomId, params.eventId, 0, filter)
}.flatMap { response ->
tokenChunkEventPersistor.insertInDb(response, params.roomId, PaginationDirection.BACKWARDS).map { response }
}
}

}

View File

@ -0,0 +1,39 @@
package im.vector.matrix.android.internal.session.room.timeline

import arrow.core.Try
import arrow.core.failure
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.FilterUtil


internal interface PaginationTask : Task<PaginationTask.Params, TokenChunkEvent> {

data class Params(
val roomId: String,
val from: String?,
val direction: PaginationDirection,
val limit: Int
)

}

internal class DefaultPaginationTask(private val roomAPI: RoomAPI,
private val tokenChunkEventPersistor: TokenChunkEventPersistor
) : PaginationTask {

override fun execute(params: PaginationTask.Params): Try<TokenChunkEvent> {
if (params.from == null) {
return RuntimeException("From token shouldn't be null").failure()
}
val filter = FilterUtil.createRoomEventFilter(true)?.toJSONString()
return executeRequest<PaginationResponse> {
apiCall = roomAPI.getRoomMessagesFrom(params.roomId, params.from, params.direction.value, params.limit, filter)
}.flatMap { chunk ->
tokenChunkEventPersistor
.insertInDb(chunk, params.roomId, params.direction)
.map { chunk }
}
}
}

View File

@ -8,6 +8,7 @@ import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.events.interceptor.EnrichedEventInterceptor
import im.vector.matrix.android.api.session.events.model.EnrichedEvent
import im.vector.matrix.android.api.session.room.TimelineHolder
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
@ -22,14 +23,14 @@ private const val PAGE_SIZE = 30

internal class DefaultTimelineHolder(private val roomId: String,
private val monarchy: Monarchy,
private val taskExecutor: TaskExecutor,
private val boundaryCallback: TimelineBoundaryCallback,
private val contextOfEventRequest: GetContextOfEventRequest
private val contextOfEventTask: GetContextOfEventTask
) : TimelineHolder {

private val eventInterceptors = ArrayList<EnrichedEventInterceptor>()

init {
boundaryCallback.limit = 30
eventInterceptors.add(MessageEventInterceptor(monarchy, roomId))
}

@ -76,7 +77,8 @@ internal class DefaultTimelineHolder(private val roomId: String,

private fun fetchEventIfNeeded(eventId: String) {
if (!isEventPersisted(eventId)) {
contextOfEventRequest.execute(roomId, eventId, object : MatrixCallback<EventContextResponse> {})
val params = GetContextOfEventTask.Params(roomId, eventId)
taskExecutor.executeTask(contextOfEventTask, params, object : MatrixCallback<TokenChunkEvent> {})
}
}


View File

@ -1,43 +0,0 @@
package im.vector.matrix.android.internal.session.room.timeline

import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.FilterUtil
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

internal class GetContextOfEventRequest(private val roomAPI: RoomAPI,
private val tokenChunkEventPersistor: TokenChunkEventPersistor,
private val coroutineDispatchers: MatrixCoroutineDispatchers
) {

fun execute(roomId: String,
eventId: String,
callback: MatrixCallback<EventContextResponse>
): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val filter = FilterUtil.createRoomEventFilter(true)?.toJSONString()
val contextOrFailure = execute(roomId, eventId, filter)
contextOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}

private suspend fun execute(roomId: String,
eventId: String,
filter: String?) = withContext(coroutineDispatchers.io) {

executeRequest<EventContextResponse> {
apiCall = roomAPI.getContextOfEvent(roomId, eventId, 0, filter)
}.flatMap { response ->
tokenChunkEventPersistor.insertInDb(response, roomId, PaginationDirection.BACKWARDS).map { response }
}
}


}

View File

@ -1,39 +0,0 @@
package im.vector.matrix.android.internal.session.room.timeline

import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

internal class GetEventRequest(private val roomAPI: RoomAPI,
private val monarchy: Monarchy,
private val coroutineDispatchers: MatrixCoroutineDispatchers
) {

fun execute(roomId: String,
eventId: String,
callback: MatrixCallback<Event>
): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val eventOrFailure = execute(roomId, eventId)
eventOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}

private suspend fun execute(roomId: String,
eventId: String) = withContext(coroutineDispatchers.io) {

executeRequest<Event> {
apiCall = roomAPI.getEvent(roomId, eventId)
}
}

}

View File

@ -0,0 +1,22 @@
package im.vector.matrix.android.internal.session.room.timeline

import arrow.core.Try
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI

internal class GetEventTask(private val roomAPI: RoomAPI
) : Task<GetEventTask.Params, Event> {

internal data class Params(
val roomId: String,
val eventId: String
)

override fun execute(params: Params): Try<Event> {
return executeRequest {
apiCall = roomAPI.getEvent(params.roomId, params.eventId)
}
}
}

View File

@ -1,52 +0,0 @@
package im.vector.matrix.android.internal.session.room.timeline

import arrow.core.failure
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.FilterUtil
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

internal class PaginationRequest(private val roomAPI: RoomAPI,
private val tokenChunkEventPersistor: TokenChunkEventPersistor,
private val coroutineDispatchers: MatrixCoroutineDispatchers
) {

fun execute(roomId: String,
from: String?,
direction: PaginationDirection,
limit: Int,
callback: MatrixCallback<TokenChunkEvent>
): Cancelable {
val job = GlobalScope.launch(coroutineDispatchers.main) {
val filter = FilterUtil.createRoomEventFilter(true)?.toJSONString()
val chunkOrFailure = execute(roomId, from, direction, limit, filter)
chunkOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}

private suspend fun execute(roomId: String,
from: String?,
direction: PaginationDirection,
limit: Int,
filter: String?) = withContext(coroutineDispatchers.io) {

if (from == null) {
return@withContext RuntimeException("From token shouldn't be null").failure<TokenChunkEvent>()
}
executeRequest<PaginationResponse> {
apiCall = roomAPI.getRoomMessagesFrom(roomId, from, direction.value, limit, filter)
}.flatMap { chunk ->
tokenChunkEventPersistor
.insertInDb(chunk, roomId, direction)
.map { chunk }
}
}

}

View File

@ -4,18 +4,20 @@ import android.arch.paging.PagedList
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.events.model.EnrichedEvent
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.query.findAllIncludingEvents
import im.vector.matrix.android.internal.util.PagingRequestHelper
import java.util.*

internal class TimelineBoundaryCallback(private val roomId: String,
private val paginationRequest: PaginationRequest,
private val taskExecutor: TaskExecutor,
private val paginationTask: PaginationTask,
private val monarchy: Monarchy,
private val helper: PagingRequestHelper
) : PagedList.BoundaryCallback<EnrichedEvent>() {

var limit = 10
var limit = 30

override fun onZeroItemsLoaded() {
// actually, it's not possible
@ -44,13 +46,12 @@ internal class TimelineBoundaryCallback(private val roomId: String,
val chunkEntity = ChunkEntity.findAllIncludingEvents(realm, Collections.singletonList(item.root.eventId)).firstOrNull()
token = if (direction == PaginationDirection.FORWARDS) chunkEntity?.nextToken else chunkEntity?.prevToken
}
paginationRequest.execute(
roomId = roomId,
from = token,
direction = direction,
limit = limit,
callback = createCallback(requestCallback)
)
val params = PaginationTask.Params(roomId = roomId,
from = token,
direction = direction,
limit = limit)

taskExecutor.executeTask(paginationTask, params, createCallback(requestCallback))
}



View File

@ -36,7 +36,7 @@ internal class SyncModule {
}

scope(DefaultSession.SCOPE) {
SyncRequest(get(), get(), get())
DefaultSyncTask(get(), get()) as SyncTask
}

scope(DefaultSession.SCOPE) {
@ -44,7 +44,7 @@ internal class SyncModule {
}

scope(DefaultSession.SCOPE) {
SyncThread(get(), get(), get(), get())
SyncThread(get(), get(), get(), get(), get())
}

}

View File

@ -1,46 +0,0 @@
package im.vector.matrix.android.internal.session.sync

import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.filter.FilterBody
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.FilterUtil
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

internal class SyncRequest(private val syncAPI: SyncAPI,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val syncResponseHandler: SyncResponseHandler) {


fun execute(token: String?, callback: MatrixCallback<SyncResponse>): Cancelable {
val job = GlobalScope.launch {
val syncOrFailure = execute(token)
syncOrFailure.fold({ callback.onFailure(it) }, { callback.onSuccess(it) })
}
return CancelableCoroutine(job)
}

private suspend fun execute(token: String?) = withContext(coroutineDispatchers.io) {
val params = HashMap<String, String>()
val filterBody = FilterBody()
FilterUtil.enableLazyLoading(filterBody, true)
var timeout = 0
if (token != null) {
params["since"] = token
timeout = 30000
}
params["timeout"] = timeout.toString()
params["filter"] = filterBody.toJSONString()
executeRequest<SyncResponse> {
apiCall = syncAPI.sync(params)
}.flatMap { syncResponse ->
syncResponseHandler.handleResponse(syncResponse, token, false)
}
}

}

View File

@ -0,0 +1,41 @@
package im.vector.matrix.android.internal.session.sync

import arrow.core.Try
import im.vector.matrix.android.internal.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.filter.FilterBody
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.util.FilterUtil

internal interface SyncTask : Task<SyncTask.Params, SyncResponse> {

data class Params(val token: String?)

}

internal class DefaultSyncTask(private val syncAPI: SyncAPI,
private val syncResponseHandler: SyncResponseHandler
) : SyncTask {


override fun execute(params: SyncTask.Params): Try<SyncResponse> {
val requestParams = HashMap<String, String>()
val filterBody = FilterBody()
FilterUtil.enableLazyLoading(filterBody, true)
var timeout = 0
if (params.token != null) {
requestParams["since"] = params.token
timeout = 30000
}
requestParams["timeout"] = timeout.toString()
requestParams["filter"] = filterBody.toJSONString()

return executeRequest<SyncResponse> {
apiCall = syncAPI.sync(requestParams)
}.flatMap { syncResponse ->
syncResponseHandler.handleResponse(syncResponse, params.token, false)
}
}


}

View File

@ -3,8 +3,9 @@ package im.vector.matrix.android.internal.session.sync.job
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.TaskExecutor
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncRequest
import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
@ -13,10 +14,11 @@ import java.util.concurrent.CountDownLatch

private const val RETRY_WAIT_TIME_MS = 10_000L

internal class SyncThread(private val syncRequest: SyncRequest,
internal class SyncThread(private val syncTask: SyncTask,
private val networkConnectivityChecker: NetworkConnectivityChecker,
private val syncTokenStore: SyncTokenStore,
private val backgroundDetectionObserver: BackgroundDetectionObserver
private val backgroundDetectionObserver: BackgroundDetectionObserver,
private val taskExecutor: TaskExecutor
) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {

enum class State {
@ -30,7 +32,7 @@ internal class SyncThread(private val syncRequest: SyncRequest,
private var state: State = State.IDLE
private val lock = Object()
private var nextBatch = syncTokenStore.getLastToken()
private var cancelableRequest: Cancelable? = null
private var cancelableTask: Cancelable? = null

fun restart() {
synchronized(lock) {
@ -57,7 +59,7 @@ internal class SyncThread(private val syncRequest: SyncRequest,
synchronized(lock) {
Timber.v("Kill sync...")
state = State.KILLING
cancelableRequest?.cancel()
cancelableTask?.cancel()
lock.notify()
}
}
@ -77,7 +79,8 @@ internal class SyncThread(private val syncRequest: SyncRequest,
} else {
Timber.v("Execute sync request...")
val latch = CountDownLatch(1)
cancelableRequest = syncRequest.execute(nextBatch, object : MatrixCallback<SyncResponse> {
val params = SyncTask.Params(nextBatch)
cancelableTask = taskExecutor.executeTask(syncTask, params, object : MatrixCallback<SyncResponse> {
override fun onSuccess(data: SyncResponse) {
nextBatch = data.nextBatch
syncTokenStore.saveToken(nextBatch)