Encrypt file + propagate error between chained workers

This commit is contained in:
Benoit Marty 2019-07-02 14:07:48 +02:00
parent f0e43d31f5
commit 994ee1d23f
8 changed files with 74 additions and 34 deletions

View File

@ -47,7 +47,8 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
val roomId: String,
val event: Event,
val attachment: ContentAttachmentData,
val isRoomEncrypted: Boolean
val isRoomEncrypted: Boolean,
override var lastFailureMessage: String? = null
) : SessionWorkerParams

@Inject lateinit var fileUploader: FileUploader
@ -57,6 +58,11 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()

if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
}

val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
sessionComponent.inject(this)

@ -110,7 +116,7 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
uploadedFileEncryptedFileInfo = encryptionResult.encryptedFileInfo

fileUploader
.uploadByteArray(encryptionResult.encryptedByteArray, attachment.name, attachment.mimeType, progressListener)
.uploadByteArray(encryptionResult.encryptedByteArray, attachment.name, "application/octet-stream", progressListener)
} else {
fileUploader
.uploadFile(attachmentFile, attachment.name, attachment.mimeType, progressListener)
@ -118,7 +124,7 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :

return contentUploadResponse
.fold(
{ handleFailure(params) },
{ handleFailure(params, it) },
{ handleSuccess(params, it.contentUri, uploadedFileEncryptedFileInfo, uploadedThumbnailUrl, uploadedThumbnailEncryptedFileInfo) }
)
}
@ -132,9 +138,15 @@ internal class UploadContentWorker(context: Context, params: WorkerParameters) :
}
}

private fun handleFailure(params: Params): Result {
private fun handleFailure(params: Params, failure: Throwable): Result {
contentUploadStateTracker.setFailure(params.event.eventId!!)
return Result.success()
return Result.success(
WorkerParamsFactory.toData(
params.copy(
lastFailureMessage = failure.localizedMessage
)
)
)
}

private fun handleSuccess(params: Params,

View File

@ -20,28 +20,26 @@ import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import arrow.core.Try
import com.squareup.inject.assisted.Assisted
import com.squareup.inject.assisted.AssistedInject
import com.squareup.moshi.JsonClass
import im.vector.matrix.android.internal.worker.DelegateWorkerFactory
import im.vector.matrix.android.internal.worker.SessionWorkerParams
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent
import javax.inject.Inject

internal class GetGroupDataWorker (context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
internal class GetGroupDataWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {

@JsonClass(generateAdapter = true)
internal data class Params(
override val userId: String,
val groupIds: List<String>
): SessionWorkerParams
val groupIds: List<String>,
override var lastFailureMessage: String? = null
) : SessionWorkerParams

@Inject lateinit var getGroupDataTask: GetGroupDataTask

override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
?: return Result.failure()

val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
sessionComponent.inject(this)

View File

@ -31,10 +31,8 @@ import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.database.RealmLiveData
import im.vector.matrix.android.internal.database.helper.addSendingEvent
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.EventAnnotationsSummaryEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.room.send.EncryptEventWorker
import im.vector.matrix.android.internal.session.room.send.LocalEchoEventFactory
@ -107,9 +105,12 @@ internal class DefaultRelationService @Inject constructor(private val context: C

//TODO duplicate with send service?
private fun createRedactEventWork(localEvent: Event, eventId: String, reason: String?): OneTimeWorkRequest {

val sendContentWorkerParams = RedactEventWorker.Params(credentials.userId, localEvent.eventId!!,
roomId, eventId, reason)
val sendContentWorkerParams = RedactEventWorker.Params(
credentials.userId,
localEvent.eventId!!,
roomId,
eventId,
reason)
val redactWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return TimelineSendEventWorkCommon.createWork<RedactEventWorker>(redactWorkData)
}

View File

@ -39,7 +39,8 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
override val userId: String,
val roomId: String,
val event: Event,
val relationType: String? = null
val relationType: String? = null,
override var lastFailureMessage: String?
) : SessionWorkerParams

@Inject lateinit var roomAPI: RoomAPI
@ -48,6 +49,11 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()

if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
}

val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
sessionComponent.inject(this)


View File

@ -41,7 +41,8 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
val roomId: String,
val event: Event,
/**Do not encrypt these keys, keep them as is in encrypted content (e.g. m.relates_to)*/
val keepKeys: List<String>? = null
val keepKeys: List<String>? = null,
override var lastFailureMessage: String? = null
) : SessionWorkerParams

@Inject lateinit var crypto: CryptoService
@ -52,6 +53,11 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()

if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
}

val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
sessionComponent.inject(this)

@ -105,16 +111,15 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
)
val nextWorkerParams = SendEventWorker.Params(params.userId, params.roomId, encryptedEvent)
return Result.success(WorkerParamsFactory.toData(nextWorkerParams))

} else {
val sendState = when (error) {
is Failure.CryptoError -> SendState.FAILED_UNKNOWN_DEVICES
else -> SendState.UNDELIVERED
}
localEchoUpdater.updateSendState(localEvent.eventId, sendState)
//always return success, or the chain will be stuck for ever!
val nextWorkerParams = SendEventWorker.Params(params.userId, params.roomId, localEvent, error?.localizedMessage ?: "Error")
return Result.success(WorkerParamsFactory.toData(nextWorkerParams))
}

val safeError = error
val sendState = when (safeError) {
is Failure.CryptoError -> SendState.FAILED_UNKNOWN_DEVICES
else -> SendState.UNDELIVERED
}
localEchoUpdater.updateSendState(localEvent.eventId, sendState)
//always return success, or the chain will be stuck for ever!
return Result.success()
}
}

View File

@ -35,7 +35,8 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
val txID: String,
val roomId: String,
val eventId: String,
val reason: String?
val reason: String?,
override var lastFailureMessage: String? = null
) : SessionWorkerParams

@Inject lateinit var roomAPI: RoomAPI
@ -44,6 +45,11 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()

if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
}

val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
sessionComponent.inject(this)

@ -62,7 +68,9 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
else -> {
//TODO mark as failed to send?
//always return success, or the chain will be stuck for ever!
Result.success()
Result.success(WorkerParamsFactory.toData(params.copy(
lastFailureMessage = it.localizedMessage
)))
}
}
}, {

View File

@ -38,14 +38,14 @@ internal class SendEventWorker constructor(context: Context, params: WorkerParam
internal data class Params(
override val userId: String,
val roomId: String,
val event: Event
val event: Event,
override var lastFailureMessage: String? = null
) : SessionWorkerParams

@Inject lateinit var localEchoUpdater: LocalEchoUpdater
@Inject lateinit var roomAPI: RoomAPI

override suspend fun doWork(): Result {

val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()

@ -57,6 +57,13 @@ internal class SendEventWorker constructor(context: Context, params: WorkerParam
return Result.success()
}

if (params.lastFailureMessage != null) {
localEchoUpdater.updateSendState(event.eventId, SendState.UNDELIVERED)

// Transmit the error
return Result.success(inputData)
}

localEchoUpdater.updateSendState(event.eventId, SendState.SENDING)
val result = executeRequest<SendResponse> {
apiCall = roomAPI.send(

View File

@ -18,4 +18,7 @@ package im.vector.matrix.android.internal.worker

interface SessionWorkerParams {
val userId: String
}

// Null is no error occurs. When chaining Workers, first step is to check that there is no lastFailureMessage from the previous workers
var lastFailureMessage: String?
}