forked from GitHub-Mirror/riotX-android
User : rework UserEntityUpdater (and make others RealmLiveEntityObserver process on MonarchyThread instead of Main)
This commit is contained in:
parent
d6f6764b0c
commit
41b06bca60
@ -16,11 +16,12 @@
|
||||
|
||||
package im.vector.matrix.android.internal.database
|
||||
|
||||
import androidx.lifecycle.LiveData
|
||||
import androidx.lifecycle.Observer
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import io.realm.OrderedCollectionChangeSet
|
||||
import io.realm.RealmObject
|
||||
import io.realm.RealmResults
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
internal interface LiveEntityObserver {
|
||||
fun start()
|
||||
@ -28,38 +29,41 @@ internal interface LiveEntityObserver {
|
||||
}
|
||||
|
||||
internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val monarchy: Monarchy)
|
||||
: Observer<Monarchy.ManagedChangeSet<T>>, LiveEntityObserver {
|
||||
: LiveEntityObserver {
|
||||
|
||||
protected abstract val query: Monarchy.Query<T>
|
||||
private val isStarted = AtomicBoolean(false)
|
||||
private val liveResults: LiveData<Monarchy.ManagedChangeSet<T>> by lazy {
|
||||
monarchy.findAllManagedWithChanges(query)
|
||||
}
|
||||
private lateinit var results: AtomicReference<RealmResults<T>>
|
||||
|
||||
override fun start() {
|
||||
if (isStarted.compareAndSet(false, true)) {
|
||||
liveResults.observeForever(this)
|
||||
monarchy.postToMonarchyThread {
|
||||
val queryResults = query.createQuery(it).findAll()
|
||||
queryResults.addChangeListener { t, changeSet ->
|
||||
onChanged(t, changeSet)
|
||||
}
|
||||
results = AtomicReference(queryResults)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
if (isStarted.compareAndSet(true, false)) {
|
||||
liveResults.removeObserver(this)
|
||||
monarchy.postToMonarchyThread {
|
||||
results.getAndSet(null).removeAllChangeListeners()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// PRIVATE
|
||||
|
||||
override fun onChanged(changeSet: Monarchy.ManagedChangeSet<T>?) {
|
||||
if (changeSet == null) {
|
||||
return
|
||||
}
|
||||
val insertionIndexes = changeSet.orderedCollectionChangeSet.insertions
|
||||
val updateIndexes = changeSet.orderedCollectionChangeSet.changes
|
||||
val deletionIndexes = changeSet.orderedCollectionChangeSet.deletions
|
||||
val inserted = changeSet.realmResults.filterIndexed { index, _ -> insertionIndexes.contains(index) }
|
||||
val updated = changeSet.realmResults.filterIndexed { index, _ -> updateIndexes.contains(index) }
|
||||
val deleted = changeSet.realmResults.filterIndexed { index, _ -> deletionIndexes.contains(index) }
|
||||
private fun onChanged(realmResults: RealmResults<T>, changeSet: OrderedCollectionChangeSet) {
|
||||
val insertionIndexes = changeSet.insertions
|
||||
val updateIndexes = changeSet.changes
|
||||
val deletionIndexes = changeSet.deletions
|
||||
val inserted = realmResults.filterIndexed { index, _ -> insertionIndexes.contains(index) }
|
||||
val updated = realmResults.filterIndexed { index, _ -> updateIndexes.contains(index) }
|
||||
val deleted = realmResults.filterIndexed { index, _ -> deletionIndexes.contains(index) }
|
||||
process(inserted, updated, deleted)
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ package im.vector.matrix.android.internal.session
|
||||
import android.os.Looper
|
||||
import androidx.annotation.MainThread
|
||||
import androidx.lifecycle.LiveData
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import im.vector.matrix.android.api.auth.data.SessionParams
|
||||
import im.vector.matrix.android.api.session.Session
|
||||
import im.vector.matrix.android.api.session.content.ContentUrlResolver
|
||||
@ -37,6 +38,7 @@ import im.vector.matrix.android.internal.session.group.GroupModule
|
||||
import im.vector.matrix.android.internal.session.room.RoomModule
|
||||
import im.vector.matrix.android.internal.session.sync.SyncModule
|
||||
import im.vector.matrix.android.internal.session.sync.job.SyncThread
|
||||
import im.vector.matrix.android.internal.session.user.UserModule
|
||||
import org.koin.core.scope.Scope
|
||||
import org.koin.standalone.inject
|
||||
|
||||
@ -49,6 +51,7 @@ internal class DefaultSession(override val sessionParams: SessionParams) : Sessi
|
||||
|
||||
private lateinit var scope: Scope
|
||||
|
||||
private val monarchy by inject<Monarchy>()
|
||||
private val liveEntityUpdaters by inject<List<LiveEntityObserver>>()
|
||||
private val sessionListeners by inject<SessionListeners>()
|
||||
private val roomService by inject<RoomService>()
|
||||
@ -67,8 +70,12 @@ internal class DefaultSession(override val sessionParams: SessionParams) : Sessi
|
||||
val syncModule = SyncModule().definition
|
||||
val roomModule = RoomModule().definition
|
||||
val groupModule = GroupModule().definition
|
||||
MatrixKoinHolder.instance.loadModules(listOf(sessionModule, syncModule, roomModule, groupModule))
|
||||
val userModule = UserModule().definition
|
||||
MatrixKoinHolder.instance.loadModules(listOf(sessionModule, syncModule, roomModule, groupModule, userModule))
|
||||
scope = getKoin().getOrCreateScope(SCOPE)
|
||||
if (!monarchy.isMonarchyThreadOpen) {
|
||||
monarchy.openManually()
|
||||
}
|
||||
liveEntityUpdaters.forEach { it.start() }
|
||||
syncThread.start()
|
||||
}
|
||||
@ -80,6 +87,9 @@ internal class DefaultSession(override val sessionParams: SessionParams) : Sessi
|
||||
assert(isOpen)
|
||||
syncThread.kill()
|
||||
liveEntityUpdaters.forEach { it.dispose() }
|
||||
if (monarchy.isMonarchyThreadOpen) {
|
||||
monarchy.closeManually()
|
||||
}
|
||||
scope.close()
|
||||
isOpen = false
|
||||
}
|
||||
|
@ -34,6 +34,8 @@ import im.vector.matrix.android.internal.session.room.members.RoomDisplayNameRes
|
||||
import im.vector.matrix.android.internal.session.room.members.RoomMemberDisplayNameResolver
|
||||
import im.vector.matrix.android.internal.session.room.prune.EventsPruner
|
||||
import im.vector.matrix.android.internal.session.user.DefaultUserService
|
||||
import im.vector.matrix.android.internal.session.user.UserEntityUpdater
|
||||
import im.vector.matrix.android.internal.session.user.UserModule
|
||||
import im.vector.matrix.android.internal.util.md5
|
||||
import io.realm.RealmConfiguration
|
||||
import org.koin.dsl.module.module
|
||||
@ -113,7 +115,8 @@ internal class SessionModule(private val sessionParams: SessionParams) {
|
||||
val roomSummaryUpdater = RoomSummaryUpdater(get(), get(), get(), get(), sessionParams.credentials)
|
||||
val groupSummaryUpdater = GroupSummaryUpdater(get())
|
||||
val eventsPruner = EventsPruner(get())
|
||||
listOf<LiveEntityObserver>(roomSummaryUpdater, groupSummaryUpdater, eventsPruner)
|
||||
val userEntityUpdater = UserEntityUpdater(get(), get(), get())
|
||||
listOf<LiveEntityObserver>(roomSummaryUpdater, groupSummaryUpdater, eventsPruner, userEntityUpdater)
|
||||
}
|
||||
|
||||
|
||||
|
@ -34,7 +34,7 @@ import im.vector.matrix.android.internal.session.room.members.LoadRoomMembersTas
|
||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||
import im.vector.matrix.android.internal.task.configureWith
|
||||
|
||||
internal data class DefaultRoom(
|
||||
internal class DefaultRoom(
|
||||
override val roomId: String,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val monarchy: Monarchy,
|
||||
|
@ -26,6 +26,7 @@ import im.vector.matrix.android.internal.database.model.EventEntityFields
|
||||
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import io.realm.Realm
|
||||
import io.realm.Sort
|
||||
|
||||
internal class RoomMembers(private val realm: Realm,
|
||||
private val roomId: String
|
||||
@ -35,6 +36,17 @@ internal class RoomMembers(private val realm: Realm,
|
||||
RoomSummaryEntity.where(realm, roomId).findFirst()
|
||||
}
|
||||
|
||||
fun get(userId: String): RoomMember? {
|
||||
return EventEntity
|
||||
.where(realm, roomId, EventType.STATE_ROOM_MEMBER)
|
||||
.sort(EventEntityFields.STATE_INDEX, Sort.DESCENDING)
|
||||
.equalTo(EventEntityFields.STATE_KEY, userId)
|
||||
.findFirst()
|
||||
?.let {
|
||||
it.asDomain().content?.toModel<RoomMember>()
|
||||
}
|
||||
}
|
||||
|
||||
fun getLoaded(): Map<String, RoomMember> {
|
||||
return EventEntity
|
||||
.where(realm, roomId, EventType.STATE_ROOM_MEMBER)
|
||||
@ -45,7 +57,6 @@ internal class RoomMembers(private val realm: Realm,
|
||||
.mapValues { it.value.content.toModel<RoomMember>()!! }
|
||||
}
|
||||
|
||||
|
||||
fun getNumberOfJoinedMembers(): Int {
|
||||
return roomSummary?.joinedMembersCount
|
||||
?: getLoaded().filterValues { it.membership == Membership.JOIN }.size
|
||||
|
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.session.user
|
||||
|
||||
import arrow.core.Try
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import im.vector.matrix.android.api.session.room.model.Membership
|
||||
import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||
import im.vector.matrix.android.internal.database.model.EventEntity
|
||||
import im.vector.matrix.android.internal.database.model.UserEntity
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.room.members.RoomMembers
|
||||
import im.vector.matrix.android.internal.task.Task
|
||||
import im.vector.matrix.android.internal.util.tryTransactionSync
|
||||
|
||||
internal interface UpdateUserTask : Task<UpdateUserTask.Params, Unit> {
|
||||
|
||||
data class Params(val eventIds: List<String>)
|
||||
|
||||
}
|
||||
|
||||
internal class DefaultUpdateUserTask(private val monarchy: Monarchy) : UpdateUserTask {
|
||||
|
||||
override fun execute(params: UpdateUserTask.Params): Try<Unit> {
|
||||
return monarchy.tryTransactionSync { realm ->
|
||||
params.eventIds.forEach { eventId ->
|
||||
val event = EventEntity.where(realm, eventId).findFirst()?.asDomain()
|
||||
?: return@forEach
|
||||
val roomId = event.roomId ?: return@forEach
|
||||
val userId = event.stateKey ?: return@forEach
|
||||
val roomMember = RoomMembers(realm, roomId).get(userId) ?: return@forEach
|
||||
if (roomMember.membership != Membership.JOIN) return@forEach
|
||||
|
||||
val userEntity = UserEntity.where(realm, userId).findFirst()
|
||||
?: realm.createObject(UserEntity::class.java, userId)
|
||||
userEntity.displayName = roomMember.displayName ?: ""
|
||||
userEntity.avatarUrl = roomMember.avatarUrl ?: ""
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -21,41 +21,33 @@ package im.vector.matrix.android.internal.session.user
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import im.vector.matrix.android.api.session.events.model.EventType
|
||||
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
|
||||
import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||
import im.vector.matrix.android.internal.database.model.EventEntity
|
||||
import im.vector.matrix.android.internal.database.model.EventEntityFields
|
||||
import im.vector.matrix.android.internal.database.model.UserEntity
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.room.members.RoomMembers
|
||||
import io.realm.Realm
|
||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||
import im.vector.matrix.android.internal.task.TaskThread
|
||||
import im.vector.matrix.android.internal.task.configureWith
|
||||
import io.realm.Sort
|
||||
|
||||
internal class UserEntityUpdater(monarchy: Monarchy)
|
||||
internal class UserEntityUpdater(monarchy: Monarchy,
|
||||
private val updateUserTask: UpdateUserTask,
|
||||
private val taskExecutor: TaskExecutor)
|
||||
: RealmLiveEntityObserver<EventEntity>(monarchy) {
|
||||
|
||||
override val query = Monarchy.Query<EventEntity> {
|
||||
EventEntity.where(it, type = EventType.STATE_ROOM_MEMBER)
|
||||
EventEntity
|
||||
.where(it, type = EventType.STATE_ROOM_MEMBER)
|
||||
.sort(EventEntityFields.STATE_INDEX, Sort.DESCENDING)
|
||||
.distinct(EventEntityFields.STATE_KEY)
|
||||
|
||||
}
|
||||
|
||||
override fun process(inserted: List<EventEntity>, updated: List<EventEntity>, deleted: List<EventEntity>) {
|
||||
val roomMembersEvents = (inserted + updated).map { it.eventId }
|
||||
monarchy.writeAsync { realm ->
|
||||
roomMembersEvents.forEach { updateUser(realm, it) }
|
||||
}
|
||||
val roomMembersEvents = inserted.map { it.eventId }
|
||||
val taskParams = UpdateUserTask.Params(roomMembersEvents)
|
||||
updateUserTask
|
||||
.configureWith(taskParams)
|
||||
.executeOn(TaskThread.IO)
|
||||
.executeBy(taskExecutor)
|
||||
}
|
||||
|
||||
private fun updateUser(realm: Realm, eventId: String) {
|
||||
val event = EventEntity.where(realm, eventId).findFirst()?.asDomain() ?: return
|
||||
val roomId = event.roomId ?: return
|
||||
val userId = event.stateKey ?: return
|
||||
val roomMember = RoomMembers(realm, roomId).getLoaded()[userId] ?: return
|
||||
|
||||
val userEntity = UserEntity.where(realm, userId).findFirst()
|
||||
?: realm.createObject(UserEntity::class.java, userId)
|
||||
userEntity.displayName = roomMember.displayName ?: ""
|
||||
userEntity.avatarUrl = roomMember.avatarUrl ?: ""
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.session.user
|
||||
|
||||
import im.vector.matrix.android.internal.session.DefaultSession
|
||||
import org.koin.dsl.module.module
|
||||
|
||||
internal class UserModule {
|
||||
|
||||
val definition = module(override = true) {
|
||||
|
||||
scope(DefaultSession.SCOPE) {
|
||||
DefaultUpdateUserTask(get()) as UpdateUserTask
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user