forked from GitHub-Mirror/riotX-android
Merge branch 'develop' into feature/room_update
This commit is contained in:
@ -20,6 +20,9 @@ import android.text.TextUtils
|
||||
import com.squareup.moshi.Json
|
||||
import com.squareup.moshi.JsonClass
|
||||
import im.vector.matrix.android.api.session.crypto.MXCryptoError
|
||||
import im.vector.matrix.android.api.session.room.model.message.MessageContent
|
||||
import im.vector.matrix.android.api.session.room.model.message.MessageType
|
||||
import im.vector.matrix.android.api.session.room.send.SendState
|
||||
import im.vector.matrix.android.api.util.JsonDict
|
||||
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
|
||||
import im.vector.matrix.android.internal.di.MoshiProvider
|
||||
@ -81,6 +84,7 @@ data class Event(
|
||||
|
||||
var mxDecryptionResult: OlmDecryptionResult? = null
|
||||
var mCryptoError: MXCryptoError.ErrorType? = null
|
||||
var sendState: SendState = SendState.UNKNOWN
|
||||
|
||||
|
||||
/**
|
||||
@ -272,6 +276,7 @@ data class Event(
|
||||
if (redacts != other.redacts) return false
|
||||
if (mxDecryptionResult != other.mxDecryptionResult) return false
|
||||
if (mCryptoError != other.mCryptoError) return false
|
||||
if (sendState != other.sendState) return false
|
||||
|
||||
return true
|
||||
}
|
||||
@ -289,6 +294,39 @@ data class Event(
|
||||
result = 31 * result + (redacts?.hashCode() ?: 0)
|
||||
result = 31 * result + (mxDecryptionResult?.hashCode() ?: 0)
|
||||
result = 31 * result + (mCryptoError?.hashCode() ?: 0)
|
||||
result = 31 * result + sendState.hashCode()
|
||||
return result
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
fun Event.isTextMessage(): Boolean {
|
||||
if (this.getClearType() == EventType.MESSAGE) {
|
||||
return getClearContent()?.toModel<MessageContent>()?.let {
|
||||
when (it.type) {
|
||||
MessageType.MSGTYPE_TEXT,
|
||||
MessageType.MSGTYPE_EMOTE,
|
||||
MessageType.MSGTYPE_NOTICE -> {
|
||||
true
|
||||
}
|
||||
else -> false
|
||||
}
|
||||
} ?: false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
fun Event.isImageMessage(): Boolean {
|
||||
if (this.getClearType() == EventType.MESSAGE) {
|
||||
return getClearContent()?.toModel<MessageContent>()?.let {
|
||||
when (it.type) {
|
||||
MessageType.MSGTYPE_IMAGE -> {
|
||||
true
|
||||
}
|
||||
else -> false
|
||||
}
|
||||
} ?: false
|
||||
}
|
||||
return false
|
||||
}
|
@ -29,5 +29,5 @@ interface MessageContent {
|
||||
|
||||
|
||||
fun MessageContent?.isReply(): Boolean {
|
||||
return this?.relatesTo?.inReplyTo != null
|
||||
}
|
||||
return this?.relatesTo?.inReplyTo?.eventId != null
|
||||
}
|
||||
|
@ -21,5 +21,5 @@ import com.squareup.moshi.JsonClass
|
||||
|
||||
@JsonClass(generateAdapter = true)
|
||||
data class ReplyToContent(
|
||||
@Json(name = "event_id") val eventId: String
|
||||
)
|
||||
@Json(name = "event_id") val eventId: String? = null
|
||||
)
|
||||
|
@ -19,6 +19,7 @@ package im.vector.matrix.android.api.session.room.send
|
||||
import im.vector.matrix.android.api.session.content.ContentAttachmentData
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.api.session.room.model.message.MessageType
|
||||
import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
|
||||
import im.vector.matrix.android.api.util.Cancelable
|
||||
|
||||
|
||||
@ -65,4 +66,31 @@ interface SendService {
|
||||
*/
|
||||
fun redactEvent(event: Event, reason: String?): Cancelable
|
||||
|
||||
|
||||
/**
|
||||
* Schedule this message to be resent
|
||||
* @param localEcho the unsent local echo
|
||||
*/
|
||||
fun resendTextMessage(localEcho: TimelineEvent): Cancelable?
|
||||
|
||||
/**
|
||||
* Schedule this message to be resent
|
||||
* @param localEcho the unsent local echo
|
||||
*/
|
||||
fun resendMediaMessage(localEcho: TimelineEvent): Cancelable?
|
||||
|
||||
|
||||
/**
|
||||
* Remove this failed message from the timeline
|
||||
* @param localEcho the unsent local echo
|
||||
*/
|
||||
fun deleteFailedEcho(localEcho: TimelineEvent)
|
||||
|
||||
fun clearSendingQueue()
|
||||
|
||||
/**
|
||||
* Resend all failed messages one by one (and keep order)
|
||||
*/
|
||||
fun resendAllFailedMessages()
|
||||
|
||||
}
|
@ -41,4 +41,8 @@ enum class SendState {
|
||||
return this == UNDELIVERED || this == FAILED_UNKNOWN_DEVICES
|
||||
}
|
||||
|
||||
fun isSending(): Boolean {
|
||||
return this == UNSENT || this == ENCRYPTING || this == SENDING
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -56,6 +56,9 @@ interface Timeline {
|
||||
*/
|
||||
fun paginate(direction: Direction, count: Int)
|
||||
|
||||
fun pendingEventCount() : Int
|
||||
|
||||
fun failedToDeliverEventCount() : Int
|
||||
|
||||
interface Listener {
|
||||
/**
|
||||
|
@ -38,7 +38,6 @@ data class TimelineEvent(
|
||||
val senderName: String?,
|
||||
val isUniqueDisplayName: Boolean,
|
||||
val senderAvatar: String?,
|
||||
val sendState: SendState,
|
||||
val annotations: EventAnnotationsSummary? = null
|
||||
) {
|
||||
|
||||
|
@ -73,6 +73,7 @@ internal object EventMapper {
|
||||
unsignedData = ud,
|
||||
redacts = eventEntity.redacts
|
||||
).also {
|
||||
it.sendState = eventEntity.sendState
|
||||
eventEntity.decryptionResultJson?.let { json ->
|
||||
try {
|
||||
it.mxDecryptionResult = MoshiProvider.providesMoshi().adapter(OlmDecryptionResult::class.java).fromJson(json)
|
||||
|
@ -33,8 +33,7 @@ internal object TimelineEventMapper {
|
||||
displayIndex = timelineEventEntity.root?.displayIndex ?: 0,
|
||||
senderName = timelineEventEntity.senderName,
|
||||
isUniqueDisplayName = timelineEventEntity.isUniqueDisplayName,
|
||||
senderAvatar = timelineEventEntity.senderAvatar,
|
||||
sendState = timelineEventEntity.root?.sendState ?: SendState.UNKNOWN
|
||||
senderAvatar = timelineEventEntity.senderAvatar
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -57,9 +57,11 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
|
||||
override suspend fun doWork(): Result {
|
||||
val params = WorkerParamsFactory.fromData<Params>(inputData)
|
||||
?: return Result.success()
|
||||
Timber.v("Starting upload media work with params $params")
|
||||
|
||||
if (params.lastFailureMessage != null) {
|
||||
// Transmit the error
|
||||
Timber.v("Stop upload media work due to input failure")
|
||||
return Result.success(inputData)
|
||||
}
|
||||
|
||||
@ -121,7 +123,11 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
|
||||
|
||||
val progressListener = object : ProgressRequestBody.Listener {
|
||||
override fun onProgress(current: Long, total: Long) {
|
||||
contentUploadStateTracker.setProgress(eventId, current, total)
|
||||
if (isStopped) {
|
||||
contentUploadStateTracker.setFailure(eventId, Throwable("Cancelled"))
|
||||
} else {
|
||||
contentUploadStateTracker.setProgress(eventId, current, total)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -166,6 +172,7 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
|
||||
encryptedFileInfo: EncryptedFileInfo?,
|
||||
thumbnailUrl: String?,
|
||||
thumbnailEncryptedFileInfo: EncryptedFileInfo?): Result {
|
||||
Timber.v("handleSuccess $attachmentUrl, work is stopped $isStopped")
|
||||
contentUploadStateTracker.setSuccess(params.event.eventId!!)
|
||||
val event = updateEvent(params.event, attachmentUrl, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo)
|
||||
val sendParams = SendEventWorker.Params(params.userId, params.roomId, event)
|
||||
@ -210,6 +217,7 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
private fun MessageFileContent.update(url: String,
|
||||
encryptedFileInfo: EncryptedFileInfo?): MessageFileContent {
|
||||
return copy(
|
||||
|
@ -62,9 +62,7 @@ internal class RoomSummaryUpdater @Inject constructor(private val credentials: C
|
||||
roomId: String,
|
||||
membership: Membership? = null,
|
||||
roomSummary: RoomSyncSummary? = null,
|
||||
unreadNotifications: RoomSyncUnreadNotifications? = null,
|
||||
isDirect: Boolean? = null,
|
||||
directUserId: String? = null) {
|
||||
unreadNotifications: RoomSyncUnreadNotifications? = null) {
|
||||
val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
|
||||
?: realm.createObject(roomId)
|
||||
|
||||
@ -97,10 +95,6 @@ internal class RoomSummaryUpdater @Inject constructor(private val credentials: C
|
||||
.asSequence()
|
||||
.map { it.stateKey }
|
||||
|
||||
if (isDirect != null) {
|
||||
roomSummaryEntity.isDirect = isDirect
|
||||
roomSummaryEntity.directUserId = directUserId
|
||||
}
|
||||
roomSummaryEntity.displayName = roomDisplayNameResolver.resolve(roomId).toString()
|
||||
roomSummaryEntity.avatarUrl = roomAvatarResolver.resolve(roomId)
|
||||
roomSummaryEntity.topic = lastTopicEvent?.content.toModel<RoomTopicContent>()?.topic
|
||||
|
@ -86,7 +86,7 @@ internal class DefaultCreateRoomTask @Inject constructor(private val roomAPI: Ro
|
||||
this.isDirect = true
|
||||
}
|
||||
}.flatMap {
|
||||
val directChats = directChatsHelper.getDirectChats()
|
||||
val directChats = directChatsHelper.getLocalUserAccount()
|
||||
updateUserAccountDataTask.execute(UpdateUserAccountDataTask.DirectChatParams(directMessages = directChats))
|
||||
}.flatMap {
|
||||
Try.just(roomId)
|
||||
|
@ -29,6 +29,7 @@ import im.vector.matrix.android.internal.database.model.TimelineEventEntity
|
||||
import im.vector.matrix.android.internal.database.query.findWithSenderMembershipEvent
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.di.MoshiProvider
|
||||
import im.vector.matrix.android.internal.session.room.send.LocalEchoEventFactory
|
||||
import im.vector.matrix.android.internal.task.Task
|
||||
import im.vector.matrix.android.internal.util.tryTransactionSync
|
||||
import io.realm.Realm
|
||||
@ -63,7 +64,7 @@ internal class DefaultPruneEventTask @Inject constructor(private val monarchy: M
|
||||
val redactionEventEntity = EventEntity.where(realm, eventId = redactionEvent.eventId
|
||||
?: "").findFirst()
|
||||
?: return
|
||||
val isLocalEcho = redactionEventEntity.sendState == SendState.UNSENT
|
||||
val isLocalEcho = LocalEchoEventFactory.isLocalEchoId(redactionEvent.eventId ?: "")
|
||||
Timber.v("Redact event for ${redactionEvent.redacts} localEcho=$isLocalEcho")
|
||||
|
||||
val eventToPrune = EventEntity.where(realm, eventId = redactionEvent.redacts).findFirst()
|
||||
|
@ -17,25 +17,36 @@
|
||||
package im.vector.matrix.android.internal.session.room.send
|
||||
|
||||
import android.content.Context
|
||||
import androidx.work.BackoffPolicy
|
||||
import androidx.work.ExistingWorkPolicy
|
||||
import androidx.work.OneTimeWorkRequest
|
||||
import androidx.work.WorkManager
|
||||
import androidx.work.*
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import im.vector.matrix.android.api.auth.data.Credentials
|
||||
import im.vector.matrix.android.api.session.content.ContentAttachmentData
|
||||
import im.vector.matrix.android.api.session.crypto.CryptoService
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.api.session.events.model.EventType
|
||||
import im.vector.matrix.android.api.session.events.model.isTextMessage
|
||||
import im.vector.matrix.android.api.session.events.model.toModel
|
||||
import im.vector.matrix.android.api.session.room.model.message.MessageContent
|
||||
import im.vector.matrix.android.api.session.room.model.message.MessageType
|
||||
import im.vector.matrix.android.api.session.room.send.SendService
|
||||
import im.vector.matrix.android.api.session.room.send.SendState
|
||||
import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
|
||||
import im.vector.matrix.android.api.util.Cancelable
|
||||
import im.vector.matrix.android.api.util.CancelableBag
|
||||
import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||
import im.vector.matrix.android.internal.database.model.EventEntity
|
||||
import im.vector.matrix.android.internal.database.model.RoomEntity
|
||||
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.content.UploadContentWorker
|
||||
import im.vector.matrix.android.internal.session.room.timeline.TimelineSendEventWorkCommon
|
||||
import im.vector.matrix.android.internal.util.CancelableWork
|
||||
import im.vector.matrix.android.internal.util.tryTransactionAsync
|
||||
import im.vector.matrix.android.internal.worker.WorkManagerUtil
|
||||
import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder
|
||||
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.inject.Inject
|
||||
|
||||
@ -50,6 +61,7 @@ internal class DefaultSendService @Inject constructor(private val context: Conte
|
||||
private val monarchy: Monarchy)
|
||||
: SendService {
|
||||
|
||||
private val workerFutureListenerExecutor = Executors.newSingleThreadExecutor()
|
||||
override fun sendTextMessage(text: String, msgType: String, autoMarkdown: Boolean): Cancelable {
|
||||
val event = localEchoEventFactory.createTextEvent(roomId, msgType, text, autoMarkdown).also {
|
||||
saveLocalEcho(it)
|
||||
@ -70,7 +82,7 @@ internal class DefaultSendService @Inject constructor(private val context: Conte
|
||||
// Encrypted room handling
|
||||
return if (cryptoService.isRoomEncrypted(roomId)) {
|
||||
Timber.v("Send event in encrypted room")
|
||||
val encryptWork = createEncryptEventWork(event)
|
||||
val encryptWork = createEncryptEventWork(event, true)
|
||||
val sendWork = createSendEventWork(event)
|
||||
TimelineSendEventWorkCommon.postSequentialWorks(context, roomId, encryptWork, sendWork)
|
||||
CancelableWork(context, encryptWork.id)
|
||||
@ -94,25 +106,162 @@ internal class DefaultSendService @Inject constructor(private val context: Conte
|
||||
return CancelableWork(context, redactWork.id)
|
||||
}
|
||||
|
||||
override fun resendTextMessage(localEcho: TimelineEvent): Cancelable? {
|
||||
if (localEcho.root.isTextMessage()) {
|
||||
return sendEvent(localEcho.root)
|
||||
}
|
||||
return null
|
||||
|
||||
}
|
||||
|
||||
override fun resendMediaMessage(localEcho: TimelineEvent): Cancelable? {
|
||||
//TODO this need a refactoring of attachement sending
|
||||
// val clearContent = localEcho.root.getClearContent()
|
||||
// val messageContent = clearContent?.toModel<MessageContent>() ?: return null
|
||||
// when (messageContent.type) {
|
||||
// MessageType.MSGTYPE_IMAGE -> {
|
||||
// val imageContent = clearContent.toModel<MessageImageContent>() ?: return null
|
||||
// val url = imageContent.url ?: return null
|
||||
// if (url.startsWith("mxc://")) {
|
||||
// //TODO
|
||||
// } else {
|
||||
// //The image has not yet been sent
|
||||
// val attachmentData = ContentAttachmentData(
|
||||
// size = imageContent.info!!.size.toLong(),
|
||||
// mimeType = imageContent.info.mimeType!!,
|
||||
// width = imageContent.info.width.toLong(),
|
||||
// height = imageContent.info.height.toLong(),
|
||||
// name = imageContent.body,
|
||||
// path = imageContent.url,
|
||||
// type = ContentAttachmentData.Type.IMAGE
|
||||
// )
|
||||
// monarchy.runTransactionSync {
|
||||
// EventEntity.where(it,eventId = localEcho.root.eventId ?: "").findFirst()?.let {
|
||||
// it.sendState = SendState.UNSENT
|
||||
// }
|
||||
// }
|
||||
// return internalSendMedia(localEcho.root,attachmentData)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
return null
|
||||
|
||||
}
|
||||
|
||||
override fun deleteFailedEcho(localEcho: TimelineEvent) {
|
||||
monarchy.tryTransactionAsync { realm ->
|
||||
TimelineEventEntity.where(realm, eventId = localEcho.root.eventId
|
||||
?: "").findFirst()?.let {
|
||||
it.deleteFromRealm()
|
||||
}
|
||||
EventEntity.where(realm, eventId = localEcho.root.eventId
|
||||
?: "").findFirst()?.let {
|
||||
it.deleteFromRealm()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun clearSendingQueue() {
|
||||
TimelineSendEventWorkCommon.cancelAllWorks(context, roomId)
|
||||
WorkManager.getInstance(context).cancelUniqueWork(buildWorkIdentifier(UPLOAD_WORK))
|
||||
|
||||
matrixOneTimeWorkRequestBuilder<FakeSendWorker>()
|
||||
.build().let {
|
||||
TimelineSendEventWorkCommon.postWork(context, roomId, it, ExistingWorkPolicy.REPLACE)
|
||||
|
||||
//need to clear also image sending queue
|
||||
WorkManager.getInstance(context)
|
||||
.beginUniqueWork(buildWorkIdentifier(UPLOAD_WORK), ExistingWorkPolicy.REPLACE, it)
|
||||
.enqueue()
|
||||
}
|
||||
|
||||
monarchy.tryTransactionAsync { realm ->
|
||||
RoomEntity.where(realm, roomId).findFirst()?.let { room ->
|
||||
room.sendingTimelineEvents.forEach {
|
||||
it.root?.sendState = SendState.UNDELIVERED
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override fun resendAllFailedMessages() {
|
||||
monarchy.tryTransactionAsync { realm ->
|
||||
RoomEntity.where(realm, roomId).findFirst()?.let { room ->
|
||||
room.sendingTimelineEvents.filter {
|
||||
it.root?.sendState?.hasFailed() ?: false
|
||||
}.sortedBy { it.root?.originServerTs ?: 0 }.forEach { timelineEventEntity ->
|
||||
timelineEventEntity.root?.let {
|
||||
val event = it.asDomain()
|
||||
when (event.getClearType()) {
|
||||
EventType.MESSAGE,
|
||||
EventType.REDACTION,
|
||||
EventType.REACTION -> {
|
||||
val content = event.getClearContent().toModel<MessageContent>()
|
||||
if (content != null) {
|
||||
when (content.type) {
|
||||
MessageType.MSGTYPE_EMOTE,
|
||||
MessageType.MSGTYPE_NOTICE,
|
||||
MessageType.MSGTYPE_LOCATION,
|
||||
MessageType.MSGTYPE_TEXT -> {
|
||||
it.sendState = SendState.UNSENT
|
||||
sendEvent(event)
|
||||
}
|
||||
MessageType.MSGTYPE_FILE,
|
||||
MessageType.MSGTYPE_VIDEO,
|
||||
MessageType.MSGTYPE_IMAGE,
|
||||
MessageType.MSGTYPE_AUDIO -> {
|
||||
//need to resend the attachement
|
||||
}
|
||||
else -> {
|
||||
Timber.e("Cannot resend message ${event.type} / ${content.type}")
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
Timber.e("Unsupported message to resend ${event.type}")
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
Timber.e("Unsupported message to resend ${event.type}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun sendMedia(attachment: ContentAttachmentData): Cancelable {
|
||||
// Create an event with the media file path
|
||||
val event = localEchoEventFactory.createMediaEvent(roomId, attachment).also {
|
||||
saveLocalEcho(it)
|
||||
}
|
||||
|
||||
return internalSendMedia(event, attachment)
|
||||
}
|
||||
|
||||
private fun internalSendMedia(localEcho: Event, attachment: ContentAttachmentData): CancelableWork {
|
||||
val isRoomEncrypted = cryptoService.isRoomEncrypted(roomId)
|
||||
|
||||
val uploadWork = createUploadMediaWork(event, attachment, isRoomEncrypted)
|
||||
val sendWork = createSendEventWork(event)
|
||||
val uploadWork = createUploadMediaWork(localEcho, attachment, isRoomEncrypted, startChain = true)
|
||||
val sendWork = createSendEventWork(localEcho)
|
||||
|
||||
if (isRoomEncrypted) {
|
||||
val encryptWork = createEncryptEventWork(event)
|
||||
val encryptWork = createEncryptEventWork(localEcho, false /*not start of chain, take input error*/)
|
||||
|
||||
WorkManager.getInstance(context)
|
||||
val op: Operation = WorkManager.getInstance(context)
|
||||
.beginUniqueWork(buildWorkIdentifier(UPLOAD_WORK), ExistingWorkPolicy.APPEND, uploadWork)
|
||||
.then(encryptWork)
|
||||
.then(sendWork)
|
||||
.enqueue()
|
||||
op.result.addListener(Runnable {
|
||||
if (op.result.isCancelled) {
|
||||
Timber.e("CHAINE WAS CANCELLED")
|
||||
} else if (op.state.value is Operation.State.FAILURE) {
|
||||
Timber.e("CHAINE DID FAIL")
|
||||
}
|
||||
}, workerFutureListenerExecutor)
|
||||
} else {
|
||||
WorkManager.getInstance(context)
|
||||
.beginUniqueWork(buildWorkIdentifier(UPLOAD_WORK), ExistingWorkPolicy.APPEND, uploadWork)
|
||||
@ -131,7 +280,7 @@ internal class DefaultSendService @Inject constructor(private val context: Conte
|
||||
return "${roomId}_$identifier"
|
||||
}
|
||||
|
||||
private fun createEncryptEventWork(event: Event): OneTimeWorkRequest {
|
||||
private fun createEncryptEventWork(event: Event, startChain: Boolean = false): OneTimeWorkRequest {
|
||||
// Same parameter
|
||||
val params = EncryptEventWorker.Params(credentials.userId, roomId, event)
|
||||
val sendWorkData = WorkerParamsFactory.toData(params)
|
||||
@ -139,6 +288,11 @@ internal class DefaultSendService @Inject constructor(private val context: Conte
|
||||
return matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
|
||||
.setConstraints(WorkManagerUtil.workConstraints)
|
||||
.setInputData(sendWorkData)
|
||||
.apply {
|
||||
if (startChain) {
|
||||
setInputMerger(NoMerger::class.java)
|
||||
}
|
||||
}
|
||||
.setBackoffCriteria(BackoffPolicy.LINEAR, BACKOFF_DELAY, TimeUnit.MILLISECONDS)
|
||||
.build()
|
||||
}
|
||||
@ -159,15 +313,24 @@ internal class DefaultSendService @Inject constructor(private val context: Conte
|
||||
return TimelineSendEventWorkCommon.createWork<RedactEventWorker>(redactWorkData)
|
||||
}
|
||||
|
||||
private fun createUploadMediaWork(event: Event, attachment: ContentAttachmentData, isRoomEncrypted: Boolean): OneTimeWorkRequest {
|
||||
private fun createUploadMediaWork(event: Event,
|
||||
attachment: ContentAttachmentData,
|
||||
isRoomEncrypted: Boolean,
|
||||
startChain: Boolean = false): OneTimeWorkRequest {
|
||||
val uploadMediaWorkerParams = UploadContentWorker.Params(credentials.userId, roomId, event, attachment, isRoomEncrypted)
|
||||
val uploadWorkData = WorkerParamsFactory.toData(uploadMediaWorkerParams)
|
||||
|
||||
return matrixOneTimeWorkRequestBuilder<UploadContentWorker>()
|
||||
.setConstraints(WorkManagerUtil.workConstraints)
|
||||
.apply {
|
||||
if (startChain) {
|
||||
setInputMerger(NoMerger::class.java)
|
||||
}
|
||||
}
|
||||
.setInputData(uploadWorkData)
|
||||
.setBackoffCriteria(BackoffPolicy.LINEAR, BACKOFF_DELAY, TimeUnit.MILLISECONDS)
|
||||
.build()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -29,6 +29,7 @@ import im.vector.matrix.android.internal.crypto.model.MXEncryptEventContentResul
|
||||
import im.vector.matrix.android.internal.worker.SessionWorkerParams
|
||||
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
|
||||
import im.vector.matrix.android.internal.worker.getSessionComponent
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import javax.inject.Inject
|
||||
|
||||
@ -49,10 +50,13 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
|
||||
@Inject lateinit var localEchoUpdater: LocalEchoUpdater
|
||||
|
||||
override fun doWork(): Result {
|
||||
|
||||
Timber.v("Start Encrypt work")
|
||||
val params = WorkerParamsFactory.fromData<Params>(inputData)
|
||||
?: return Result.success()
|
||||
?: return Result.success().also {
|
||||
Timber.v("Work cancelled due to input error from parent")
|
||||
}
|
||||
|
||||
Timber.v("Start Encrypt work for event ${params.event.eventId}")
|
||||
if (params.lastFailureMessage != null) {
|
||||
// Transmit the error
|
||||
return Result.success(inputData)
|
||||
@ -97,7 +101,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
|
||||
latch.await()
|
||||
|
||||
if (result != null) {
|
||||
var modifiedContent = HashMap(result?.eventContent)
|
||||
val modifiedContent = HashMap(result?.eventContent)
|
||||
params.keepKeys?.forEach { toKeep ->
|
||||
localEvent.content?.get(toKeep)?.let {
|
||||
//put it back in the encrypted thing
|
||||
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package im.vector.matrix.android.internal.session.room.send
|
||||
|
||||
import android.content.Context
|
||||
import androidx.work.Worker
|
||||
import androidx.work.WorkerParameters
|
||||
|
||||
internal class FakeSendWorker(context: Context, params: WorkerParameters)
|
||||
: Worker(context, params) {
|
||||
|
||||
override fun doWork(): Result {
|
||||
return Result.success()
|
||||
}
|
||||
}
|
@ -21,15 +21,21 @@ import im.vector.matrix.android.api.session.room.send.SendState
|
||||
import im.vector.matrix.android.internal.database.model.EventEntity
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.util.tryTransactionAsync
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class LocalEchoUpdater @Inject constructor(private val monarchy: Monarchy) {
|
||||
|
||||
fun updateSendState(eventId: String, sendState: SendState) {
|
||||
Timber.v("Update local state of $eventId to ${sendState.name}")
|
||||
monarchy.tryTransactionAsync { realm ->
|
||||
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
|
||||
if (sendingEventEntity != null) {
|
||||
sendingEventEntity.sendState = sendState
|
||||
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) {
|
||||
//If already synced, do not put as sent
|
||||
} else {
|
||||
sendingEventEntity.sendState = sendState
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package im.vector.matrix.android.internal.session.room.send
|
||||
|
||||
import androidx.work.Data
|
||||
import androidx.work.InputMerger
|
||||
|
||||
class NoMerger : InputMerger() {
|
||||
override fun merge(inputs: MutableList<Data>): Data {
|
||||
return inputs.first()
|
||||
}
|
||||
}
|
@ -82,7 +82,10 @@ internal class SendEventWorker constructor(context: Context, params: WorkerParam
|
||||
Result.success()
|
||||
}
|
||||
}
|
||||
}, { Result.success() })
|
||||
}, {
|
||||
localEchoUpdater.updateSendState(event.eventId, SendState.SENT)
|
||||
Result.success()
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -206,6 +206,23 @@ internal class DefaultTimeline(
|
||||
}
|
||||
}
|
||||
|
||||
override fun pendingEventCount(): Int {
|
||||
var count = 0
|
||||
Realm.getInstance(realmConfiguration).use {
|
||||
count = RoomEntity.where(it,roomId).findFirst()?.sendingTimelineEvents?.count() ?: 0
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
override fun failedToDeliverEventCount(): Int {
|
||||
var count = 0
|
||||
Realm.getInstance(realmConfiguration).use {
|
||||
count = RoomEntity.where(it,roomId).findFirst()?.sendingTimelineEvents?.filter {
|
||||
it.root?.sendState?.hasFailed() ?: false
|
||||
}?.count() ?: 0
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
if (isStarted.compareAndSet(false, true)) {
|
||||
|
@ -51,9 +51,9 @@ internal object TimelineSendEventWorkCommon {
|
||||
}
|
||||
}
|
||||
|
||||
fun postWork(context: Context, roomId: String, workRequest: OneTimeWorkRequest) {
|
||||
fun postWork(context: Context, roomId: String, workRequest: OneTimeWorkRequest, policy: ExistingWorkPolicy = ExistingWorkPolicy.APPEND) {
|
||||
WorkManager.getInstance(context)
|
||||
.beginUniqueWork(buildWorkIdentifier(roomId), ExistingWorkPolicy.APPEND, workRequest)
|
||||
.beginUniqueWork(buildWorkIdentifier(roomId), policy, workRequest)
|
||||
.enqueue()
|
||||
}
|
||||
|
||||
@ -68,4 +68,8 @@ internal object TimelineSendEventWorkCommon {
|
||||
private fun buildWorkIdentifier(roomId: String): String {
|
||||
return "${roomId}_$SEND_WORK"
|
||||
}
|
||||
|
||||
fun cancelAllWorks(context: Context, roomId: String) {
|
||||
WorkManager.getInstance(context).cancelUniqueWork(buildWorkIdentifier(roomId))
|
||||
}
|
||||
}
|
@ -35,14 +35,10 @@ import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.model.EventEntityFields
|
||||
import im.vector.matrix.android.internal.database.model.RoomEntity
|
||||
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.query.find
|
||||
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
|
||||
import im.vector.matrix.android.internal.database.query.isDirect
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
|
||||
import im.vector.matrix.android.internal.session.user.accountdata.DirectChatsHelper
|
||||
import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAccountDataTask
|
||||
import im.vector.matrix.android.internal.session.mapWithProgress
|
||||
import im.vector.matrix.android.internal.session.notification.DefaultPushRuleService
|
||||
import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask
|
||||
@ -70,9 +66,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
||||
private val tokenStore: SyncTokenStore,
|
||||
private val pushRuleService: DefaultPushRuleService,
|
||||
private val processForPushTask: ProcessEventForPushTask,
|
||||
private val updateUserAccountDataTask: UpdateUserAccountDataTask,
|
||||
private val credentials: Credentials,
|
||||
private val directChatsHelper: DirectChatsHelper,
|
||||
private val taskExecutor: TaskExecutor) {
|
||||
|
||||
sealed class HandlingStrategy {
|
||||
@ -192,21 +186,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
||||
val chunkEntity = handleTimelineEvents(realm, roomEntity, roomSync.inviteState.events)
|
||||
roomEntity.addOrUpdate(chunkEntity)
|
||||
}
|
||||
val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(credentials.userId)
|
||||
val inviterId = myUserStateEvent?.sender
|
||||
val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() }
|
||||
val isDirect = myUserRoomMember?.isDirect
|
||||
if (isDirect == true && inviterId != null) {
|
||||
val isAlreadyDirect = RoomSummaryEntity.isDirect(realm, roomId)
|
||||
if (!isAlreadyDirect) {
|
||||
val directChatsMap = directChatsHelper.getDirectChats(include = Pair(inviterId, roomId))
|
||||
val updateUserAccountParams = UpdateUserAccountDataTask.DirectChatParams(
|
||||
directMessages = directChatsMap
|
||||
)
|
||||
updateUserAccountDataTask.configureWith(updateUserAccountParams).executeBy(taskExecutor)
|
||||
}
|
||||
}
|
||||
roomSummaryUpdater.update(realm, roomId, Membership.INVITE, isDirect = isDirect, directUserId = inviterId)
|
||||
roomSummaryUpdater.update(realm, roomId, Membership.INVITE)
|
||||
return roomEntity
|
||||
}
|
||||
|
||||
|
@ -17,11 +17,23 @@
|
||||
package im.vector.matrix.android.internal.session.sync
|
||||
|
||||
import arrow.core.Try
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import im.vector.matrix.android.R
|
||||
import im.vector.matrix.android.api.auth.data.Credentials
|
||||
import im.vector.matrix.android.api.session.events.model.toModel
|
||||
import im.vector.matrix.android.api.session.room.model.RoomMember
|
||||
import im.vector.matrix.android.internal.crypto.CryptoManager
|
||||
import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.query.isDirect
|
||||
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
|
||||
import im.vector.matrix.android.internal.session.reportSubtask
|
||||
import im.vector.matrix.android.internal.session.room.membership.RoomMembers
|
||||
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
||||
import im.vector.matrix.android.internal.session.user.accountdata.DirectChatsHelper
|
||||
import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAccountDataTask
|
||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||
import im.vector.matrix.android.internal.task.configureWith
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
import kotlin.system.measureTimeMillis
|
||||
@ -88,9 +100,7 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
|
||||
measureTimeMillis {
|
||||
reportSubtask(reporter, R.string.initial_sync_start_importing_account_data, 100, 0.1f) {
|
||||
Timber.v("Handle accountData")
|
||||
if (syncResponse.accountData != null) {
|
||||
userAccountDataSyncHandler.handle(syncResponse.accountData)
|
||||
}
|
||||
userAccountDataSyncHandler.handle(syncResponse.accountData, syncResponse.rooms?.invite)
|
||||
}
|
||||
}.also {
|
||||
Timber.v("Finish handling accountData in $it ms")
|
||||
@ -98,7 +108,6 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
|
||||
|
||||
Timber.v("On sync completed")
|
||||
cryptoSyncHandler.onSyncCompleted(syncResponse)
|
||||
|
||||
}
|
||||
Timber.v("Finish handling sync in $measure ms")
|
||||
syncResponse
|
||||
|
@ -17,28 +17,45 @@
|
||||
package im.vector.matrix.android.internal.session.sync
|
||||
|
||||
import com.zhuinden.monarchy.Monarchy
|
||||
import im.vector.matrix.android.api.auth.data.Credentials
|
||||
import im.vector.matrix.android.api.session.events.model.toModel
|
||||
import im.vector.matrix.android.api.session.room.model.RoomMember
|
||||
import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.model.RoomSummaryEntityFields
|
||||
import im.vector.matrix.android.internal.database.query.getDirectRooms
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.room.membership.RoomMembers
|
||||
import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync
|
||||
import im.vector.matrix.android.internal.session.sync.model.UserAccountDataDirectMessages
|
||||
import im.vector.matrix.android.internal.session.sync.model.UserAccountDataSync
|
||||
import im.vector.matrix.android.internal.session.user.accountdata.DirectChatsHelper
|
||||
import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAccountDataTask
|
||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||
import im.vector.matrix.android.internal.task.configureWith
|
||||
import io.realm.Realm
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class UserAccountDataSyncHandler @Inject constructor(private val monarchy: Monarchy) {
|
||||
internal class UserAccountDataSyncHandler @Inject constructor(private val monarchy: Monarchy,
|
||||
private val credentials: Credentials,
|
||||
private val directChatsHelper: DirectChatsHelper,
|
||||
private val updateUserAccountDataTask: UpdateUserAccountDataTask,
|
||||
private val taskExecutor: TaskExecutor) {
|
||||
|
||||
fun handle(accountData: UserAccountDataSync) {
|
||||
accountData.list.forEach {
|
||||
fun handle(accountData: UserAccountDataSync?, invites: Map<String, InvitedRoomSync>?) {
|
||||
accountData?.list?.forEach {
|
||||
when (it) {
|
||||
is UserAccountDataDirectMessages -> handleDirectChatRooms(it)
|
||||
else -> return@forEach
|
||||
}
|
||||
}
|
||||
monarchy.doWithRealm { realm ->
|
||||
synchronizeWithServerIfNeeded(realm, invites)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleDirectChatRooms(directMessages: UserAccountDataDirectMessages) {
|
||||
monarchy.runTransactionSync { realm ->
|
||||
|
||||
val oldDirectRooms = RoomSummaryEntity.getDirectRooms(realm)
|
||||
oldDirectRooms.forEach {
|
||||
it.isDirect = false
|
||||
@ -57,4 +74,35 @@ internal class UserAccountDataSyncHandler @Inject constructor(private val monarc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we get some direct chat invites, we synchronize the user account data including those.
|
||||
private fun synchronizeWithServerIfNeeded(realm: Realm, invites: Map<String, InvitedRoomSync>?) {
|
||||
if (invites.isNullOrEmpty()) return
|
||||
val directChats = directChatsHelper.getLocalUserAccount()
|
||||
var hasUpdate = false
|
||||
invites.forEach { (roomId, _) ->
|
||||
val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(credentials.userId)
|
||||
val inviterId = myUserStateEvent?.sender
|
||||
val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() }
|
||||
val isDirect = myUserRoomMember?.isDirect
|
||||
if (inviterId != null && inviterId != credentials.userId && isDirect == true) {
|
||||
directChats
|
||||
.getOrPut(inviterId, { arrayListOf() })
|
||||
.apply {
|
||||
if (contains(roomId)) {
|
||||
Timber.v("Direct chats already include room $roomId with user $inviterId")
|
||||
} else {
|
||||
add(roomId)
|
||||
hasUpdate = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (hasUpdate) {
|
||||
val updateUserAccountParams = UpdateUserAccountDataTask.DirectChatParams(
|
||||
directMessages = directChats
|
||||
)
|
||||
updateUserAccountDataTask.configureWith(updateUserAccountParams).executeBy(taskExecutor)
|
||||
}
|
||||
}
|
||||
}
|
@ -24,27 +24,24 @@ import io.realm.RealmConfiguration
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class DirectChatsHelper @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration) {
|
||||
internal class DirectChatsHelper @Inject constructor(@SessionDatabase
|
||||
private val realmConfiguration: RealmConfiguration) {
|
||||
|
||||
fun getDirectChats(include: Pair<String, String>? = null, filterRoomId: String? = null): Map<String, List<String>> {
|
||||
/**
|
||||
* @return a map of userId <-> list of roomId
|
||||
*/
|
||||
fun getLocalUserAccount(filterRoomId: String? = null): MutableMap<String, MutableList<String>> {
|
||||
return Realm.getInstance(realmConfiguration).use { realm ->
|
||||
val currentDirectRooms = RoomSummaryEntity.getDirectRooms(realm)
|
||||
val directChatsMap = mutableMapOf<String, MutableList<String>>()
|
||||
for (directRoom in currentDirectRooms) {
|
||||
if (directRoom.roomId == filterRoomId) continue
|
||||
val directUserId = directRoom.directUserId ?: continue
|
||||
directChatsMap.getOrPut(directUserId, { arrayListOf() }).apply {
|
||||
add(directRoom.roomId)
|
||||
}
|
||||
}
|
||||
if (include != null) {
|
||||
directChatsMap.getOrPut(include.first, { arrayListOf() }).apply {
|
||||
if (contains(include.second)) {
|
||||
Timber.v("Direct chats already include room ${include.second} with user ${include.first}")
|
||||
} else {
|
||||
add(include.second)
|
||||
}
|
||||
}
|
||||
directChatsMap
|
||||
.getOrPut(directUserId, { arrayListOf() })
|
||||
.apply {
|
||||
add(directRoom.roomId)
|
||||
}
|
||||
}
|
||||
directChatsMap
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<resources>
|
||||
|
||||
<string name="event_status_sending_message">Sending message…</string>
|
||||
<string name="clear_timeline_send_queue">Clear sending queue</string>
|
||||
</resources>
|
Reference in New Issue
Block a user