Timeline : reactivate loaders and get off the main thread

This commit is contained in:
ganfra
2019-03-19 19:45:32 +01:00
committed by ganfra
parent 0c76178bee
commit 2898eae566
26 changed files with 366 additions and 228 deletions

View File

@ -76,9 +76,6 @@ dependencies {
implementation 'com.github.Zhuinden:realm-monarchy:0.5.1'
kapt 'dk.ilios:realmfieldnameshelper:1.1.1'
// Paging
implementation 'androidx.paging:paging-runtime:2.0.0'
// Work
implementation "android.arch.work:work-runtime-ktx:1.0.0-beta02"

View File

@ -134,13 +134,13 @@ internal class ChunkEntityTest : InstrumentedTest {
val chunk2: ChunkEntity = realm.createObject()
val eventsForChunk1 = createFakeListOfEvents(30)
val eventsForChunk2 = eventsForChunk1 + createFakeListOfEvents(10)
chunk1.isLast = true
chunk2.isLast = false
chunk1.isLastForward = true
chunk2.isLastForward = false
chunk1.addAll("roomId", eventsForChunk1, PaginationDirection.FORWARDS)
chunk2.addAll("roomId", eventsForChunk2, PaginationDirection.BACKWARDS)
chunk1.merge("roomId", chunk2, PaginationDirection.BACKWARDS)
chunk1.events.size shouldEqual 40
chunk1.isLast.shouldBeTrue()
chunk1.isLastForward.shouldBeTrue()
}
}

View File

