diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/api/failure/Failure.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/api/failure/Failure.kt index 4a6a39b8..7a6e0d4c 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/api/failure/Failure.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/api/failure/Failure.kt @@ -1,11 +1,23 @@ package im.vector.matrix.android.api.failure +import java.io.IOException + sealed class Failure { data class Unknown(val exception: Exception? = null) : Failure() - object NetworkConnection : Failure() + data class NetworkConnection(val ioException: IOException) : Failure() data class ServerError(val error: MatrixError) : Failure() abstract class FeatureFailure : Failure() + fun toException(): Exception { + return when (this) { + is Unknown -> this.exception ?: RuntimeException("Unknown error") + is NetworkConnection -> this.ioException + is ServerError -> RuntimeException(this.error.toString()) + is FeatureFailure -> RuntimeException("Feature error") + } + + } + } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/DBConstants.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/DBConstants.kt new file mode 100644 index 00000000..3aa63a15 --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/DBConstants.kt @@ -0,0 +1,7 @@ +package im.vector.matrix.android.internal.database + +object DBConstants { + + const val STATE_EVENTS_CHUNK_TOKEN = "STATE_EVENTS_CHUNK_TOKEN" + +} \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/ChunkEntityQueries.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/ChunkEntityQueries.kt index c96ea6d2..dd812d3b 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/ChunkEntityQueries.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/ChunkEntityQueries.kt @@ -2,19 +2,43 @@ package im.vector.matrix.android.internal.database.query import im.vector.matrix.android.internal.database.model.ChunkEntity import io.realm.Realm +import io.realm.RealmQuery +import io.realm.RealmResults -fun ChunkEntity.Companion.getLastChunkFromRoom(realm: Realm, roomId: String): ChunkEntity? { - return realm.where(ChunkEntity::class.java) - .equalTo("room.roomId", roomId) - .isNull("nextToken") - .and() - .isNotNull("prevToken") - .findAll() - .lastOrNull() +fun ChunkEntity.Companion.where(realm: Realm, roomId: String): RealmQuery { + return realm.where(ChunkEntity::class.java).equalTo("room.roomId", roomId) } -fun ChunkEntity.Companion.getChunkIncludingEvents(realm: Realm, eventIds: List): ChunkEntity? { +fun ChunkEntity.Companion.findWithPrevToken(realm: Realm, roomId: String, prevToken: String?): ChunkEntity? { + if (prevToken == null) { + return null + } + return where(realm, roomId) + .and() + .equalTo("prevToken", prevToken) + .findFirst() +} + +fun ChunkEntity.Companion.findWithNextToken(realm: Realm, roomId: String, nextToken: String?): ChunkEntity? { + if (nextToken == null) { + return null + } + return where(realm, roomId) + .and() + .equalTo("nextToken", nextToken) + .findFirst() +} + +fun ChunkEntity.Companion.findLastFromRoom(realm: Realm, roomId: String): ChunkEntity? { + return where(realm, roomId) + .and() + .isNull("nextToken") + .findAll() + .last() +} + +fun ChunkEntity.Companion.findAllIncludingEvents(realm: Realm, eventIds: List): RealmResults { return realm.where(ChunkEntity::class.java) .`in`("events.eventId", eventIds.toTypedArray()) - .findFirst() + .findAll() } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/EventEntityQueries.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/EventEntityQueries.kt index 20cdd2b7..0ffc6633 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/EventEntityQueries.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/EventEntityQueries.kt @@ -1,13 +1,21 @@ package im.vector.matrix.android.internal.database.query +import im.vector.matrix.android.internal.database.model.ChunkEntity import im.vector.matrix.android.internal.database.model.EventEntity import io.realm.Realm +import io.realm.RealmQuery import io.realm.RealmResults -fun EventEntity.Companion.getAllFromRoom(realm: Realm, roomId: String): RealmResults { +fun EventEntity.Companion.where(realm: Realm, roomId: String): RealmQuery { return realm.where(EventEntity::class.java) .equalTo("chunk.room.roomId", roomId) - .findAll() +} + +fun EventEntity.Companion.where(realm: Realm, chunk: ChunkEntity?): RealmQuery { + return realm.where(EventEntity::class.java) + .equalTo("chunk.prevToken", chunk?.prevToken) + .and() + .equalTo("chunk.nextToken", chunk?.nextToken) } fun RealmResults.getLast(type: String? = null): EventEntity? { @@ -15,5 +23,5 @@ fun RealmResults.getLast(type: String? = null): EventEntity? { if (type != null) { query = query.equalTo("type", type) } - return query.findAll().sort("age").lastOrNull() + return query.findAll().sort("age").last() } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/RoomEntityQueries.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/RoomEntityQueries.kt index 1901a6ee..0dba5dc0 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/RoomEntityQueries.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/query/RoomEntityQueries.kt @@ -4,13 +4,11 @@ import im.vector.matrix.android.internal.database.model.RoomEntity import io.realm.Realm import io.realm.RealmQuery -fun RoomEntity.Companion.getForId(realm: Realm, roomId: String): RoomEntity? { - return realm.where(RoomEntity::class.java) - .equalTo("roomId", roomId) - .findFirst() +fun RoomEntity.Companion.where(realm: Realm, roomId: String): RealmQuery { + return realm.where(RoomEntity::class.java).equalTo("roomId", roomId) } -fun RoomEntity.Companion.getAll(realm: Realm, membership: RoomEntity.Membership? = null): RealmQuery { +fun RoomEntity.Companion.where(realm: Realm, membership: RoomEntity.Membership? = null): RealmQuery { val query = realm.where(RoomEntity::class.java) if (membership != null) { query.equalTo("membership", membership.name) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/Request.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/Request.kt index 1777a7d3..f34d3601 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/Request.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/Request.kt @@ -31,7 +31,7 @@ class Request { } catch (e: Exception) { when (e) { - is IOException -> Either.Left(Failure.NetworkConnection) + is IOException -> Either.Left(Failure.NetworkConnection(e)) else -> Either.Left(Failure.Unknown(e)) } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt index 95bfe7fc..8bd4fbca 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/DefaultSession.kt @@ -7,6 +7,7 @@ import im.vector.matrix.android.api.session.Session import im.vector.matrix.android.api.session.room.Room import im.vector.matrix.android.api.session.room.RoomService import im.vector.matrix.android.internal.auth.data.SessionParams +import im.vector.matrix.android.internal.session.room.RoomModule import im.vector.matrix.android.internal.session.room.RoomSummaryObserver import im.vector.matrix.android.internal.session.sync.SyncModule import im.vector.matrix.android.internal.session.sync.job.SyncThread @@ -37,7 +38,8 @@ class DefaultSession(private val sessionParams: SessionParams) : Session, KoinCo isOpen = true val sessionModule = SessionModule(sessionParams) val syncModule = SyncModule() - StandAloneContext.loadKoinModules(listOf(sessionModule, syncModule)) + val roomModule = RoomModule() + StandAloneContext.loadKoinModules(listOf(sessionModule, syncModule, roomModule)) scope = getKoin().getOrCreateScope(SCOPE) roomSummaryObserver.start() syncThread.start() diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoom.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoom.kt index 9c903d31..f3b7d7ec 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoom.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoom.kt @@ -1,7 +1,38 @@ package im.vector.matrix.android.internal.session.room +import android.arch.lifecycle.LiveData +import android.arch.paging.LivePagedListBuilder +import android.arch.paging.PagedList +import com.zhuinden.monarchy.Monarchy +import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.room.Room +import im.vector.matrix.android.internal.database.mapper.EventMapper +import im.vector.matrix.android.internal.database.model.ChunkEntity +import im.vector.matrix.android.internal.database.model.EventEntity +import im.vector.matrix.android.internal.database.query.where +import im.vector.matrix.android.internal.session.room.timeline.PaginationRequest +import im.vector.matrix.android.internal.session.room.timeline.TimelineBoundaryCallback +import org.koin.standalone.KoinComponent +import org.koin.standalone.inject +import java.util.concurrent.Executors data class DefaultRoom( override val roomId: String -) : Room \ No newline at end of file +) : Room, KoinComponent { + + private val paginationRequest by inject() + private val monarchy by inject() + private val boundaryCallback = TimelineBoundaryCallback(paginationRequest, roomId, monarchy, Executors.newSingleThreadExecutor()) + + fun events(): LiveData> { + val realmDataSourceFactory = monarchy.createDataSourceFactory { realm -> + val lastChunk = ChunkEntity.where(realm, roomId).findAll().last() + EventEntity.where(realm, lastChunk) + } + val domainSourceFactory = realmDataSourceFactory.map { EventMapper.map(it) } + val livePagedListBuilder = LivePagedListBuilder(domainSourceFactory, 20).setBoundaryCallback(boundaryCallback) + return monarchy.findAllPagedWithChanges(realmDataSourceFactory, livePagedListBuilder) + } + + +} \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt index 1177a95d..54454421 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/DefaultRoomService.kt @@ -5,15 +5,14 @@ import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.api.session.room.Room import im.vector.matrix.android.api.session.room.RoomService import im.vector.matrix.android.internal.database.model.RoomEntity -import im.vector.matrix.android.internal.database.query.getAll -import im.vector.matrix.android.internal.database.query.getForId +import im.vector.matrix.android.internal.database.query.find class DefaultRoomService(private val monarchy: Monarchy) : RoomService { override fun getAllRooms(): List { var rooms: List = emptyList() monarchy.doWithRealm { realm -> - rooms = RoomEntity.getAll(realm).findAll().map { DefaultRoom(it.roomId) } + rooms = RoomEntity.find(realm).findAll().map { DefaultRoom(it.roomId) } } return rooms } @@ -21,14 +20,14 @@ class DefaultRoomService(private val monarchy: Monarchy) : RoomService { override fun getRoom(roomId: String): Room? { var room: Room? = null monarchy.doWithRealm { realm -> - room = RoomEntity.getForId(realm, roomId)?.let { DefaultRoom(it.roomId) } + room = RoomEntity.find(realm, roomId).findFirst()?.let { DefaultRoom(it.roomId) } } return room } override fun rooms(): LiveData> { return monarchy.findAllMappedWithChanges( - { realm -> RoomEntity.getAll(realm) }, + { realm -> RoomEntity.find(realm) }, { DefaultRoom(it.roomId) } ) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomAPI.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomAPI.kt new file mode 100644 index 00000000..793494f5 --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomAPI.kt @@ -0,0 +1,29 @@ +package im.vector.matrix.android.internal.session.room + +import im.vector.matrix.android.internal.session.room.model.TokenChunkEvent +import kotlinx.coroutines.Deferred +import retrofit2.Response +import retrofit2.http.GET +import retrofit2.http.Path +import retrofit2.http.Query + +interface RoomAPI { + + /** + * Get a list of messages starting from a reference. + * + * @param roomId the room id + * @param from the token identifying where to start. Required. + * @param dir The direction to return messages from. Required. + * @param limit the maximum number of messages to retrieve. Optional. + * @param filter A JSON RoomEventFilter to filter returned events with. Optional. + */ + @GET("rooms/{roomId}/messages") + fun getRoomMessagesFrom(@Path("roomId") roomId: String, + @Query("from") from: String, + @Query("dir") dir: String, + @Query("limit") limit: Int, + @Query("filter") filter: String?): Deferred> + + +} \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomModule.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomModule.kt new file mode 100644 index 00000000..f355d051 --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomModule.kt @@ -0,0 +1,24 @@ +package im.vector.matrix.android.internal.session.room + +import im.vector.matrix.android.internal.session.DefaultSession +import im.vector.matrix.android.internal.session.room.timeline.PaginationRequest +import org.koin.dsl.context.ModuleDefinition +import org.koin.dsl.module.Module +import org.koin.dsl.module.module +import retrofit2.Retrofit + + +class RoomModule : Module { + + override fun invoke(): ModuleDefinition = module(override = true) { + + scope(DefaultSession.SCOPE) { + val retrofit: Retrofit = get() + retrofit.create(RoomAPI::class.java) + } + + scope(DefaultSession.SCOPE) { + PaginationRequest(get(), get()) + } + }.invoke() +} diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomSummaryObserver.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomSummaryObserver.kt index f35b06fd..89ca01e0 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomSummaryObserver.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/RoomSummaryObserver.kt @@ -8,9 +8,8 @@ import im.vector.matrix.android.internal.database.mapper.asDomain import im.vector.matrix.android.internal.database.model.EventEntity import im.vector.matrix.android.internal.database.model.RoomEntity import im.vector.matrix.android.internal.database.model.RoomSummaryEntity -import im.vector.matrix.android.internal.database.query.getAll -import im.vector.matrix.android.internal.database.query.getAllFromRoom import im.vector.matrix.android.internal.database.query.getLast +import im.vector.matrix.android.internal.database.query.where import io.realm.RealmResults import java.util.concurrent.atomic.AtomicBoolean @@ -22,7 +21,7 @@ internal class RoomSummaryObserver(private val monarchy: Monarchy) { fun start() { if (isStarted.compareAndSet(false, true)) { monarchy.doWithRealm { - roomResults = RoomEntity.getAll(it).findAllAsync() + roomResults = RoomEntity.where(it).findAllAsync() roomResults.addChangeListener { rooms, changeSet -> manageRoomResults(rooms, changeSet.changes) manageRoomResults(rooms, changeSet.insertions) @@ -50,7 +49,7 @@ internal class RoomSummaryObserver(private val monarchy: Monarchy) { private fun manageRoom(roomId: String) { monarchy.writeAsync { realm -> - val roomEvents = EventEntity.getAllFromRoom(realm, roomId) + val roomEvents = EventEntity.where(realm, roomId).findAll() val lastNameEvent = roomEvents.getLast(EventType.STATE_ROOM_NAME)?.asDomain() val lastTopicEvent = roomEvents.getLast(EventType.STATE_ROOM_TOPIC)?.asDomain() val lastMessageEvent = roomEvents.getLast(EventType.MESSAGE) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/model/TokenChunkEvent.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/model/TokenChunkEvent.kt new file mode 100644 index 00000000..bc1c9b59 --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/model/TokenChunkEvent.kt @@ -0,0 +1,13 @@ +package im.vector.matrix.android.internal.session.room.model + +import com.squareup.moshi.Json +import com.squareup.moshi.JsonClass +import im.vector.matrix.android.api.session.events.model.Event + +@JsonClass(generateAdapter = true) +data class TokenChunkEvent( + @Json(name = "start") val prevToken: String? = null, + @Json(name = "end") val nextToken: String? = null, + @Json(name = "chunks") val chunk: List = emptyList(), + @Json(name = "state") val stateEvents: List = emptyList() +) \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/PaginationRequest.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/PaginationRequest.kt new file mode 100644 index 00000000..8c8f6aea --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/PaginationRequest.kt @@ -0,0 +1,116 @@ +package im.vector.matrix.android.internal.session.room.timeline + +import arrow.core.Either +import arrow.core.flatMap +import arrow.core.leftIfNull +import com.zhuinden.monarchy.Monarchy +import im.vector.matrix.android.api.MatrixCallback +import im.vector.matrix.android.api.failure.Failure +import im.vector.matrix.android.api.util.Cancelable +import im.vector.matrix.android.internal.database.mapper.asEntity +import im.vector.matrix.android.internal.database.model.ChunkEntity +import im.vector.matrix.android.internal.database.model.RoomEntity +import im.vector.matrix.android.internal.database.query.findAllIncludingEvents +import im.vector.matrix.android.internal.database.query.findWithNextToken +import im.vector.matrix.android.internal.database.query.findWithPrevToken +import im.vector.matrix.android.internal.database.query.where +import im.vector.matrix.android.internal.network.executeRequest +import im.vector.matrix.android.internal.session.room.RoomAPI +import im.vector.matrix.android.internal.session.room.model.TokenChunkEvent +import im.vector.matrix.android.internal.util.CancelableCoroutine +import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext + +class PaginationRequest(private val roomAPI: RoomAPI, + private val monarchy: Monarchy, + private val coroutineDispatchers: MatrixCoroutineDispatchers) { + + fun execute(roomId: String, + from: String? = null, + direction: String, + limit: Int = 10, + filter: String? = null, + callback: MatrixCallback + ): Cancelable { + val job = GlobalScope.launch(coroutineDispatchers.main) { + val chunkOrFailure = execute(roomId, from, direction, limit, filter) + chunkOrFailure.bimap({ callback.onFailure(it) }, { callback.onSuccess(it) }) + } + return CancelableCoroutine(job) + } + + private suspend fun execute(roomId: String, + from: String? = null, + direction: String, + limit: Int = 10, + filter: String? = null) = withContext(coroutineDispatchers.io) { + + if (from == null) { + return@withContext Either.left(Failure.Unknown(RuntimeException("From token can't be null"))) + } + executeRequest { + apiCall = roomAPI.getRoomMessagesFrom(roomId, from, direction, limit, filter) + }.leftIfNull { + Failure.Unknown(RuntimeException("TokenChunkEvent shouldn't be null")) + }.flatMap { + try { + insertInDb(it, roomId) + Either.right(it) + } catch (exception: Exception) { + Either.Left(Failure.Unknown(exception)) + } + } + } + + private fun insertInDb(chunkEvent: TokenChunkEvent, roomId: String) { + monarchy.runTransactionSync { realm -> + val roomEntity = RoomEntity.where(realm, roomId).findFirst() + ?: return@runTransactionSync + + val nextChunk = ChunkEntity.findWithPrevToken(realm, roomId, chunkEvent.nextToken) + val prevChunk = ChunkEntity.findWithNextToken(realm, roomId, chunkEvent.prevToken) + + val mergedEvents = chunkEvent.chunk + chunkEvent.stateEvents + val mergedEventIds = mergedEvents.filter { it.eventId != null }.map { it.eventId!! } + val chunksOverlapped = ChunkEntity.findAllIncludingEvents(realm, mergedEventIds) + + val currentChunk: ChunkEntity + if (nextChunk != null) { + currentChunk = nextChunk + } else { + currentChunk = ChunkEntity() + } + currentChunk.prevToken = chunkEvent.prevToken + mergedEvents.forEach { event -> + val eventEntity = event.asEntity().let { + realm.copyToRealmOrUpdate(it) + } + if (!currentChunk.events.contains(eventEntity)) { + currentChunk.events.add(eventEntity) + } + } + + if (prevChunk != null) { + currentChunk.events.addAll(prevChunk.events) + roomEntity.chunks.remove(prevChunk) + + } else if (chunksOverlapped.isNotEmpty()) { + chunksOverlapped.forEach { chunk -> + chunk.events.forEach { event -> + if (!currentChunk.events.contains(event)) { + currentChunk.events.add(event) + } + } + roomEntity.chunks.remove(chunk) + } + } + + if (!roomEntity.chunks.contains(currentChunk)) { + roomEntity.chunks.add(currentChunk) + } + } + } + +} \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/TimelineBoundaryCallback.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/TimelineBoundaryCallback.kt new file mode 100644 index 00000000..c7c3acb1 --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/TimelineBoundaryCallback.kt @@ -0,0 +1,52 @@ +package im.vector.matrix.android.internal.session.room.timeline + +import android.arch.paging.PagedList +import com.zhuinden.monarchy.Monarchy +import im.vector.matrix.android.api.MatrixCallback +import im.vector.matrix.android.api.failure.Failure +import im.vector.matrix.android.api.session.events.model.Event +import im.vector.matrix.android.internal.database.model.ChunkEntity +import im.vector.matrix.android.internal.database.query.findAllIncludingEvents +import im.vector.matrix.android.internal.session.room.model.TokenChunkEvent +import im.vector.matrix.android.internal.util.PagingRequestHelper +import java.util.* +import java.util.concurrent.Executor + +class TimelineBoundaryCallback(private val paginationRequest: PaginationRequest, + private val roomId: String, + private val monarchy: Monarchy, + ioExecutor: Executor +) : PagedList.BoundaryCallback() { + + private val helper = PagingRequestHelper(ioExecutor) + + override fun onZeroItemsLoaded() { + // actually, it's not possible + } + + override fun onItemAtEndLoaded(itemAtEnd: Event) { + helper.runIfNotRunning(PagingRequestHelper.RequestType.AFTER) { + monarchy.doWithRealm { realm -> + if (itemAtEnd.eventId == null) { + return@doWithRealm + } + val chunkEntity = ChunkEntity.findAllIncludingEvents(realm, Collections.singletonList(itemAtEnd.eventId)).firstOrNull() + paginationRequest.execute(roomId, chunkEntity?.prevToken, "forward", callback = createCallback(it)) + } + } + } + + override fun onItemAtFrontLoaded(itemAtFront: Event) { + //Todo handle forward pagination + } + + private fun createCallback(pagingRequestCallback: PagingRequestHelper.Request.Callback) = object : MatrixCallback { + override fun onSuccess(data: TokenChunkEvent) { + pagingRequestCallback.recordSuccess() + } + + override fun onFailure(failure: Failure) { + pagingRequestCallback.recordFailure(failure.toException()) + } + } +} diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt index 54292b13..930aea43 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt @@ -2,12 +2,13 @@ package im.vector.matrix.android.internal.session.sync import com.zhuinden.monarchy.Monarchy import im.vector.matrix.android.api.session.events.model.Event +import im.vector.matrix.android.internal.database.DBConstants import im.vector.matrix.android.internal.database.mapper.asEntity import im.vector.matrix.android.internal.database.model.ChunkEntity import im.vector.matrix.android.internal.database.model.RoomEntity -import im.vector.matrix.android.internal.database.query.getChunkIncludingEvents -import im.vector.matrix.android.internal.database.query.getForId -import im.vector.matrix.android.internal.database.query.getLastChunkFromRoom +import im.vector.matrix.android.internal.database.query.findAllIncludingEvents +import im.vector.matrix.android.internal.database.query.findLastFromRoom +import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync import im.vector.matrix.android.internal.session.sync.model.RoomSync import io.realm.Realm @@ -22,7 +23,7 @@ class RoomSyncHandler(private val monarchy: Monarchy) { } fun handleRoomSync(handlingStrategy: HandlingStrategy) { - monarchy.writeAsync { realm -> + monarchy.runTransactionSync { realm -> val roomEntities = when (handlingStrategy) { is HandlingStrategy.JOINED -> handlingStrategy.data.map { handleJoinedRoom(realm, it.key, it.value) } is HandlingStrategy.INVITED -> handlingStrategy.data.map { handleInvitedRoom(realm, it.key, it.value) } @@ -39,7 +40,7 @@ class RoomSyncHandler(private val monarchy: Monarchy) { roomId: String, roomSync: RoomSync): RoomEntity { - val roomEntity = RoomEntity.getForId(realm, roomId) ?: RoomEntity(roomId) + val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: RoomEntity(roomId) if (roomEntity.membership == RoomEntity.Membership.INVITED) { roomEntity.chunks.deleteAllFromRealm() @@ -47,7 +48,7 @@ class RoomSyncHandler(private val monarchy: Monarchy) { roomEntity.membership = RoomEntity.Membership.JOINED if (roomSync.state != null && roomSync.state.events.isNotEmpty()) { - val chunkEntity = eventListToChunk(realm, roomId, roomSync.state.events) + val chunkEntity = eventListToChunk(realm, roomId, roomSync.state.events, DBConstants.STATE_EVENTS_CHUNK_TOKEN, DBConstants.STATE_EVENTS_CHUNK_TOKEN) if (!roomEntity.chunks.contains(chunkEntity)) { roomEntity.chunks.add(chunkEntity) } @@ -94,10 +95,10 @@ class RoomSyncHandler(private val monarchy: Monarchy) { isLimited: Boolean = true): ChunkEntity { val chunkEntity = if (!isLimited) { - ChunkEntity.getLastChunkFromRoom(realm, roomId) + ChunkEntity.findLastFromRoom(realm, roomId) } else { val eventIds = eventList.filter { it.eventId != null }.map { it.eventId!! } - ChunkEntity.getChunkIncludingEvents(realm, eventIds) + ChunkEntity.findAllIncludingEvents(realm, eventIds).firstOrNull() } ?: ChunkEntity().apply { this.prevToken = prevToken } chunkEntity.nextToken = nextToken diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/PagingRequestHelper.java b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/PagingRequestHelper.java new file mode 100644 index 00000000..7e77fead --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/PagingRequestHelper.java @@ -0,0 +1,525 @@ + +/* + * Copyright 2017 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package im.vector.matrix.android.internal.util; + +import android.support.annotation.AnyThread; +import android.support.annotation.GuardedBy; +import android.support.annotation.NonNull; +import android.support.annotation.Nullable; +import android.support.annotation.VisibleForTesting; + +import java.util.Arrays; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A helper class for {@link android.arch.paging.PagedList.BoundaryCallback BoundaryCallback}s and + * {@link android.arch.paging.DataSource}s to help with tracking network requests. + *

+ * It is designed to support 3 types of requests, {@link RequestType#INITIAL INITIAL}, + * {@link RequestType#BEFORE BEFORE} and {@link RequestType#AFTER AFTER} and runs only 1 request + * for each of them via {@link #runIfNotRunning(RequestType, Request)}. + *

+ * It tracks a {@link Status} and an {@code error} for each {@link RequestType}. + *

+ * A sample usage of this class to limit requests looks like this: + *

+ * class PagingBoundaryCallback extends PagedList.BoundaryCallback<MyItem> {
+ *     // TODO replace with an executor from your application
+ *     Executor executor = Executors.newSingleThreadExecutor();
+ *     PagingRequestHelper helper = new PagingRequestHelper(executor);
+ *     // imaginary API service, using Retrofit
+ *     MyApi api;
+ *
+ *     {@literal @}Override
+ *     public void onItemAtFrontLoaded({@literal @}NonNull MyItem itemAtFront) {
+ *         helper.runIfNotRunning(PagingRequestHelper.RequestType.BEFORE,
+ *                 helperCallback -> api.getTopBefore(itemAtFront.getName(), 10).enqueue(
+ *                         new Callback<ApiResponse>() {
+ *                             {@literal @}Override
+ *                             public void onResponse(Call<ApiResponse> call,
+ *                                     Response<ApiResponse> response) {
+ *                                 // TODO insert new records into database
+ *                                 helperCallback.recordSuccess();
+ *                             }
+ *
+ *                             {@literal @}Override
+ *                             public void onFailure(Call<ApiResponse> call, Throwable t) {
+ *                                 helperCallback.recordFailure(t);
+ *                             }
+ *                         }));
+ *     }
+ *
+ *     {@literal @}Override
+ *     public void onItemAtEndLoaded({@literal @}NonNull MyItem itemAtEnd) {
+ *         helper.runIfNotRunning(PagingRequestHelper.RequestType.AFTER,
+ *                 helperCallback -> api.getTopBefore(itemAtEnd.getName(), 10).enqueue(
+ *                         new Callback<ApiResponse>() {
+ *                             {@literal @}Override
+ *                             public void onResponse(Call<ApiResponse> call,
+ *                                     Response<ApiResponse> response) {
+ *                                 // TODO insert new records into database
+ *                                 helperCallback.recordSuccess();
+ *                             }
+ *
+ *                             {@literal @}Override
+ *                             public void onFailure(Call<ApiResponse> call, Throwable t) {
+ *                                 helperCallback.recordFailure(t);
+ *                             }
+ *                         }));
+ *     }
+ * }
+ * 
+ *

+ * The helper provides an API to observe combined request status, which can be reported back to the + * application based on your business rules. + *

+ * MutableLiveData<PagingRequestHelper.Status> combined = new MutableLiveData<>();
+ * helper.addListener(status -> {
+ *     // merge multiple states per request type into one, or dispatch separately depending on
+ *     // your application logic.
+ *     if (status.hasRunning()) {
+ *         combined.postValue(PagingRequestHelper.Status.RUNNING);
+ *     } else if (status.hasError()) {
+ *         // can also obtain the error via {@link StatusReport#getErrorFor(RequestType)}
+ *         combined.postValue(PagingRequestHelper.Status.FAILED);
+ *     } else {
+ *         combined.postValue(PagingRequestHelper.Status.SUCCESS);
+ *     }
+ * });
+ * 
+ */ +// THIS class is likely to be moved into the library in a future release. Feel free to copy it +// from this sample. +public class PagingRequestHelper { + private final Object mLock = new Object(); + private final Executor mRetryService; + @GuardedBy("mLock") + private final RequestQueue[] mRequestQueues = new RequestQueue[] + {new RequestQueue(RequestType.INITIAL), + new RequestQueue(RequestType.BEFORE), + new RequestQueue(RequestType.AFTER)}; + @NonNull + final CopyOnWriteArrayList mListeners = new CopyOnWriteArrayList<>(); + + /** + * Creates a new PagingRequestHelper with the given {@link Executor} which is used to run + * retry actions. + * + * @param retryService The {@link Executor} that can run the retry actions. + */ + public PagingRequestHelper(@NonNull Executor retryService) { + mRetryService = retryService; + } + + /** + * Adds a new listener that will be notified when any request changes {@link Status state}. + * + * @param listener The listener that will be notified each time a request's status changes. + * @return True if it is added, false otherwise (e.g. it already exists in the list). + */ + @AnyThread + public boolean addListener(@NonNull Listener listener) { + return mListeners.add(listener); + } + + /** + * Removes the given listener from the listeners list. + * + * @param listener The listener that will be removed. + * @return True if the listener is removed, false otherwise (e.g. it never existed) + */ + public boolean removeListener(@NonNull Listener listener) { + return mListeners.remove(listener); + } + + /** + * Runs the given {@link Request} if no other requests in the given request type is already + * running. + *

+ * If run, the request will be run in the current thread. + * + * @param type The type of the request. + * @param request The request to run. + * @return True if the request is run, false otherwise. + */ + @SuppressWarnings("WeakerAccess") + @AnyThread + public boolean runIfNotRunning(@NonNull RequestType type, @NonNull Request request) { + boolean hasListeners = !mListeners.isEmpty(); + StatusReport report = null; + synchronized (mLock) { + RequestQueue queue = mRequestQueues[type.ordinal()]; + if (queue.mRunning != null) { + return false; + } + queue.mRunning = request; + queue.mStatus = Status.RUNNING; + queue.mFailed = null; + queue.mLastError = null; + if (hasListeners) { + report = prepareStatusReportLocked(); + } + } + if (report != null) { + dispatchReport(report); + } + final RequestWrapper wrapper = new RequestWrapper(request, this, type); + wrapper.run(); + return true; + } + + @GuardedBy("mLock") + private StatusReport prepareStatusReportLocked() { + Throwable[] errors = new Throwable[]{ + mRequestQueues[0].mLastError, + mRequestQueues[1].mLastError, + mRequestQueues[2].mLastError + }; + return new StatusReport( + getStatusForLocked(RequestType.INITIAL), + getStatusForLocked(RequestType.BEFORE), + getStatusForLocked(RequestType.AFTER), + errors + ); + } + + @GuardedBy("mLock") + private Status getStatusForLocked(RequestType type) { + return mRequestQueues[type.ordinal()].mStatus; + } + + @AnyThread + @VisibleForTesting + void recordResult(@NonNull RequestWrapper wrapper, @Nullable Throwable throwable) { + StatusReport report = null; + final boolean success = throwable == null; + boolean hasListeners = !mListeners.isEmpty(); + synchronized (mLock) { + RequestQueue queue = mRequestQueues[wrapper.mType.ordinal()]; + queue.mRunning = null; + queue.mLastError = throwable; + if (success) { + queue.mFailed = null; + queue.mStatus = Status.SUCCESS; + } else { + queue.mFailed = wrapper; + queue.mStatus = Status.FAILED; + } + if (hasListeners) { + report = prepareStatusReportLocked(); + } + } + if (report != null) { + dispatchReport(report); + } + } + + private void dispatchReport(StatusReport report) { + for (Listener listener : mListeners) { + listener.onStatusChange(report); + } + } + + /** + * Retries all failed requests. + * + * @return True if any request is retried, false otherwise. + */ + public boolean retryAllFailed() { + final RequestWrapper[] toBeRetried = new RequestWrapper[RequestType.values().length]; + boolean retried = false; + synchronized (mLock) { + for (int i = 0; i < RequestType.values().length; i++) { + toBeRetried[i] = mRequestQueues[i].mFailed; + mRequestQueues[i].mFailed = null; + } + } + for (RequestWrapper failed : toBeRetried) { + if (failed != null) { + failed.retry(mRetryService); + retried = true; + } + } + return retried; + } + + static class RequestWrapper implements Runnable { + @NonNull + final Request mRequest; + @NonNull + final PagingRequestHelper mHelper; + @NonNull + final RequestType mType; + + RequestWrapper(@NonNull Request request, @NonNull PagingRequestHelper helper, + @NonNull RequestType type) { + mRequest = request; + mHelper = helper; + mType = type; + } + + @Override + public void run() { + mRequest.run(new Request.Callback(this, mHelper)); + } + + void retry(Executor service) { + service.execute(new Runnable() { + @Override + public void run() { + mHelper.runIfNotRunning(mType, mRequest); + } + }); + } + } + + /** + * Runner class that runs a request tracked by the {@link PagingRequestHelper}. + *

+ * When a request is invoked, it must call one of {@link Callback#recordFailure(Throwable)} + * or {@link Callback#recordSuccess()} once and only once. This call + * can be made any time. Until that method call is made, {@link PagingRequestHelper} will + * consider the request is running. + */ + @FunctionalInterface + public interface Request { + /** + * Should run the request and call the given {@link Callback} with the result of the + * request. + * + * @param callback The callback that should be invoked with the result. + */ + void run(Callback callback); + + /** + * Callback class provided to the {@link #run(Callback)} method to report the result. + */ + class Callback { + private final AtomicBoolean mCalled = new AtomicBoolean(); + private final RequestWrapper mWrapper; + private final PagingRequestHelper mHelper; + + Callback(RequestWrapper wrapper, PagingRequestHelper helper) { + mWrapper = wrapper; + mHelper = helper; + } + + /** + * Call this method when the request succeeds and new data is fetched. + */ + @SuppressWarnings("unused") + public final void recordSuccess() { + if (mCalled.compareAndSet(false, true)) { + mHelper.recordResult(mWrapper, null); + } else { + throw new IllegalStateException( + "already called recordSuccess or recordFailure"); + } + } + + /** + * Call this method with the failure message and the request can be retried via + * {@link #retryAllFailed()}. + * + * @param throwable The error that occured while carrying out the request. + */ + @SuppressWarnings("unused") + public final void recordFailure(@NonNull Throwable throwable) { + //noinspection ConstantConditions + if (throwable == null) { + throw new IllegalArgumentException("You must provide a throwable describing" + + " the error to record the failure"); + } + if (mCalled.compareAndSet(false, true)) { + mHelper.recordResult(mWrapper, throwable); + } else { + throw new IllegalStateException( + "already called recordSuccess or recordFailure"); + } + } + } + } + + /** + * Data class that holds the information about the current status of the ongoing requests + * using this helper. + */ + public static final class StatusReport { + /** + * Status of the latest request that were submitted with {@link RequestType#INITIAL}. + */ + @NonNull + public final Status initial; + /** + * Status of the latest request that were submitted with {@link RequestType#BEFORE}. + */ + @NonNull + public final Status before; + /** + * Status of the latest request that were submitted with {@link RequestType#AFTER}. + */ + @NonNull + public final Status after; + @NonNull + private final Throwable[] mErrors; + + StatusReport(@NonNull Status initial, @NonNull Status before, @NonNull Status after, + @NonNull Throwable[] errors) { + this.initial = initial; + this.before = before; + this.after = after; + this.mErrors = errors; + } + + /** + * Convenience method to check if there are any running requests. + * + * @return True if there are any running requests, false otherwise. + */ + public boolean hasRunning() { + return initial == Status.RUNNING + || before == Status.RUNNING + || after == Status.RUNNING; + } + + /** + * Convenience method to check if there are any requests that resulted in an error. + * + * @return True if there are any requests that finished with error, false otherwise. + */ + public boolean hasError() { + return initial == Status.FAILED + || before == Status.FAILED + || after == Status.FAILED; + } + + /** + * Returns the error for the given request type. + * + * @param type The request type for which the error should be returned. + * @return The {@link Throwable} returned by the failing request with the given type or + * {@code null} if the request for the given type did not fail. + */ + @Nullable + public Throwable getErrorFor(@NonNull RequestType type) { + return mErrors[type.ordinal()]; + } + + @Override + public String toString() { + return "StatusReport{" + + "initial=" + initial + + ", before=" + before + + ", after=" + after + + ", mErrors=" + Arrays.toString(mErrors) + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StatusReport that = (StatusReport) o; + if (initial != that.initial) return false; + if (before != that.before) return false; + if (after != that.after) return false; + // Probably incorrect - comparing Object[] arrays with Arrays.equals + return Arrays.equals(mErrors, that.mErrors); + } + + @Override + public int hashCode() { + int result = initial.hashCode(); + result = 31 * result + before.hashCode(); + result = 31 * result + after.hashCode(); + result = 31 * result + Arrays.hashCode(mErrors); + return result; + } + } + + /** + * Listener interface to get notified by request status changes. + */ + public interface Listener { + /** + * Called when the status for any of the requests has changed. + * + * @param report The current status report that has all the information about the requests. + */ + void onStatusChange(@NonNull StatusReport report); + } + + /** + * Represents the status of a Request for each {@link RequestType}. + */ + public enum Status { + /** + * There is current a running request. + */ + RUNNING, + /** + * The last request has succeeded or no such requests have ever been run. + */ + SUCCESS, + /** + * The last request has failed. + */ + FAILED + } + + /** + * Available request types. + */ + public enum RequestType { + /** + * Corresponds to an initial request made to a {@link android.arch.paging.DataSource} or the empty state for + * a {@link android.arch.paging.PagedList.BoundaryCallback BoundaryCallback}. + */ + INITIAL, + /** + * Corresponds to the {@code loadBefore} calls in {@link android.arch.paging.DataSource} or + * {@code onItemAtFrontLoaded} in + * {@link android.arch.paging.PagedList.BoundaryCallback BoundaryCallback}. + */ + BEFORE, + /** + * Corresponds to the {@code loadAfter} calls in {@link android.arch.paging.DataSource} or + * {@code onItemAtEndLoaded} in + * {@link android.arch.paging.PagedList.BoundaryCallback BoundaryCallback}. + */ + AFTER + } + + class RequestQueue { + @NonNull + final RequestType mRequestType; + @Nullable + RequestWrapper mFailed; + @Nullable + Request mRunning; + @Nullable + Throwable mLastError; + @NonNull + Status mStatus = Status.SUCCESS; + + RequestQueue(@NonNull RequestType requestType) { + mRequestType = requestType; + } + } +} \ No newline at end of file