Read receipts: use a simpler strategy when it's initialSync

This commit is contained in:
ganfra 2019-08-08 15:03:36 +02:00
parent b4ce8748cb
commit d98567045c
3 changed files with 54 additions and 16 deletions

View File

@ -31,27 +31,62 @@ import javax.inject.Inject
// dict value ts value
typealias ReadReceiptContent = Map<String, Map<String, Map<String, Map<String, Double>>>>

private const val READ_KEY = "m.read"
private const val TIMESTAMP_KEY = "ts"

internal class ReadReceiptHandler @Inject constructor() {

fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?) {
fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?, isInitialSync: Boolean) {
if (content == null) {
return
}
try {
handleReadReceiptContent(realm, roomId, content)
handleReadReceiptContent(realm, roomId, content, isInitialSync)
} catch (exception: Exception) {
Timber.e("Fail to handle read receipt for room $roomId")
}
}

private fun handleReadReceiptContent(realm: Realm, roomId: String, content: ReadReceiptContent) {
private fun handleReadReceiptContent(realm: Realm, roomId: String, content: ReadReceiptContent, isInitialSync: Boolean) {
if (isInitialSync) {
initialSyncStrategy(realm, roomId, content)
} else {
incrementalSyncStrategy(realm, roomId, content)
}
}


private fun initialSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) {
val readReceiptSummaries = ArrayList<ReadReceiptsSummaryEntity>()
for ((eventId, receiptDict) in content) {
val userIdsDict = receiptDict["m.read"] ?: continue
val userIdsDict = receiptDict[READ_KEY] ?: continue
val readReceiptsSummary = ReadReceiptsSummaryEntity(eventId = eventId)

for ((userId, paramsDict) in userIdsDict) {
val ts = paramsDict[TIMESTAMP_KEY] ?: 0.0
val primaryKey = "${roomId}_$userId"
val receiptEntity = ReadReceiptEntity().apply {
this.primaryKey = primaryKey
this.eventId = eventId
this.roomId = roomId
this.userId = userId
this.originServerTs = ts
}
readReceiptsSummary.readReceipts.add(receiptEntity)
}
readReceiptSummaries.add(readReceiptsSummary)
}
realm.insertOrUpdate(readReceiptSummaries)
}

private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) {
for ((eventId, receiptDict) in content) {
val userIdsDict = receiptDict[READ_KEY] ?: continue
val readReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst()
?: realm.createObject(ReadReceiptsSummaryEntity::class.java, eventId)

for ((userId, paramsDict) in userIdsDict) {
val ts = paramsDict["ts"] ?: 0.0
val ts = paramsDict[TIMESTAMP_KEY] ?: 0.0
val primaryKey = "${roomId}_$userId"
val receiptEntity = ReadReceiptEntity.where(realm, roomId, userId).findFirst()
?: realm.createObject(ReadReceiptEntity::class.java, primaryKey)
@ -69,4 +104,5 @@ internal class ReadReceiptHandler @Inject constructor() {
}
}
}

}

View File

@ -62,11 +62,11 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
}

fun handle(roomsSyncResponse: RoomsSyncResponse, reporter: DefaultInitialSyncProgressService? = null) {
fun handle(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService? = null) {
monarchy.runTransactionSync { realm ->
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter)
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
}

//handle event for bing rule checks
@ -89,12 +89,12 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch

// PRIVATE METHODS *****************************************************************************

private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, reporter: DefaultInitialSyncProgressService?) {
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService?) {

val rooms = when (handlingStrategy) {
is HandlingStrategy.JOINED ->
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value)
handleJoinedRoom(realm, it.key, it.value, isInitialSync)
}
is HandlingStrategy.INVITED ->
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.4f) {
@ -112,7 +112,8 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch

private fun handleJoinedRoom(realm: Realm,
roomId: String,
roomSync: RoomSync): RoomEntity {
roomSync: RoomSync,
isInitalSync: Boolean): RoomEntity {

Timber.v("Handle join sync for room $roomId")

@ -152,7 +153,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
roomSummaryUpdater.update(realm, roomId, Membership.JOIN, roomSync.summary, roomSync.unreadNotifications)

if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) {
handleEphemeral(realm, roomId, roomSync.ephemeral)
handleEphemeral(realm, roomId, roomSync.ephemeral, isInitalSync)
}

if (roomSync.accountData != null && roomSync.accountData.events.isNullOrEmpty().not()) {
@ -236,11 +237,12 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
@Suppress("UNCHECKED_CAST")
private fun handleEphemeral(realm: Realm,
roomId: String,
ephemeral: RoomSyncEphemeral) {
ephemeral: RoomSyncEphemeral,
isInitalSync: Boolean) {
for (event in ephemeral.events) {
if (event.type != EventType.RECEIPT) continue
val readReceiptContent = event.content as? ReadReceiptContent ?: continue
readReceiptHandler.handle(realm, roomId, readReceiptContent)
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitalSync)
}
}


View File

@ -66,7 +66,7 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl

reportSubtask(reporter, R.string.initial_sync_start_importing_account_rooms, 100, 0.7f) {
if (syncResponse.rooms != null) {
roomSyncHandler.handle(syncResponse.rooms, reporter)
roomSyncHandler.handle(syncResponse.rooms, isInitialSync, reporter)
}
}
}.also {