Start handling prune events. WIP

This commit is contained in:
ganfra 2018-11-07 20:36:19 +01:00
parent cd25232572
commit be8b1287c7
14 changed files with 184 additions and 129 deletions

View File

@ -26,7 +26,7 @@ class MessageItemFactory(private val timelineDateFormatter: TimelineDateFormatte
avatarUrl = roomMember.avatarUrl,
showInformation = showInformation,
time = timelineDateFormatter.formatMessageHour(date),
memberName = roomMember.displayName
memberName = roomMember.displayName ?: event.root.sender
)
}


View File

@ -31,7 +31,7 @@ class TimelineEventController(private val roomId: String,
}
}

var snapshotList: List<EnrichedEvent>? = emptyList()
private var snapshotList: List<EnrichedEvent>? = emptyList()
var timeline: PagedList<EnrichedEvent>? = null
set(value) {
field?.removeWeakCallback(pagedListCallback)

View File

@ -4,7 +4,7 @@
<item android:state_selected="true">
<shape android:shape="oval">
<solid android:color="@android:color/transparent" />
<stroke android:width="2dp" android:color="@color/pale_teal" />
<stroke android:width="3dp" android:color="@color/pale_teal" />
</shape>

</item>

View File

@ -16,7 +16,9 @@ data class Event(
@Json(name = "sender") val sender: String? = null,
@Json(name = "state_key") val stateKey: String? = null,
@Json(name = "room_id") var roomId: String? = null,
@Json(name = "unsigned") val unsignedData: UnsignedData? = null
@Json(name = "unsigned") val unsignedData: UnsignedData? = null,
@Json(name = "redacts") val redacts: String? = null

) {

val contentAsJsonObject: JsonObject? by lazy {

View File

@ -0,0 +1,48 @@
package im.vector.matrix.android.internal.database

import android.arch.lifecycle.LiveData
import android.arch.lifecycle.Observer
import com.zhuinden.monarchy.Monarchy
import io.realm.RealmObject
import io.realm.RealmResults
import java.util.concurrent.atomic.AtomicBoolean

internal interface LiveEntityObserver {
fun start()
fun dispose()
}

internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val monarchy: Monarchy)
: Observer<Monarchy.ManagedChangeSet<T>>, LiveEntityObserver {

protected abstract val query: Monarchy.Query<T>
private val isStarted = AtomicBoolean(false)
private val liveResults: LiveData<Monarchy.ManagedChangeSet<T>> by lazy {
monarchy.findAllManagedWithChanges(query)
}

override fun start() {
if (isStarted.compareAndSet(false, true)) {
liveResults.observeForever(this)
}
}

override fun dispose() {
if (isStarted.compareAndSet(true, false)) {
liveResults.removeObserver(this)
}
}

// PRIVATE

override fun onChanged(changeSet: Monarchy.ManagedChangeSet<T>?) {
if (changeSet == null) {
return
}
val indexes = changeSet.orderedCollectionChangeSet.changes + changeSet.orderedCollectionChangeSet.insertions
process(changeSet.realmResults, indexes)
}

abstract fun process(results: RealmResults<T>, indexes: IntArray)

}

View File

@ -1,10 +1,10 @@
package im.vector.matrix.android.internal.database.mapper

import com.squareup.moshi.Types
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.UnsignedData
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.di.MoshiProvider
import im.vector.matrix.android.api.session.events.model.Event


object EventMapper {
@ -22,21 +22,23 @@ object EventMapper {
eventEntity.type = event.type
eventEntity.sender = event.sender
eventEntity.originServerTs = event.originServerTs
eventEntity.redacts = event.redacts
eventEntity.age = event.unsignedData?.age ?: event.originServerTs
return eventEntity
}

internal fun map(eventEntity: EventEntity): Event {
return Event(
eventEntity.type,
eventEntity.eventId,
adapter.fromJson(eventEntity.content),
adapter.fromJson(eventEntity.prevContent ?: ""),
eventEntity.originServerTs,
eventEntity.sender,
eventEntity.stateKey,
null,
UnsignedData(eventEntity.age)
type = eventEntity.type,
eventId = eventEntity.eventId,
content = adapter.fromJson(eventEntity.content),
prevContent = adapter.fromJson(eventEntity.prevContent ?: ""),
originServerTs = eventEntity.originServerTs,
sender = eventEntity.sender,
stateKey = eventEntity.stateKey,
roomId = null,
unsignedData = UnsignedData(eventEntity.age),
redacts = eventEntity.redacts
)
}
}

View File

@ -2,17 +2,18 @@ package im.vector.matrix.android.internal.database.model

import io.realm.RealmObject
import io.realm.RealmResults
import io.realm.annotations.Index
import io.realm.annotations.LinkingObjects
import io.realm.annotations.PrimaryKey

open class EventEntity(@Index var eventId: String = "",
open class EventEntity(@PrimaryKey var eventId: String = "",
var type: String = "",
var content: String = "",
var prevContent: String? = null,
var stateKey: String? = null,
var originServerTs: Long? = null,
var sender: String? = null,
var age: Long? = 0
var age: Long? = 0,
var redacts: String? = null
) : RealmObject() {

companion object

View File

@ -15,9 +15,11 @@ fun EventEntity.Companion.where(realm: Realm, eventId: String): RealmQuery<Event
.equalTo(EventEntityFields.EVENT_ID, eventId)
}

fun EventEntity.Companion.where(realm: Realm, roomId: String, type: String? = null): RealmQuery<EventEntity> {
fun EventEntity.Companion.where(realm: Realm, roomId: String? = null, type: String? = null): RealmQuery<EventEntity> {
val query = realm.where<EventEntity>()
.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.ROOM}.${RoomEntityFields.ROOM_ID}", roomId)
if (roomId != null) {
query.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.ROOM}.${RoomEntityFields.ROOM_ID}", roomId)
}
if (type != null) {
query.equalTo(EventEntityFields.TYPE, type)
}
@ -32,7 +34,7 @@ fun EventEntity.Companion.stateEvents(realm: Realm, roomId: String): RealmQuery<

fun RealmQuery<EventEntity>.last(from: Long? = null): EventEntity? {
if (from != null) {
this.lessThanOrEqualTo(EventEntityFields.ORIGIN_SERVER_TS, from)
this.lessThan(EventEntityFields.ORIGIN_SERVER_TS, from)
}
return this
.sort(EventEntityFields.ORIGIN_SERVER_TS, Sort.DESCENDING)

View File

@ -11,10 +11,9 @@ import im.vector.matrix.android.api.session.room.Room
import im.vector.matrix.android.api.session.room.RoomService
import im.vector.matrix.android.api.session.room.model.RoomSummary
import im.vector.matrix.android.internal.auth.data.SessionParams
import im.vector.matrix.android.internal.database.LiveEntityObserver
import im.vector.matrix.android.internal.session.group.GroupModule
import im.vector.matrix.android.internal.session.group.GroupSummaryUpdater
import im.vector.matrix.android.internal.session.room.RoomModule
import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.sync.SyncModule
import im.vector.matrix.android.internal.session.sync.job.SyncThread
import org.koin.core.scope.Scope
@ -32,8 +31,7 @@ class DefaultSession(override val sessionParams: SessionParams) : Session, KoinC

private lateinit var scope: Scope

private val roomSummaryObserver by inject<RoomSummaryUpdater>()
private val groupSummaryUpdater by inject<GroupSummaryUpdater>()
private val liveEntityUpdaters by inject<List<LiveEntityObserver>>()
private val roomService by inject<RoomService>()
private val groupService by inject<GroupService>()
private val syncThread by inject<SyncThread>()
@ -50,8 +48,7 @@ class DefaultSession(override val sessionParams: SessionParams) : Session, KoinC
val groupModule = GroupModule()
StandAloneContext.loadKoinModules(listOf(sessionModule, syncModule, roomModule, groupModule))
scope = getKoin().getOrCreateScope(SCOPE)
roomSummaryObserver.start()
groupSummaryUpdater.start()
liveEntityUpdaters.forEach { it.start() }
syncThread.start()
}

@ -61,8 +58,7 @@ class DefaultSession(override val sessionParams: SessionParams) : Session, KoinC
checkIsMainThread()
assert(isOpen)
syncThread.kill()
groupSummaryUpdater.dispose()
roomSummaryObserver.dispose()
liveEntityUpdaters.forEach { it.dispose() }
scope.close()
isOpen = false
}

View File

@ -4,6 +4,8 @@ import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.group.GroupService
import im.vector.matrix.android.api.session.room.RoomService
import im.vector.matrix.android.internal.auth.data.SessionParams
import im.vector.matrix.android.internal.database.LiveEntityObserver
import im.vector.matrix.android.internal.session.events.prune.EventsPruner
import im.vector.matrix.android.internal.session.group.DefaultGroupService
import im.vector.matrix.android.internal.session.group.GroupSummaryUpdater
import im.vector.matrix.android.internal.session.room.DefaultRoomService
@ -53,22 +55,23 @@ class SessionModule(private val sessionParams: SessionParams) : Module {
RoomAvatarResolver(get(), sessionParams.credentials)
}

scope(DefaultSession.SCOPE) {
RoomSummaryUpdater(get(), get(), get(), get(), sessionParams.credentials)
}

scope(DefaultSession.SCOPE) {
DefaultRoomService(get()) as RoomService
}

scope(DefaultSession.SCOPE) {
GroupSummaryUpdater(get(), get())
}

scope(DefaultSession.SCOPE) {
DefaultGroupService(get()) as GroupService
}

scope(DefaultSession.SCOPE) {
val roomSummaryUpdater = RoomSummaryUpdater(get(), get(), get(), get(), sessionParams.credentials)
val groupSummaryUpdater = GroupSummaryUpdater(get(), get())
val eventsPruner = EventsPruner(get())
listOf<LiveEntityObserver>(roomSummaryUpdater, groupSummaryUpdater, eventsPruner)
}


}.invoke()



View File

@ -0,0 +1,68 @@
package im.vector.matrix.android.internal.session.events.prune

import arrow.core.Option
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.mapper.asEntity
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.where
import io.realm.Realm
import io.realm.RealmResults

internal class EventsPruner(monarchy: Monarchy) :
RealmLiveEntityObserver<EventEntity>(monarchy) {

override val query = Monarchy.Query<EventEntity> { EventEntity.where(it, type = EventType.REDACTION) }

override fun process(results: RealmResults<EventEntity>, indexes: IntArray) {
val redactionEvents = results.map { it.asDomain() }
monarchy.writeAsync { realm ->
indexes.forEach { index ->
val data = redactionEvents[index]
pruneEvent(realm, data)
}
}
}

private fun pruneEvent(realm: Realm, redactionEvent: Event?) {
if (redactionEvent == null || redactionEvent.redacts.isNullOrEmpty()) {
return
}
val eventToPrune = EventEntity.where(realm, eventId = redactionEvent.redacts).findFirst()?.asDomain()
?: return

val allowedKeys = computeAllowedKeys(eventToPrune.type)
val prunedContent = allowedKeys.fold(
{ eventToPrune.content },
{ eventToPrune.content?.filterKeys { key -> it.contains(key) } }
)
val eventToPruneEntity = eventToPrune.copy(content = prunedContent).asEntity()
realm.insertOrUpdate(eventToPruneEntity)
}

private fun computeAllowedKeys(type: String): Option<List<String>> {
// Add filtered content, allowed keys in content depends on the event type
val result = when (type) {
EventType.STATE_ROOM_MEMBER -> listOf("membership")
EventType.STATE_ROOM_CREATE -> listOf("creator")
EventType.STATE_ROOM_JOIN_RULES -> listOf("join_rule")
EventType.STATE_ROOM_POWER_LEVELS -> listOf("users",
"users_default",
"events",
"events_default",
"state_default",
"ban",
"kick",
"redact",
"invite")
EventType.STATE_ROOM_ALIASES -> listOf("aliases")
EventType.STATE_CANONICAL_ALIAS -> listOf("alias")
EventType.FEEDBACK -> listOf("type", "target_event_id")
else -> null
}
return Option.fromNullable(result)
}
}

View File

@ -1,62 +1,30 @@
package im.vector.matrix.android.internal.session.group

import android.arch.lifecycle.Observer
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.session.group.Group
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.query.where
import timber.log.Timber
import java.util.concurrent.atomic.AtomicBoolean
import io.realm.RealmResults

internal class GroupSummaryUpdater(private val monarchy: Monarchy,
internal class GroupSummaryUpdater(monarchy: Monarchy,
private val getGroupDataRequest: GetGroupDataRequest
) : Observer<Monarchy.ManagedChangeSet<GroupEntity>> {
) : RealmLiveEntityObserver<GroupEntity>(monarchy) {

private var isStarted = AtomicBoolean(false)
private val liveResults = monarchy.findAllManagedWithChanges { GroupEntity.where(it) }
override val query = Monarchy.Query<GroupEntity> { GroupEntity.where(it) }

fun start() {
if (isStarted.compareAndSet(false, true)) {
liveResults.observeForever(this)
override fun process(results: RealmResults<GroupEntity>, indexes: IntArray) {
indexes.forEach { index ->
val data = results[index]
fetchGroupData(data)
}
}

fun dispose() {
if (isStarted.compareAndSet(true, false)) {
liveResults.removeObserver(this)
}
}

// PRIVATE

override fun onChanged(changeSet: Monarchy.ManagedChangeSet<GroupEntity>?) {
if (changeSet == null) {
private fun fetchGroupData(data: GroupEntity?) {
if (data == null) {
return
}
val groups = changeSet.realmResults.map { it.asDomain() }
val indexesToUpdate = changeSet.orderedCollectionChangeSet.changes + changeSet.orderedCollectionChangeSet.insertions
updateGroupList(groups, indexesToUpdate)
}


private fun updateGroupList(groups: List<Group>, indexes: IntArray) {
indexes.forEach {
val group = groups[it]
try {
updateGroup(group)
} catch (e: Exception) {
Timber.e(e, "An error occured when updating room summaries")
}
}
}

private fun updateGroup(group: Group?) {
if (group == null) {
return
}
getGroupDataRequest.execute(group.groupId, object : MatrixCallback<Boolean> {})
getGroupDataRequest.execute(data.groupId, object : MatrixCallback<Boolean> {})
}

}

View File

@ -1,12 +1,12 @@
package im.vector.matrix.android.internal.session.room

import android.arch.lifecycle.Observer
import android.content.Context
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.room.Room
import im.vector.matrix.android.api.session.room.model.RoomTopicContent
import im.vector.matrix.android.internal.auth.data.Credentials
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
@ -16,53 +16,24 @@ import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.room.members.RoomDisplayNameResolver
import im.vector.matrix.android.internal.session.room.members.RoomMembers
import io.realm.Realm
import io.realm.RealmResults
import io.realm.kotlin.createObject
import timber.log.Timber
import java.util.concurrent.atomic.AtomicBoolean

internal class RoomSummaryUpdater(private val monarchy: Monarchy,
internal class RoomSummaryUpdater(monarchy: Monarchy,
private val roomDisplayNameResolver: RoomDisplayNameResolver,
private val roomAvatarResolver: RoomAvatarResolver,
private val context: Context,
private val credentials: Credentials
) : Observer<Monarchy.ManagedChangeSet<RoomEntity>> {
) : RealmLiveEntityObserver<RoomEntity>(monarchy) {

private var isStarted = AtomicBoolean(false)
private val liveResults = monarchy.findAllManagedWithChanges { RoomEntity.where(it) }
override val query = Monarchy.Query<RoomEntity> { RoomEntity.where(it) }

fun start() {
if (isStarted.compareAndSet(false, true)) {
liveResults.observeForever(this)
}
}

fun dispose() {
if (isStarted.compareAndSet(true, false)) {
liveResults.removeObserver(this)
}
}

// PRIVATE

override fun onChanged(changeSet: Monarchy.ManagedChangeSet<RoomEntity>?) {
if (changeSet == null) {
return
}
val rooms = changeSet.realmResults.map { it.asDomain() }
val indexesToUpdate = changeSet.orderedCollectionChangeSet.changes + changeSet.orderedCollectionChangeSet.insertions
override fun process(results: RealmResults<RoomEntity>, indexes: IntArray) {
val rooms = results.map { it.asDomain() }
monarchy.writeAsync { realm ->
updateRoomList(realm, rooms, indexesToUpdate)
}
}


private fun updateRoomList(realm: Realm, rooms: List<Room>, indexes: IntArray) {
indexes.forEach {
val room = rooms[it]
try {
updateRoom(realm, room)
} catch (e: Exception) {
Timber.e(e, "An error occured when updating room summaries")
indexes.forEach { index ->
val data = rooms[index]
updateRoom(realm, data)
}
}
}

View File

@ -11,11 +11,7 @@ import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.findAllIncludingEvents
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync
import im.vector.matrix.android.internal.session.sync.model.RoomSync
import im.vector.matrix.android.internal.session.sync.model.RoomSyncEphemeral
import im.vector.matrix.android.internal.session.sync.model.RoomSyncSummary
import im.vector.matrix.android.internal.session.sync.model.RoomsSyncResponse
import im.vector.matrix.android.internal.session.sync.model.*
import io.realm.Realm
import io.realm.kotlin.createObject

@ -42,9 +38,9 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,

private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy) {
val rooms = when (handlingStrategy) {
is HandlingStrategy.JOINED -> handlingStrategy.data.map { handleJoinedRoom(realm, it.key, it.value) }
is HandlingStrategy.JOINED -> handlingStrategy.data.map { handleJoinedRoom(realm, it.key, it.value) }
is HandlingStrategy.INVITED -> handlingStrategy.data.map { handleInvitedRoom(realm, it.key, it.value) }
is HandlingStrategy.LEFT -> handlingStrategy.data.map { handleLeftRoom(it.key, it.value) }
is HandlingStrategy.LEFT -> handlingStrategy.data.map { handleLeftRoom(it.key, it.value) }
}
realm.insertOrUpdate(rooms)
}
@ -54,7 +50,7 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
roomSync: RoomSync): RoomEntity {

val roomEntity = RoomEntity.where(realm, roomId).findFirst()
?: RoomEntity(roomId)
?: RoomEntity(roomId)

if (roomEntity.membership == MyMembership.INVITED) {
roomEntity.chunks.deleteAllFromRealm()
@ -115,7 +111,7 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
roomSummary: RoomSyncSummary) {

val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
?: RoomSummaryEntity(roomId)
?: RoomSummaryEntity(roomId)

if (roomSummary.heroes.isNotEmpty()) {
roomSummaryEntity.heroes.clear()
@ -136,7 +132,6 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
prevToken: String? = null,
nextToken: String? = null,
isLimited: Boolean = true): ChunkEntity {

val chunkEntity = if (!isLimited) {
ChunkEntity.findLastLiveChunkFromRoom(realm, roomId)
} else {
@ -157,5 +152,4 @@ internal class RoomSyncHandler(private val monarchy: Monarchy,
.map { it.content<ReadReceiptContent>() }
.flatMap { readReceiptHandler.handle(realm, roomId, it) }
}

}