@ -25,7 +25,7 @@ import kotlin.random.Random
internal class FakeGetContextOfEventTask(private val tokenChunkEventPersistor: TokenChunkEventPersistor) : GetContextOfEventTask {
override fun execute(params: GetContextOfEventTask.Params): Try<TokenChunkEvent> {
override fun execute(params: GetContextOfEventTask.Params): Try<TokenChunkEventPersistor.Result> {
val fakeEvents = RoomDataHelper.createFakeListOfEvents(30)
val tokenChunkEvent = FakeTokenChunkEvent(
Random.nextLong(System.currentTimeMillis()).toString(),
@ -33,7 +33,6 @@ internal class FakeGetContextOfEventTask(private val tokenChunkEventPersistor: T
fakeEvents
)
return tokenChunkEventPersistor.insertInDb(tokenChunkEvent, params.roomId, PaginationDirection.BACKWARDS)
.map { tokenChunkEvent }
}

View File

@ -18,17 +18,15 @@ package im.vector.matrix.android.session.room.timeline
import arrow.core.Try
import im.vector.matrix.android.internal.session.room.timeline.PaginationTask
import im.vector.matrix.android.internal.session.room.timeline.TokenChunkEvent
import im.vector.matrix.android.internal.session.room.timeline.TokenChunkEventPersistor
import kotlin.random.Random
internal class FakePaginationTask(private val tokenChunkEventPersistor: TokenChunkEventPersistor) : PaginationTask {
override fun execute(params: PaginationTask.Params): Try<TokenChunkEvent> {
override fun execute(params: PaginationTask.Params): Try<TokenChunkEventPersistor.Result> {
val fakeEvents = RoomDataHelper.createFakeListOfEvents(30)
val tokenChunkEvent = FakeTokenChunkEvent(params.from, Random.nextLong(System.currentTimeMillis()).toString(), fakeEvents)
return tokenChunkEventPersistor.insertInDb(tokenChunkEvent, params.roomId, params.direction)
.map { tokenChunkEvent }
}
}

View File

@ -48,7 +48,7 @@ object RoomDataHelper {
val chunkEntity = realm.createObject<ChunkEntity>().apply {
nextToken = null
prevToken = Random.nextLong(System.currentTimeMillis()).toString()
isLast = true
isLastForward = true
}
chunkEntity.addAll("roomId", eventList, PaginationDirection.FORWARDS)
roomEntity.addOrUpdate(chunkEntity)

View File

@ -22,6 +22,8 @@ interface Timeline {
var listener: Timeline.Listener?
fun hasMoreToLoad(direction: Direction): Boolean
fun hasReachedEnd(direction: Direction): Boolean
fun size(): Int
fun snapshot(): List<TimelineEvent>
fun paginate(direction: Direction, count: Int)
@ -44,14 +46,6 @@ interface Timeline {
* These events come from a back pagination.
*/
BACKWARDS("b");
fun reversed(): Direction {
return when (this) {
FORWARDS -> BACKWARDS
BACKWARDS -> FORWARDS
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.api.util
class CancelableBag : Cancelable {
private val cancelableList = ArrayList<Cancelable>()
fun add(cancelable: Cancelable) {
cancelableList.add(cancelable)
}
override fun cancel() {
cancelableList.forEach { it.cancel() }
}
}
fun Cancelable.addTo(cancelables: CancelableBag) {
cancelables.add(this)
}

View File

@ -43,7 +43,6 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val m
queryResults.addChangeListener { t, changeSet ->
onChanged(t, changeSet)
}
processInitialResults(queryResults)
results = AtomicReference(queryResults)
}
}
@ -61,7 +60,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val m
return isStarted.get()
}
protected open fun onChanged(realmResults: RealmResults<T>, changeSet: OrderedCollectionChangeSet) {
private fun onChanged(realmResults: RealmResults<T>, changeSet: OrderedCollectionChangeSet) {
val insertionIndexes = changeSet.insertions
val updateIndexes = changeSet.changes
val deletionIndexes = changeSet.deletions
@ -71,12 +70,6 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val m
processChanges(inserted, updated, deleted)
}
protected open fun processInitialResults(results: RealmResults<T>) {
// no-op
}
protected open fun processChanges(inserted: List<T>, updated: List<T>, deleted: List<T>) {
//no-op
}
protected abstract fun processChanges(inserted: List<T>, updated: List<T>, deleted: List<T>)
}

View File

@ -27,18 +27,18 @@ import im.vector.matrix.android.internal.database.query.fastContains
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import io.realm.Sort
internal fun ChunkEntity.deleteOnCascade() {
assertIsManaged()
this.events.deleteAllFromRealm()
this.deleteFromRealm()
}
// By default if a chunk is empty we consider it unlinked
internal fun ChunkEntity.isUnlinked(): Boolean {
assertIsManaged()
return events.where().equalTo(EventEntityFields.IS_UNLINKED, false).findAll().isEmpty()
}
internal fun ChunkEntity.deleteOnCascade() {
assertIsManaged()
this.events.deleteAllFromRealm()
this.deleteFromRealm()
}
internal fun ChunkEntity.merge(roomId: String,
chunkToMerge: ChunkEntity,
direction: PaginationDirection) {
@ -53,10 +53,11 @@ internal fun ChunkEntity.merge(roomId: String,
val eventsToMerge: List<EventEntity>
if (direction == PaginationDirection.FORWARDS) {
this.nextToken = chunkToMerge.nextToken
this.isLast = chunkToMerge.isLast
this.isLastForward = chunkToMerge.isLastForward
eventsToMerge = chunkToMerge.events.reversed()
} else {
this.prevToken = chunkToMerge.prevToken
this.isLastBackward = chunkToMerge.isLastBackward
eventsToMerge = chunkToMerge.events
}
eventsToMerge.forEach {
@ -117,14 +118,14 @@ private fun ChunkEntity.assertIsManaged() {
internal fun ChunkEntity.lastDisplayIndex(direction: PaginationDirection, defaultValue: Int = 0): Int {
return when (direction) {
PaginationDirection.FORWARDS -> events.where().sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findFirst()?.displayIndex
PaginationDirection.BACKWARDS -> events.where().sort(EventEntityFields.DISPLAY_INDEX, Sort.ASCENDING).findFirst()?.displayIndex
} ?: defaultValue
PaginationDirection.FORWARDS -> events.where().sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findFirst()?.displayIndex
PaginationDirection.BACKWARDS -> events.where().sort(EventEntityFields.DISPLAY_INDEX, Sort.ASCENDING).findFirst()?.displayIndex
} ?: defaultValue
}
internal fun ChunkEntity.lastStateIndex(direction: PaginationDirection, defaultValue: Int = 0): Int {
return when (direction) {
PaginationDirection.FORWARDS -> events.where().sort(EventEntityFields.STATE_INDEX, Sort.DESCENDING).findFirst()?.stateIndex
PaginationDirection.BACKWARDS -> events.where().sort(EventEntityFields.STATE_INDEX, Sort.ASCENDING).findFirst()?.stateIndex
} ?: defaultValue
PaginationDirection.FORWARDS -> events.where().sort(EventEntityFields.STATE_INDEX, Sort.DESCENDING).findFirst()?.stateIndex
PaginationDirection.BACKWARDS -> events.where().sort(EventEntityFields.STATE_INDEX, Sort.ASCENDING).findFirst()?.stateIndex
} ?: defaultValue
}

View File

@ -23,8 +23,9 @@ import io.realm.annotations.LinkingObjects
internal open class ChunkEntity(var prevToken: String? = null,
var nextToken: String? = null,
var isLast: Boolean = false,
var events: RealmList<EventEntity> = RealmList()
var events: RealmList<EventEntity> = RealmList(),
var isLastForward: Boolean = false,
var isLastBackward: Boolean = false
) : RealmObject() {
@LinkingObjects("chunks")

View File

@ -43,7 +43,7 @@ internal fun ChunkEntity.Companion.find(realm: Realm, roomId: String, prevToken:
internal fun ChunkEntity.Companion.findLastLiveChunkFromRoom(realm: Realm, roomId: String): ChunkEntity? {
return where(realm, roomId)
.equalTo(ChunkEntityFields.IS_LAST, true)
.equalTo(ChunkEntityFields.IS_LAST_FORWARD, true)
.findFirst()
}

View File

@ -17,12 +17,12 @@
package im.vector.matrix.android.internal.session.room.timeline
import arrow.core.Try
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.room.RoomAPI
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.FilterUtil
internal interface GetContextOfEventTask : Task<GetContextOfEventTask.Params, TokenChunkEvent> {
internal interface GetContextOfEventTask : Task<GetContextOfEventTask.Params, TokenChunkEventPersistor.Result> {
data class Params(
val roomId: String,
@ -35,12 +35,12 @@ internal class DefaultGetContextOfEventTask(private val roomAPI: RoomAPI,
private val tokenChunkEventPersistor: TokenChunkEventPersistor
) : GetContextOfEventTask {
override fun execute(params: GetContextOfEventTask.Params): Try<EventContextResponse> {
override fun execute(params: GetContextOfEventTask.Params): Try<TokenChunkEventPersistor.Result> {
val filter = FilterUtil.createRoomEventFilter(true)?.toJSONString()
return executeRequest<EventContextResponse> {
apiCall = roomAPI.getContextOfEvent(params.roomId, params.eventId, 0, filter)
}.flatMap { response ->
tokenChunkEventPersistor.insertInDb(response, params.roomId, PaginationDirection.BACKWARDS).map { response }
tokenChunkEventPersistor.insertInDb(response, params.roomId, PaginationDirection.BACKWARDS)
}
}

View File

@ -23,7 +23,7 @@ import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.FilterUtil
internal interface PaginationTask : Task<PaginationTask.Params, Boolean> {
internal interface PaginationTask : Task<PaginationTask.Params, TokenChunkEventPersistor.Result> {
data class Params(
val roomId: String,
@ -38,7 +38,7 @@ internal class DefaultPaginationTask(private val roomAPI: RoomAPI,
private val tokenChunkEventPersistor: TokenChunkEventPersistor
) : PaginationTask {
override fun execute(params: PaginationTask.Params): Try<Boolean> {
override fun execute(params: PaginationTask.Params): Try<TokenChunkEventPersistor.Result> {
val filter = FilterUtil.createRoomEventFilter(true)?.toJSONString()
return executeRequest<PaginationResponse> {
apiCall = roomAPI.getRoomMessagesFrom(params.roomId, params.from, params.direction.value, params.limit, filter)

View File

@ -18,10 +18,15 @@
package im.vector.matrix.android.internal.session.room.timeline
import androidx.annotation.UiThread
import android.os.Handler
import android.os.HandlerThread
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.room.timeline.Timeline
import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
import im.vector.matrix.android.api.util.CancelableBag
import im.vector.matrix.android.api.util.addTo
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventEntityFields
@ -29,18 +34,16 @@ import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.PagingRequestHelper
import io.realm.OrderedCollectionChangeSet
import io.realm.OrderedRealmCollectionChangeListener
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.RealmQuery
import io.realm.RealmResults
import io.realm.Sort
import io.realm.*
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.ArrayList
private const val INITIAL_LOAD_SIZE = 30
private const val INITIAL_LOAD_SIZE = 20
private const val THREAD_NAME = "TIMELINE_DB_THREAD"
internal class DefaultTimeline(
private val roomId: String,
@ -54,16 +57,24 @@ internal class DefaultTimeline(
) : Timeline {
override var listener: Timeline.Listener? = null
set(value) {
field = value
listener?.onUpdated(snapshot())
}
private lateinit var realm: Realm
private val isStarted = AtomicBoolean(false)
private val handlerThread = AtomicReference<HandlerThread>()
private val handler = AtomicReference<Handler>()
private val realm = AtomicReference<Realm>()
private val cancelableBag = CancelableBag()
private lateinit var liveEvents: RealmResults<EventEntity>
private var prevDisplayIndex: Int = 0
private var nextDisplayIndex: Int = 0
private val isLive = initialEventId == null
private val builtEvents = Collections.synchronizedList<TimelineEvent>(ArrayList())
private val changeListener = OrderedRealmCollectionChangeListener<RealmResults<EventEntity>> { _, changeSet ->
private val eventsChangeListener = OrderedRealmCollectionChangeListener<RealmResults<EventEntity>> { _, changeSet ->
if (changeSet.state == OrderedCollectionChangeSet.State.INITIAL) {
handleInitialLoad()
} else {
@ -78,32 +89,48 @@ internal class DefaultTimeline(
}
}
@UiThread
override fun paginate(direction: Timeline.Direction, count: Int) {
if (direction == Timeline.Direction.FORWARDS && isLive) {
return
}
val startDisplayIndex = if (direction == Timeline.Direction.BACKWARDS) prevDisplayIndex else nextDisplayIndex
val hasBuiltCountItems = insertFromLiveResults(startDisplayIndex, direction, count.toLong())
if (hasBuiltCountItems.not()) {
val token = getToken(direction) ?: return
helper.runIfNotRunning(direction.toRequestType()) {
executePaginationTask(it, token, direction.toPaginationDirection(), 30)
handler.get()?.post {
if (!hasMoreToLoadLive(direction) && hasReachedEndLive(direction)) {
return@post
}
val startDisplayIndex = if (direction == Timeline.Direction.BACKWARDS) prevDisplayIndex else nextDisplayIndex
val builtCountItems = insertFromLiveResults(startDisplayIndex, direction, count.toLong())
if (builtCountItems < count) {
val limit = count - builtCountItems
val token = getTokenLive(direction) ?: return@post
helper.runIfNotRunning(direction.toRequestType()) { executePaginationTask(it, token, direction, limit) }
}
}
}
@UiThread
override fun start() {
realm = Realm.getInstance(realmConfiguration)
liveEvents = buildQuery(initialEventId).findAllAsync()
liveEvents.addChangeListener(changeListener)
if (isStarted.compareAndSet(false, true)) {
val handlerThread = HandlerThread(THREAD_NAME)
handlerThread.start()
val handler = Handler(handlerThread.looper)
this.handlerThread.set(handlerThread)
this.handler.set(handler)
handler.post {
val realm = Realm.getInstance(realmConfiguration)
this.realm.set(realm)
liveEvents = buildEventQuery(realm).findAllAsync()
liveEvents.addChangeListener(eventsChangeListener)
}
}
}
@UiThread
override fun dispose() {
liveEvents.removeAllChangeListeners()
realm.close()
if (isStarted.compareAndSet(true, false)) {
handler.get()?.post {
cancelableBag.cancel()
liveEvents.removeAllChangeListeners()
realm.getAndSet(null)?.close()
handler.set(null)
handlerThread.getAndSet(null)?.quit()
}
}
}
override fun snapshot(): List<TimelineEvent> = synchronized(builtEvents) {
@ -114,6 +141,21 @@ internal class DefaultTimeline(
return builtEvents.size
}
override fun hasReachedEnd(direction: Timeline.Direction): Boolean {
return handler.get()?.postAndWait {
hasReachedEndLive(direction)
} ?: false
}
override fun hasMoreToLoad(direction: Timeline.Direction): Boolean {
return handler.get()?.postAndWait {
hasMoreToLoadLive(direction)
} ?: false
}
/**
* This has to be called on TimelineThread as it access realm live results
*/
private fun handleInitialLoad() = synchronized(builtEvents) {
val initialDisplayIndex = if (isLive) {
liveEvents.firstOrNull()?.displayIndex
@ -138,19 +180,22 @@ internal class DefaultTimeline(
private fun executePaginationTask(requestCallback: PagingRequestHelper.Request.Callback,
from: String,
direction: PaginationDirection,
direction: Timeline.Direction,
limit: Int) {
val params = PaginationTask.Params(roomId = roomId,
from = from,
direction = direction,
limit = limit)
from = from,
direction = direction.toPaginationDirection(),
limit = limit)
paginationTask.configureWith(params)
.enableRetry()
.dispatchTo(object : MatrixCallback<Boolean> {
override fun onSuccess(data: Boolean) {
.dispatchTo(object : MatrixCallback<TokenChunkEventPersistor.Result> {
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
requestCallback.recordSuccess()
if (data == TokenChunkEventPersistor.Result.SHOULD_FETCH_MORE) {
paginate(direction, limit)
}
}
override fun onFailure(failure: Throwable) {
@ -158,26 +203,63 @@ internal class DefaultTimeline(
}
})
.executeBy(taskExecutor)
.addTo(cancelableBag)
}
private fun getToken(direction: Timeline.Direction): String? {
val chunkEntity = liveEvents.firstOrNull()?.chunk?.firstOrNull() ?: return null
/**
* This has to be called on TimelineThread as it access realm live results
*/
private fun getTokenLive(direction: Timeline.Direction): String? {
val chunkEntity = getLiveChunk() ?: return null
return if (direction == Timeline.Direction.BACKWARDS) chunkEntity.prevToken else chunkEntity.nextToken
}
/**
* This has to be called on MonarchyThread as it access realm live results
* @return true if count items has been added
* This has to be called on TimelineThread as it access realm live results
*/
private fun hasReachedEndLive(direction: Timeline.Direction): Boolean {
val liveChunk = getLiveChunk() ?: return false
return if (direction == Timeline.Direction.FORWARDS) {
liveChunk.isLastForward
} else {
liveChunk.isLastBackward || liveEvents.lastOrNull()?.type == EventType.STATE_ROOM_CREATE
}
}
/**
* This has to be called on TimelineThread as it access realm live results
*/
private fun hasMoreToLoadLive(direction: Timeline.Direction): Boolean {
if (liveEvents.isEmpty()) {
return true
}
return if (direction == Timeline.Direction.FORWARDS) {
builtEvents.firstOrNull()?.displayIndex != liveEvents.firstOrNull()?.displayIndex
} else {
builtEvents.lastOrNull()?.displayIndex != liveEvents.lastOrNull()?.displayIndex
}
}
/**
* This has to be called on TimelineThread as it access realm live results
*/
private fun getLiveChunk(): ChunkEntity? {
return liveEvents.firstOrNull()?.chunk?.firstOrNull()
}
/**
* This has to be called on TimelineThread as it access realm live results
* @return number of items who have been added
*/
private fun insertFromLiveResults(startDisplayIndex: Int,
direction: Timeline.Direction,
count: Long): Boolean = synchronized(builtEvents) {
count: Long): Int = synchronized(builtEvents) {
if (count < 1) {
throw java.lang.IllegalStateException("You should provide a count superior to 0")
}
val offsetResults = getOffsetResults(startDisplayIndex, direction, count)
if (offsetResults.isEmpty()) {
return false
return 0
}
val offsetIndex = offsetResults.last()!!.displayIndex
if (direction == Timeline.Direction.BACKWARDS) {
@ -191,9 +273,12 @@ internal class DefaultTimeline(
builtEvents.add(position, timelineEvent)
}
listener?.onUpdated(snapshot())
return offsetResults.size.toLong() == count
return offsetResults.size
}
/**
* This has to be called on TimelineThread as it access realm live results
*/
private fun getOffsetResults(startDisplayIndex: Int,
direction: Timeline.Direction,
count: Long): RealmResults<EventEntity> {
@ -210,21 +295,33 @@ internal class DefaultTimeline(
return offsetQuery.limit(count).findAll()
}
private fun buildQuery(eventId: String?): RealmQuery<EventEntity> {
val query = if (eventId == null) {
private fun buildEventQuery(realm: Realm): RealmQuery<EventEntity> {
val query = if (initialEventId == null) {
EventEntity
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.LINKED_ONLY)
.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.IS_LAST}", true)
.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.IS_LAST_FORWARD}", true)
} else {
EventEntity
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.BOTH)
.`in`("${EventEntityFields.CHUNK}.${ChunkEntityFields.EVENTS.EVENT_ID}", arrayOf(eventId))
.`in`("${EventEntityFields.CHUNK}.${ChunkEntityFields.EVENTS.EVENT_ID}", arrayOf(initialEventId))
}
query.sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
return query
}
private fun <T> Handler.postAndWait(runnable: () -> T): T {
val lock = CountDownLatch(1)
val atomicReference = AtomicReference<T>()
post {
val result = runnable()
atomicReference.set(result)
lock.countDown()
}
lock.await()
return atomicReference.get()
}
private fun Timeline.Direction.toRequestType(): PagingRequestHelper.RequestType {
return if (this == Timeline.Direction.BACKWARDS) PagingRequestHelper.RequestType.BEFORE else PagingRequestHelper.RequestType.AFTER
}
@ -233,4 +330,5 @@ internal class DefaultTimeline(
private fun Timeline.Direction.toPaginationDirection(): PaginationDirection {
return if (this == Timeline.Direction.BACKWARDS) PaginationDirection.BACKWARDS else PaginationDirection.FORWARDS
}
}
}

View File

@ -16,12 +16,9 @@
package im.vector.matrix.android.internal.session.room.timeline
import androidx.paging.PagedList
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.timeline.Timeline
import im.vector.matrix.android.api.session.room.timeline.TimelineEventInterceptor
import im.vector.matrix.android.api.session.room.timeline.TimelineService
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventEntityFields
import im.vector.matrix.android.internal.database.query.where
@ -29,12 +26,7 @@ import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.PagingRequestHelper
import im.vector.matrix.android.internal.util.tryTransactionAsync
import io.realm.Realm
import io.realm.RealmQuery
import io.realm.Sort
private const val PAGE_SIZE = 100
private const val PREFETCH_DISTANCE = 30
private const val EVENT_NOT_FOUND_INDEX = -1
internal class DefaultTimelineService(private val roomId: String,
@ -46,8 +38,6 @@ internal class DefaultTimelineService(private val roomId: String,
private val helper: PagingRequestHelper
) : TimelineService {
private val eventInterceptors = ArrayList<TimelineEventInterceptor>()
override fun createTimeline(eventId: String?): Timeline {
return DefaultTimeline(roomId, eventId, monarchy.realmConfiguration, taskExecutor, contextOfEventTask, timelineEventFactory, paginationTask, helper)
}
@ -73,15 +63,6 @@ internal class DefaultTimelineService(private val roomId: String,
contextOfEventTask.configureWith(params).executeBy(taskExecutor)
}
private fun buildPagedListConfig(): PagedList.Config {
return PagedList.Config.Builder()
.setEnablePlaceholders(false)
.setPageSize(PAGE_SIZE)
.setInitialLoadSizeHint(2 * PAGE_SIZE)
.setPrefetchDistance(PREFETCH_DISTANCE)
.build()
}
private fun clearUnlinkedEvents() {
monarchy.tryTransactionAsync { realm ->
val unlinkedEvents = EventEntity
@ -101,18 +82,5 @@ internal class DefaultTimelineService(private val roomId: String,
return displayIndex
}
private fun buildDataSourceFactoryQuery(realm: Realm, eventId: String?): RealmQuery<EventEntity> {
val query = if (eventId == null) {
EventEntity
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.LINKED_ONLY)
.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.IS_LAST}", true)
} else {
EventEntity
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.BOTH)
.`in`("${EventEntityFields.CHUNK}.${ChunkEntityFields.EVENTS.EVENT_ID}", arrayOf(eventId))
}
return query.sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
}
}

View File

@ -95,8 +95,8 @@ internal class TimelineBoundaryCallback(private val roomId: String,
paginationTask.configureWith(params)
.enableRetry()
.dispatchTo(object : MatrixCallback<Boolean> {
override fun onSuccess(data: Boolean) {
.dispatchTo(object : MatrixCallback<TokenChunkEventPersistor.Result> {
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
requestCallback.recordSuccess()
}

View File

@ -36,13 +36,15 @@ import io.realm.kotlin.createObject
internal class TokenChunkEventPersistor(private val monarchy: Monarchy) {
enum class Result {
SHOULD_FETCH_MORE,
SUCCESS
}
fun insertInDb(receivedChunk: TokenChunkEvent,
roomId: String,
direction: PaginationDirection): Try<Boolean> {
direction: PaginationDirection): Try<Result> {
if (receivedChunk.events.isEmpty() && receivedChunk.stateEvents.isEmpty()) {
return Try.just(false)
}
return monarchy
.tryTransactionSync { realm ->
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
@ -71,27 +73,36 @@ internal class TokenChunkEventPersistor(private val monarchy: Monarchy) {
nextChunk?.apply { this.prevToken = prevToken }
?: ChunkEntity.create(realm, prevToken, nextToken)
}
currentChunk.addAll(roomId, receivedChunk.events, direction, isUnlinked = currentChunk.isUnlinked())
// Then we merge chunks if needed
if (currentChunk != prevChunk && prevChunk != null) {
currentChunk = handleMerge(roomEntity, direction, currentChunk, prevChunk)
} else if (currentChunk != nextChunk && nextChunk != null) {
currentChunk = handleMerge(roomEntity, direction, currentChunk, nextChunk)
if (receivedChunk.events.isEmpty() && receivedChunk.end == receivedChunk.start) {
currentChunk.isLastBackward = true
} else {
val newEventIds = receivedChunk.events.mapNotNull { it.eventId }
ChunkEntity
.findAllIncludingEvents(realm, newEventIds)
.filter { it != currentChunk }
.forEach { overlapped ->
currentChunk = handleMerge(roomEntity, direction, currentChunk, overlapped)
}
currentChunk.addAll(roomId, receivedChunk.events, direction, isUnlinked = currentChunk.isUnlinked())
// Then we merge chunks if needed
if (currentChunk != prevChunk && prevChunk != null) {
currentChunk = handleMerge(roomEntity, direction, currentChunk, prevChunk)
} else if (currentChunk != nextChunk && nextChunk != null) {
currentChunk = handleMerge(roomEntity, direction, currentChunk, nextChunk)
} else {
val newEventIds = receivedChunk.events.mapNotNull { it.eventId }
ChunkEntity
.findAllIncludingEvents(realm, newEventIds)
.filter { it != currentChunk }
.forEach { overlapped ->
currentChunk = handleMerge(roomEntity, direction, currentChunk, overlapped)
}
}
roomEntity.addOrUpdate(currentChunk)
roomEntity.addStateEvents(receivedChunk.stateEvents, isUnlinked = currentChunk.isUnlinked())
}
}
.map {
if (receivedChunk.events.isEmpty() && receivedChunk.stateEvents.isEmpty() && receivedChunk.start != receivedChunk.end) {
Result.SHOULD_FETCH_MORE
} else {
Result.SUCCESS
}
roomEntity.addOrUpdate(currentChunk)
roomEntity.addStateEvents(receivedChunk.stateEvents, isUnlinked = currentChunk.isUnlinked())
}
.map { true }
}
private fun handleMerge(roomEntity: RoomEntity,

View File

@ -51,7 +51,6 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
data class INVITED(val data: Map<String, InvitedRoomSync>) : HandlingStrategy()
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
}
fun handle(roomsSyncResponse: RoomsSyncResponse) {
monarchy.runTransactionSync { realm ->
handleRoomSync(realm, RoomSyncHandler.HandlingStrategy.JOINED(roomsSyncResponse.join))
@ -164,8 +163,8 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
realm.createObject<ChunkEntity>().apply { this.prevToken = prevToken }
}
lastChunk?.isLast = false
chunkEntity.isLast = true
lastChunk?.isLastForward = false
chunkEntity.isLastForward = true
chunkEntity.addAll(roomId, eventList, PaginationDirection.FORWARDS, stateIndexOffset)
return chunkEntity
}