forked from GitHub-Mirror/riotX-android
Task: use a builder with DSL and introduce Constraints (only boolean connectedToNetwork at the moment)
This commit is contained in:
parent
c413321a22
commit
4a74f58516
@ -72,7 +72,6 @@ import im.vector.matrix.android.internal.session.room.membership.RoomMembers
|
|||||||
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.configureWith
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import im.vector.matrix.android.internal.task.toConfigurableTask
|
|
||||||
import im.vector.matrix.android.internal.util.JsonCanonicalizer
|
import im.vector.matrix.android.internal.util.JsonCanonicalizer
|
||||||
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
||||||
import im.vector.matrix.android.internal.util.fetchCopied
|
import im.vector.matrix.android.internal.util.fetchCopied
|
||||||
@ -167,22 +166,25 @@ internal class CryptoManager @Inject constructor(
|
|||||||
|
|
||||||
override fun setDeviceName(deviceId: String, deviceName: String, callback: MatrixCallback<Unit>) {
|
override fun setDeviceName(deviceId: String, deviceName: String, callback: MatrixCallback<Unit>) {
|
||||||
setDeviceNameTask
|
setDeviceNameTask
|
||||||
.configureWith(SetDeviceNameTask.Params(deviceId, deviceName))
|
.configureWith(SetDeviceNameTask.Params(deviceId, deviceName)) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun deleteDevice(deviceId: String, callback: MatrixCallback<Unit>) {
|
override fun deleteDevice(deviceId: String, callback: MatrixCallback<Unit>) {
|
||||||
deleteDeviceTask
|
deleteDeviceTask
|
||||||
.configureWith(DeleteDeviceTask.Params(deviceId))
|
.configureWith(DeleteDeviceTask.Params(deviceId)) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun deleteDeviceWithUserPassword(deviceId: String, authSession: String?, password: String, callback: MatrixCallback<Unit>) {
|
override fun deleteDeviceWithUserPassword(deviceId: String, authSession: String?, password: String, callback: MatrixCallback<Unit>) {
|
||||||
deleteDeviceWithUserPasswordTask
|
deleteDeviceWithUserPasswordTask
|
||||||
.configureWith(DeleteDeviceWithUserPasswordTask.Params(deviceId, authSession, password))
|
.configureWith(DeleteDeviceWithUserPasswordTask.Params(deviceId, authSession, password)) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -196,8 +198,9 @@ internal class CryptoManager @Inject constructor(
|
|||||||
|
|
||||||
override fun getDevicesList(callback: MatrixCallback<DevicesListResponse>) {
|
override fun getDevicesList(callback: MatrixCallback<DevicesListResponse>) {
|
||||||
getDevicesTask
|
getDevicesTask
|
||||||
.toConfigurableTask()
|
.configureWith {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,7 +286,7 @@ internal class CryptoManager @Inject constructor(
|
|||||||
/**
|
/**
|
||||||
* Close the crypto
|
* Close the crypto
|
||||||
*/
|
*/
|
||||||
fun close() = runBlocking(coroutineDispatchers.crypto){
|
fun close() = runBlocking(coroutineDispatchers.crypto) {
|
||||||
olmDevice.release()
|
olmDevice.release()
|
||||||
cryptoStore.close()
|
cryptoStore.close()
|
||||||
outgoingRoomKeyRequestManager.stop()
|
outgoingRoomKeyRequestManager.stop()
|
||||||
@ -1046,8 +1049,9 @@ internal class CryptoManager @Inject constructor(
|
|||||||
|
|
||||||
override fun clearCryptoCache(callback: MatrixCallback<Unit>) {
|
override fun clearCryptoCache(callback: MatrixCallback<Unit>) {
|
||||||
clearCryptoDataTask
|
clearCryptoDataTask
|
||||||
.toConfigurableTask()
|
.configureWith {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,10 +299,12 @@ internal class OutgoingRoomKeyRequestManager @Inject constructor(
|
|||||||
// TODO Change this two hard coded key to something better
|
// TODO Change this two hard coded key to something better
|
||||||
contentMap.setObject(recipient["userId"], recipient["deviceId"], message)
|
contentMap.setObject(recipient["userId"], recipient["deviceId"], message)
|
||||||
}
|
}
|
||||||
sendToDeviceTask.configureWith(SendToDeviceTask.Params(EventType.ROOM_KEY_REQUEST, contentMap, transactionId))
|
sendToDeviceTask
|
||||||
.dispatchTo(callback)
|
.configureWith(SendToDeviceTask.Params(EventType.ROOM_KEY_REQUEST, contentMap, transactionId)) {
|
||||||
.executeOn(TaskThread.CALLER)
|
this.callback = callback
|
||||||
.callbackOn(TaskThread.CALLER)
|
this.callbackThread = TaskThread.CALLER
|
||||||
|
this.executionThread = TaskThread.CALLER
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,7 +51,10 @@ import im.vector.matrix.android.internal.crypto.store.db.model.KeysBackupDataEnt
|
|||||||
import im.vector.matrix.android.internal.di.MoshiProvider
|
import im.vector.matrix.android.internal.di.MoshiProvider
|
||||||
import im.vector.matrix.android.internal.extensions.foldToCallback
|
import im.vector.matrix.android.internal.extensions.foldToCallback
|
||||||
import im.vector.matrix.android.internal.session.SessionScope
|
import im.vector.matrix.android.internal.session.SessionScope
|
||||||
import im.vector.matrix.android.internal.task.*
|
import im.vector.matrix.android.internal.task.Task
|
||||||
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
|
import im.vector.matrix.android.internal.task.TaskThread
|
||||||
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import im.vector.matrix.android.internal.util.JsonCanonicalizer
|
import im.vector.matrix.android.internal.util.JsonCanonicalizer
|
||||||
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
||||||
import kotlinx.coroutines.GlobalScope
|
import kotlinx.coroutines.GlobalScope
|
||||||
@ -200,31 +203,32 @@ internal class KeysBackup @Inject constructor(
|
|||||||
keysBackupStateManager.state = KeysBackupState.Enabling
|
keysBackupStateManager.state = KeysBackupState.Enabling
|
||||||
|
|
||||||
createKeysBackupVersionTask
|
createKeysBackupVersionTask
|
||||||
.configureWith(createKeysBackupVersionBody)
|
.configureWith(createKeysBackupVersionBody) {
|
||||||
.dispatchTo(object : MatrixCallback<KeysVersion> {
|
this.callback = object : MatrixCallback<KeysVersion> {
|
||||||
override fun onSuccess(info: KeysVersion) {
|
override fun onSuccess(info: KeysVersion) {
|
||||||
// Reset backup markers.
|
// Reset backup markers.
|
||||||
cryptoStore.resetBackupMarkers()
|
cryptoStore.resetBackupMarkers()
|
||||||
|
|
||||||
val keyBackupVersion = KeysVersionResult()
|
val keyBackupVersion = KeysVersionResult()
|
||||||
keyBackupVersion.algorithm = createKeysBackupVersionBody.algorithm
|
keyBackupVersion.algorithm = createKeysBackupVersionBody.algorithm
|
||||||
keyBackupVersion.authData = createKeysBackupVersionBody.authData
|
keyBackupVersion.authData = createKeysBackupVersionBody.authData
|
||||||
keyBackupVersion.version = info.version
|
keyBackupVersion.version = info.version
|
||||||
|
|
||||||
// We can consider that the server does not have keys yet
|
// We can consider that the server does not have keys yet
|
||||||
keyBackupVersion.count = 0
|
keyBackupVersion.count = 0
|
||||||
keyBackupVersion.hash = null
|
keyBackupVersion.hash = null
|
||||||
|
|
||||||
enableKeysBackup(keyBackupVersion)
|
enableKeysBackup(keyBackupVersion)
|
||||||
|
|
||||||
callback.onSuccess(info)
|
callback.onSuccess(info)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onFailure(failure: Throwable) {
|
||||||
|
keysBackupStateManager.state = KeysBackupState.Disabled
|
||||||
|
callback.onFailure(failure)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
override fun onFailure(failure: Throwable) {
|
|
||||||
keysBackupStateManager.state = KeysBackupState.Disabled
|
|
||||||
callback.onFailure(failure)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -239,27 +243,29 @@ internal class KeysBackup @Inject constructor(
|
|||||||
keysBackupStateManager.state = KeysBackupState.Unknown
|
keysBackupStateManager.state = KeysBackupState.Unknown
|
||||||
}
|
}
|
||||||
|
|
||||||
deleteBackupTask.configureWith(DeleteBackupTask.Params(version))
|
deleteBackupTask
|
||||||
.dispatchTo(object : MatrixCallback<Unit> {
|
.configureWith(DeleteBackupTask.Params(version)) {
|
||||||
private fun eventuallyRestartBackup() {
|
this.callback = object : MatrixCallback<Unit> {
|
||||||
// Do not stay in KeysBackupState.Unknown but check what is available on the homeserver
|
private fun eventuallyRestartBackup() {
|
||||||
if (state == KeysBackupState.Unknown) {
|
// Do not stay in KeysBackupState.Unknown but check what is available on the homeserver
|
||||||
checkAndStartKeysBackup()
|
if (state == KeysBackupState.Unknown) {
|
||||||
|
checkAndStartKeysBackup()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onSuccess(data: Unit) {
|
||||||
|
eventuallyRestartBackup()
|
||||||
|
|
||||||
|
uiHandler.post { callback?.onSuccess(Unit) }
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onFailure(failure: Throwable) {
|
||||||
|
eventuallyRestartBackup()
|
||||||
|
|
||||||
|
uiHandler.post { callback?.onFailure(failure) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
override fun onSuccess(data: Unit) {
|
|
||||||
eventuallyRestartBackup()
|
|
||||||
|
|
||||||
uiHandler.post { callback?.onSuccess(Unit) }
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
|
||||||
eventuallyRestartBackup()
|
|
||||||
|
|
||||||
uiHandler.post { callback?.onFailure(failure) }
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -355,9 +361,10 @@ internal class KeysBackup @Inject constructor(
|
|||||||
return getKeysBackupTrustBg(params)
|
return getKeysBackupTrustBg(params)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.configureWith(keysBackupVersion)
|
.configureWith(keysBackupVersion) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
.executeOn(TaskThread.COMPUTATION)
|
this.executionThread = TaskThread.COMPUTATION
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -487,27 +494,28 @@ internal class KeysBackup @Inject constructor(
|
|||||||
|
|
||||||
// And send it to the homeserver
|
// And send it to the homeserver
|
||||||
updateKeysBackupVersionTask
|
updateKeysBackupVersionTask
|
||||||
.configureWith(UpdateKeysBackupVersionTask.Params(keysBackupVersion.version!!, updateKeysBackupVersionBody))
|
.configureWith(UpdateKeysBackupVersionTask.Params(keysBackupVersion.version!!, updateKeysBackupVersionBody)) {
|
||||||
.dispatchTo(object : MatrixCallback<Unit> {
|
this.callback = object : MatrixCallback<Unit> {
|
||||||
override fun onSuccess(data: Unit) {
|
override fun onSuccess(data: Unit) {
|
||||||
// Relaunch the state machine on this updated backup version
|
// Relaunch the state machine on this updated backup version
|
||||||
val newKeysBackupVersion = KeysVersionResult()
|
val newKeysBackupVersion = KeysVersionResult()
|
||||||
|
|
||||||
newKeysBackupVersion.version = keysBackupVersion.version
|
newKeysBackupVersion.version = keysBackupVersion.version
|
||||||
newKeysBackupVersion.algorithm = keysBackupVersion.algorithm
|
newKeysBackupVersion.algorithm = keysBackupVersion.algorithm
|
||||||
newKeysBackupVersion.count = keysBackupVersion.count
|
newKeysBackupVersion.count = keysBackupVersion.count
|
||||||
newKeysBackupVersion.hash = keysBackupVersion.hash
|
newKeysBackupVersion.hash = keysBackupVersion.hash
|
||||||
newKeysBackupVersion.authData = updateKeysBackupVersionBody.authData
|
newKeysBackupVersion.authData = updateKeysBackupVersionBody.authData
|
||||||
|
|
||||||
checkAndStartWithKeysBackupVersion(newKeysBackupVersion)
|
checkAndStartWithKeysBackupVersion(newKeysBackupVersion)
|
||||||
|
|
||||||
callback.onSuccess(data)
|
callback.onSuccess(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onFailure(failure: Throwable) {
|
||||||
|
callback.onFailure(failure)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
override fun onFailure(failure: Throwable) {
|
|
||||||
callback.onFailure(failure)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -753,49 +761,52 @@ internal class KeysBackup @Inject constructor(
|
|||||||
if (roomId != null && sessionId != null) {
|
if (roomId != null && sessionId != null) {
|
||||||
// Get key for the room and for the session
|
// Get key for the room and for the session
|
||||||
getRoomSessionDataTask
|
getRoomSessionDataTask
|
||||||
.configureWith(GetRoomSessionDataTask.Params(roomId, sessionId, version))
|
.configureWith(GetRoomSessionDataTask.Params(roomId, sessionId, version)) {
|
||||||
.dispatchTo(object : MatrixCallback<KeyBackupData> {
|
this.callback = object : MatrixCallback<KeyBackupData> {
|
||||||
override fun onSuccess(data: KeyBackupData) {
|
override fun onSuccess(data: KeyBackupData) {
|
||||||
// Convert to KeysBackupData
|
// Convert to KeysBackupData
|
||||||
val keysBackupData = KeysBackupData()
|
val keysBackupData = KeysBackupData()
|
||||||
keysBackupData.roomIdToRoomKeysBackupData = HashMap()
|
keysBackupData.roomIdToRoomKeysBackupData = HashMap()
|
||||||
val roomKeysBackupData = RoomKeysBackupData()
|
val roomKeysBackupData = RoomKeysBackupData()
|
||||||
roomKeysBackupData.sessionIdToKeyBackupData = HashMap()
|
roomKeysBackupData.sessionIdToKeyBackupData = HashMap()
|
||||||
roomKeysBackupData.sessionIdToKeyBackupData[sessionId] = data
|
roomKeysBackupData.sessionIdToKeyBackupData[sessionId] = data
|
||||||
keysBackupData.roomIdToRoomKeysBackupData[roomId] = roomKeysBackupData
|
keysBackupData.roomIdToRoomKeysBackupData[roomId] = roomKeysBackupData
|
||||||
|
|
||||||
callback.onSuccess(keysBackupData)
|
callback.onSuccess(keysBackupData)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
override fun onFailure(failure: Throwable) {
|
||||||
callback.onFailure(failure)
|
callback.onFailure(failure)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
} else if (roomId != null) {
|
} else if (roomId != null) {
|
||||||
// Get all keys for the room
|
// Get all keys for the room
|
||||||
getRoomSessionsDataTask
|
getRoomSessionsDataTask
|
||||||
.configureWith(GetRoomSessionsDataTask.Params(roomId, version))
|
.configureWith(GetRoomSessionsDataTask.Params(roomId, version)) {
|
||||||
.dispatchTo(object : MatrixCallback<RoomKeysBackupData> {
|
this.callback = object : MatrixCallback<RoomKeysBackupData> {
|
||||||
override fun onSuccess(data: RoomKeysBackupData) {
|
override fun onSuccess(data: RoomKeysBackupData) {
|
||||||
// Convert to KeysBackupData
|
// Convert to KeysBackupData
|
||||||
val keysBackupData = KeysBackupData()
|
val keysBackupData = KeysBackupData()
|
||||||
keysBackupData.roomIdToRoomKeysBackupData = HashMap()
|
keysBackupData.roomIdToRoomKeysBackupData = HashMap()
|
||||||
keysBackupData.roomIdToRoomKeysBackupData[roomId] = data
|
keysBackupData.roomIdToRoomKeysBackupData[roomId] = data
|
||||||
|
|
||||||
callback.onSuccess(keysBackupData)
|
callback.onSuccess(keysBackupData)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
override fun onFailure(failure: Throwable) {
|
||||||
callback.onFailure(failure)
|
callback.onFailure(failure)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
} else {
|
} else {
|
||||||
// Get all keys
|
// Get all keys
|
||||||
getSessionsDataTask
|
getSessionsDataTask
|
||||||
.configureWith(GetSessionsDataTask.Params(version))
|
.configureWith(GetSessionsDataTask.Params(version)) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -850,45 +861,47 @@ internal class KeysBackup @Inject constructor(
|
|||||||
override fun getVersion(version: String,
|
override fun getVersion(version: String,
|
||||||
callback: MatrixCallback<KeysVersionResult?>) {
|
callback: MatrixCallback<KeysVersionResult?>) {
|
||||||
getKeysBackupVersionTask
|
getKeysBackupVersionTask
|
||||||
.configureWith(version)
|
.configureWith(version) {
|
||||||
.dispatchTo(object : MatrixCallback<KeysVersionResult> {
|
this.callback = object : MatrixCallback<KeysVersionResult> {
|
||||||
override fun onSuccess(data: KeysVersionResult) {
|
override fun onSuccess(data: KeysVersionResult) {
|
||||||
callback.onSuccess(data)
|
callback.onSuccess(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
override fun onFailure(failure: Throwable) {
|
||||||
if (failure is Failure.ServerError
|
if (failure is Failure.ServerError
|
||||||
&& failure.error.code == MatrixError.NOT_FOUND) {
|
&& failure.error.code == MatrixError.NOT_FOUND) {
|
||||||
// Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup
|
// Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup
|
||||||
callback.onSuccess(null)
|
callback.onSuccess(null)
|
||||||
} else {
|
} else {
|
||||||
// Transmit the error
|
// Transmit the error
|
||||||
callback.onFailure(failure)
|
callback.onFailure(failure)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getCurrentVersion(callback: MatrixCallback<KeysVersionResult?>) {
|
override fun getCurrentVersion(callback: MatrixCallback<KeysVersionResult?>) {
|
||||||
getKeysBackupLastVersionTask
|
getKeysBackupLastVersionTask
|
||||||
.toConfigurableTask()
|
.configureWith {
|
||||||
.dispatchTo(object : MatrixCallback<KeysVersionResult> {
|
this.callback = object : MatrixCallback<KeysVersionResult> {
|
||||||
override fun onSuccess(data: KeysVersionResult) {
|
override fun onSuccess(data: KeysVersionResult) {
|
||||||
callback.onSuccess(data)
|
callback.onSuccess(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
override fun onFailure(failure: Throwable) {
|
||||||
if (failure is Failure.ServerError
|
if (failure is Failure.ServerError
|
||||||
&& failure.error.code == MatrixError.NOT_FOUND) {
|
&& failure.error.code == MatrixError.NOT_FOUND) {
|
||||||
// Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup
|
// Workaround because the homeserver currently returns M_NOT_FOUND when there is no key backup
|
||||||
callback.onSuccess(null)
|
callback.onSuccess(null)
|
||||||
} else {
|
} else {
|
||||||
// Transmit the error
|
// Transmit the error
|
||||||
callback.onFailure(failure)
|
callback.onFailure(failure)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1231,69 +1244,72 @@ internal class KeysBackup @Inject constructor(
|
|||||||
|
|
||||||
Timber.v("backupKeys: 4 - Sending request")
|
Timber.v("backupKeys: 4 - Sending request")
|
||||||
|
|
||||||
// Make the request
|
val sendingRequestCallback = object : MatrixCallback<BackupKeysResult> {
|
||||||
storeSessionDataTask
|
override fun onSuccess(data: BackupKeysResult) {
|
||||||
.configureWith(StoreSessionsDataTask.Params(keysBackupVersion!!.version!!, keysBackupData))
|
uiHandler.post {
|
||||||
.dispatchTo(object : MatrixCallback<BackupKeysResult> {
|
Timber.v("backupKeys: 5a - Request complete")
|
||||||
override fun onSuccess(data: BackupKeysResult) {
|
|
||||||
uiHandler.post {
|
|
||||||
Timber.v("backupKeys: 5a - Request complete")
|
|
||||||
|
|
||||||
// Mark keys as backed up
|
// Mark keys as backed up
|
||||||
cryptoStore.markBackupDoneForInboundGroupSessions(olmInboundGroupSessionWrappers)
|
cryptoStore.markBackupDoneForInboundGroupSessions(olmInboundGroupSessionWrappers)
|
||||||
|
|
||||||
if (olmInboundGroupSessionWrappers.size < KEY_BACKUP_SEND_KEYS_MAX_COUNT) {
|
if (olmInboundGroupSessionWrappers.size < KEY_BACKUP_SEND_KEYS_MAX_COUNT) {
|
||||||
Timber.v("backupKeys: All keys have been backed up")
|
Timber.v("backupKeys: All keys have been backed up")
|
||||||
onServerDataRetrieved(data.count, data.hash)
|
onServerDataRetrieved(data.count, data.hash)
|
||||||
|
|
||||||
// Note: Changing state will trigger the call to backupAllGroupSessionsCallback.onSuccess()
|
// Note: Changing state will trigger the call to backupAllGroupSessionsCallback.onSuccess()
|
||||||
keysBackupStateManager.state = KeysBackupState.ReadyToBackUp
|
keysBackupStateManager.state = KeysBackupState.ReadyToBackUp
|
||||||
} else {
|
} else {
|
||||||
Timber.v("backupKeys: Continue to back up keys")
|
Timber.v("backupKeys: Continue to back up keys")
|
||||||
keysBackupStateManager.state = KeysBackupState.WillBackUp
|
keysBackupStateManager.state = KeysBackupState.WillBackUp
|
||||||
|
|
||||||
backupKeys()
|
backupKeys()
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
override fun onFailure(failure: Throwable) {
|
||||||
if (failure is Failure.ServerError) {
|
if (failure is Failure.ServerError) {
|
||||||
uiHandler.post {
|
uiHandler.post {
|
||||||
Timber.e(failure, "backupKeys: backupKeys failed.")
|
Timber.e(failure, "backupKeys: backupKeys failed.")
|
||||||
|
|
||||||
when (failure.error.code) {
|
when (failure.error.code) {
|
||||||
MatrixError.NOT_FOUND,
|
MatrixError.NOT_FOUND,
|
||||||
MatrixError.WRONG_ROOM_KEYS_VERSION -> {
|
MatrixError.WRONG_ROOM_KEYS_VERSION -> {
|
||||||
// Backup has been deleted on the server, or we are not using the last backup version
|
// Backup has been deleted on the server, or we are not using the last backup version
|
||||||
keysBackupStateManager.state = KeysBackupState.WrongBackUpVersion
|
keysBackupStateManager.state = KeysBackupState.WrongBackUpVersion
|
||||||
backupAllGroupSessionsCallback?.onFailure(failure)
|
|
||||||
resetBackupAllGroupSessionsListeners()
|
|
||||||
resetKeysBackupData()
|
|
||||||
keysBackupVersion = null
|
|
||||||
|
|
||||||
// Do not stay in KeysBackupState.WrongBackUpVersion but check what is available on the homeserver
|
|
||||||
checkAndStartKeysBackup()
|
|
||||||
}
|
|
||||||
else ->
|
|
||||||
// Come back to the ready state so that we will retry on the next received key
|
|
||||||
keysBackupStateManager.state = KeysBackupState.ReadyToBackUp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
uiHandler.post {
|
|
||||||
backupAllGroupSessionsCallback?.onFailure(failure)
|
backupAllGroupSessionsCallback?.onFailure(failure)
|
||||||
resetBackupAllGroupSessionsListeners()
|
resetBackupAllGroupSessionsListeners()
|
||||||
|
resetKeysBackupData()
|
||||||
|
keysBackupVersion = null
|
||||||
|
|
||||||
Timber.e("backupKeys: backupKeys failed.")
|
// Do not stay in KeysBackupState.WrongBackUpVersion but check what is available on the homeserver
|
||||||
|
checkAndStartKeysBackup()
|
||||||
// Retry a bit later
|
|
||||||
keysBackupStateManager.state = KeysBackupState.ReadyToBackUp
|
|
||||||
maybeBackupKeys()
|
|
||||||
}
|
}
|
||||||
|
else ->
|
||||||
|
// Come back to the ready state so that we will retry on the next received key
|
||||||
|
keysBackupStateManager.state = KeysBackupState.ReadyToBackUp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
} else {
|
||||||
|
uiHandler.post {
|
||||||
|
backupAllGroupSessionsCallback?.onFailure(failure)
|
||||||
|
resetBackupAllGroupSessionsListeners()
|
||||||
|
|
||||||
|
Timber.e("backupKeys: backupKeys failed.")
|
||||||
|
|
||||||
|
// Retry a bit later
|
||||||
|
keysBackupStateManager.state = KeysBackupState.ReadyToBackUp
|
||||||
|
maybeBackupKeys()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the request
|
||||||
|
storeSessionDataTask
|
||||||
|
.configureWith(StoreSessionsDataTask.Params(keysBackupVersion!!.version!!, keysBackupData)){
|
||||||
|
this.callback = sendingRequestCallback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,7 @@ import im.vector.matrix.android.internal.crypto.model.rest.*
|
|||||||
import im.vector.matrix.android.internal.crypto.store.IMXCryptoStore
|
import im.vector.matrix.android.internal.crypto.store.IMXCryptoStore
|
||||||
import im.vector.matrix.android.internal.crypto.tasks.SendToDeviceTask
|
import im.vector.matrix.android.internal.crypto.tasks.SendToDeviceTask
|
||||||
import im.vector.matrix.android.internal.session.SessionScope
|
import im.vector.matrix.android.internal.session.SessionScope
|
||||||
|
import im.vector.matrix.android.internal.task.TaskConstraints
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.configureWith
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
||||||
@ -415,16 +416,18 @@ internal class DefaultSasVerificationService @Inject constructor(private val cre
|
|||||||
val contentMap = MXUsersDevicesMap<Any>()
|
val contentMap = MXUsersDevicesMap<Any>()
|
||||||
contentMap.setObject(userId, userDevice, cancelMessage)
|
contentMap.setObject(userId, userDevice, cancelMessage)
|
||||||
|
|
||||||
sendToDeviceTask.configureWith(SendToDeviceTask.Params(EventType.KEY_VERIFICATION_CANCEL, contentMap, transactionId))
|
sendToDeviceTask
|
||||||
.dispatchTo(object : MatrixCallback<Unit> {
|
.configureWith(SendToDeviceTask.Params(EventType.KEY_VERIFICATION_CANCEL, contentMap, transactionId)) {
|
||||||
override fun onSuccess(data: Unit) {
|
this.callback = object : MatrixCallback<Unit> {
|
||||||
Timber.v("## SAS verification [$transactionId] canceled for reason ${code.value}")
|
override fun onSuccess(data: Unit) {
|
||||||
}
|
Timber.v("## SAS verification [$transactionId] canceled for reason ${code.value}")
|
||||||
|
}
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
override fun onFailure(failure: Throwable) {
|
||||||
Timber.e(failure, "## SAS verification [$transactionId] failed to cancel.")
|
Timber.e(failure, "## SAS verification [$transactionId] failed to cancel.")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -287,23 +287,25 @@ internal abstract class SASVerificationTransaction(
|
|||||||
val contentMap = MXUsersDevicesMap<Any>()
|
val contentMap = MXUsersDevicesMap<Any>()
|
||||||
contentMap.setObject(otherUserId, otherDeviceId, keyToDevice)
|
contentMap.setObject(otherUserId, otherDeviceId, keyToDevice)
|
||||||
|
|
||||||
sendToDeviceTask.configureWith(SendToDeviceTask.Params(type, contentMap, transactionId))
|
sendToDeviceTask
|
||||||
.dispatchTo(object : MatrixCallback<Unit> {
|
.configureWith(SendToDeviceTask.Params(type, contentMap, transactionId)) {
|
||||||
override fun onSuccess(data: Unit) {
|
this.callback = object : MatrixCallback<Unit> {
|
||||||
Timber.v("## SAS verification [$transactionId] toDevice type '$type' success.")
|
override fun onSuccess(data: Unit) {
|
||||||
if (onDone != null) {
|
Timber.v("## SAS verification [$transactionId] toDevice type '$type' success.")
|
||||||
onDone()
|
if (onDone != null) {
|
||||||
} else {
|
onDone()
|
||||||
state = nextState
|
} else {
|
||||||
|
state = nextState
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onFailure(failure: Throwable) {
|
||||||
|
Timber.e("## SAS verification [$transactionId] failed to send toDevice in state : $state")
|
||||||
|
|
||||||
|
cancel(onErrorReason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
override fun onFailure(failure: Throwable) {
|
|
||||||
Timber.e("## SAS verification [$transactionId] failed to send toDevice in state : $state")
|
|
||||||
|
|
||||||
cancel(onErrorReason)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,6 +24,8 @@ import timber.log.Timber
|
|||||||
import java.util.*
|
import java.util.*
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
import kotlin.collections.ArrayList
|
import kotlin.collections.ArrayList
|
||||||
|
import kotlin.coroutines.resume
|
||||||
|
import kotlin.coroutines.suspendCoroutine
|
||||||
|
|
||||||
@MatrixScope
|
@MatrixScope
|
||||||
internal class NetworkConnectivityChecker @Inject constructor(context: Context) {
|
internal class NetworkConnectivityChecker @Inject constructor(context: Context) {
|
||||||
@ -34,24 +36,41 @@ internal class NetworkConnectivityChecker @Inject constructor(context: Context)
|
|||||||
.build(context)
|
.build(context)
|
||||||
|
|
||||||
private val merlinsBeard = MerlinsBeard.Builder().build(context)
|
private val merlinsBeard = MerlinsBeard.Builder().build(context)
|
||||||
private val listeners = Collections.synchronizedList(ArrayList<Listener>())
|
private val listeners = ArrayList<Listener>()
|
||||||
|
|
||||||
init {
|
init {
|
||||||
merlin.bind()
|
merlin.bind()
|
||||||
merlin.registerDisconnectable {
|
merlin.registerDisconnectable {
|
||||||
Timber.v("On Disconnect")
|
Timber.v("On Disconnect")
|
||||||
listeners.forEach {
|
val localListeners = Collections.synchronizedList(listeners)
|
||||||
|
localListeners.forEach {
|
||||||
it.onDisconnect()
|
it.onDisconnect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
merlin.registerConnectable {
|
merlin.registerConnectable {
|
||||||
Timber.v("On Connect")
|
Timber.v("On Connect")
|
||||||
listeners.forEach {
|
val localListeners = Collections.synchronizedList(listeners)
|
||||||
|
localListeners.forEach {
|
||||||
it.onConnect()
|
it.onConnect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
suspend fun waitUntilConnected() {
|
||||||
|
if (isConnected()) {
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
suspendCoroutine<Unit> { continuation ->
|
||||||
|
register(object : Listener {
|
||||||
|
override fun onConnect() {
|
||||||
|
unregister(this)
|
||||||
|
continuation.resume(Unit)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun register(listener: Listener) {
|
fun register(listener: Listener) {
|
||||||
listeners.add(listener)
|
listeners.add(listener)
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ import im.vector.matrix.android.api.MatrixCallback
|
|||||||
import im.vector.matrix.android.api.session.cache.CacheService
|
import im.vector.matrix.android.api.session.cache.CacheService
|
||||||
import im.vector.matrix.android.internal.di.SessionDatabase
|
import im.vector.matrix.android.internal.di.SessionDatabase
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.toConfigurableTask
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
internal class DefaultCacheService @Inject constructor(@SessionDatabase
|
internal class DefaultCacheService @Inject constructor(@SessionDatabase
|
||||||
@ -30,8 +30,9 @@ internal class DefaultCacheService @Inject constructor(@SessionDatabase
|
|||||||
override fun clearCache(callback: MatrixCallback<Unit>) {
|
override fun clearCache(callback: MatrixCallback<Unit>) {
|
||||||
taskExecutor.cancelAll()
|
taskExecutor.cancelAll()
|
||||||
clearCacheTask
|
clearCacheTask
|
||||||
.toConfigurableTask()
|
.configureWith {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -83,9 +83,10 @@ internal class DefaultPushRuleService @Inject constructor(
|
|||||||
|
|
||||||
override fun updatePushRuleEnableStatus(kind: String, pushRule: PushRule, enabled: Boolean, callback: MatrixCallback<Unit>): Cancelable {
|
override fun updatePushRuleEnableStatus(kind: String, pushRule: PushRule, enabled: Boolean, callback: MatrixCallback<Unit>): Cancelable {
|
||||||
return updatePushRuleEnableStatusTask
|
return updatePushRuleEnableStatusTask
|
||||||
.configureWith(UpdatePushRuleEnableStatusTask.Params(kind, pushRule, enabled))
|
.configureWith(UpdatePushRuleEnableStatusTask.Params(kind, pushRule, enabled)) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
// TODO Fetch the rules
|
// TODO Fetch the rules
|
||||||
.dispatchTo(callback)
|
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,7 +29,6 @@ import im.vector.matrix.android.internal.database.model.PusherEntity
|
|||||||
import im.vector.matrix.android.internal.database.query.where
|
import im.vector.matrix.android.internal.database.query.where
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.configureWith
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import im.vector.matrix.android.internal.task.toConfigurableTask
|
|
||||||
import im.vector.matrix.android.internal.worker.WorkManagerUtil
|
import im.vector.matrix.android.internal.worker.WorkManagerUtil
|
||||||
import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder
|
import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder
|
||||||
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
|
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
|
||||||
@ -50,7 +49,7 @@ internal class DefaultPusherService @Inject constructor(
|
|||||||
|
|
||||||
override fun refreshPushers() {
|
override fun refreshPushers() {
|
||||||
getPusherTask
|
getPusherTask
|
||||||
.toConfigurableTask()
|
.configureWith()
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,8 +84,9 @@ internal class DefaultPusherService @Inject constructor(
|
|||||||
override fun removeHttpPusher(pushkey: String, appId: String, callback: MatrixCallback<Unit>) {
|
override fun removeHttpPusher(pushkey: String, appId: String, callback: MatrixCallback<Unit>) {
|
||||||
val params = RemovePusherTask.Params(sessionParam.credentials.userId, pushkey, appId)
|
val params = RemovePusherTask.Params(sessionParam.credentials.userId, pushkey, appId)
|
||||||
removePusherTask
|
removePusherTask
|
||||||
.configureWith(params)
|
.configureWith(params) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
//.enableRetry() ??
|
//.enableRetry() ??
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.pushers
|
|||||||
|
|
||||||
import com.zhuinden.monarchy.Monarchy
|
import com.zhuinden.monarchy.Monarchy
|
||||||
import im.vector.matrix.android.api.session.pushers.PusherState
|
import im.vector.matrix.android.api.session.pushers.PusherState
|
||||||
import im.vector.matrix.android.internal.database.awaitTransaction
|
|
||||||
import im.vector.matrix.android.internal.database.mapper.asDomain
|
import im.vector.matrix.android.internal.database.mapper.asDomain
|
||||||
import im.vector.matrix.android.internal.database.model.PusherEntity
|
import im.vector.matrix.android.internal.database.model.PusherEntity
|
||||||
import im.vector.matrix.android.internal.database.query.where
|
import im.vector.matrix.android.internal.database.query.where
|
||||||
@ -40,12 +39,13 @@ internal class DefaultRemovePusherTask @Inject constructor(
|
|||||||
) : RemovePusherTask {
|
) : RemovePusherTask {
|
||||||
|
|
||||||
override suspend fun execute(params: RemovePusherTask.Params) {
|
override suspend fun execute(params: RemovePusherTask.Params) {
|
||||||
val existing = Realm.getInstance(monarchy.realmConfiguration).use { realm ->
|
monarchy.awaitTransaction { realm ->
|
||||||
val existingEntity = PusherEntity.where(realm, params.userId, params.pushKey).findFirst()
|
val existingEntity = PusherEntity.where(realm, params.userId, params.pushKey).findFirst()
|
||||||
realm.awaitTransaction {
|
existingEntity?.state = PusherState.UNREGISTERING
|
||||||
existingEntity?.state = PusherState.UNREGISTERING
|
}
|
||||||
}
|
|
||||||
existingEntity?.asDomain()
|
val existing = Realm.getInstance(monarchy.realmConfiguration).use { realm ->
|
||||||
|
PusherEntity.where(realm, params.userId, params.pushKey).findFirst()?.asDomain()
|
||||||
} ?: throw Exception("No existing pusher")
|
} ?: throw Exception("No existing pusher")
|
||||||
|
|
||||||
val deleteBody = JsonPusher(
|
val deleteBody = JsonPusher(
|
||||||
|
@ -27,7 +27,6 @@ import im.vector.matrix.android.internal.session.room.directory.GetThirdPartyPro
|
|||||||
import im.vector.matrix.android.internal.session.room.membership.joining.JoinRoomTask
|
import im.vector.matrix.android.internal.session.room.membership.joining.JoinRoomTask
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.configureWith
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import im.vector.matrix.android.internal.task.toConfigurableTask
|
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
internal class DefaultRoomDirectoryService @Inject constructor(private val getPublicRoomTask: GetPublicRoomTask,
|
internal class DefaultRoomDirectoryService @Inject constructor(private val getPublicRoomTask: GetPublicRoomTask,
|
||||||
@ -39,22 +38,25 @@ internal class DefaultRoomDirectoryService @Inject constructor(private val getPu
|
|||||||
publicRoomsParams: PublicRoomsParams,
|
publicRoomsParams: PublicRoomsParams,
|
||||||
callback: MatrixCallback<PublicRoomsResponse>): Cancelable {
|
callback: MatrixCallback<PublicRoomsResponse>): Cancelable {
|
||||||
return getPublicRoomTask
|
return getPublicRoomTask
|
||||||
.configureWith(GetPublicRoomTask.Params(server, publicRoomsParams))
|
.configureWith(GetPublicRoomTask.Params(server, publicRoomsParams)) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun joinRoom(roomId: String, callback: MatrixCallback<Unit>): Cancelable {
|
override fun joinRoom(roomId: String, callback: MatrixCallback<Unit>): Cancelable {
|
||||||
return joinRoomTask
|
return joinRoomTask
|
||||||
.configureWith(JoinRoomTask.Params(roomId))
|
.configureWith(JoinRoomTask.Params(roomId)) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getThirdPartyProtocol(callback: MatrixCallback<Map<String, ThirdPartyProtocol>>): Cancelable {
|
override fun getThirdPartyProtocol(callback: MatrixCallback<Map<String, ThirdPartyProtocol>>): Cancelable {
|
||||||
return getThirdPartyProtocolsTask
|
return getThirdPartyProtocolsTask
|
||||||
.toConfigurableTask()
|
.configureWith {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -46,8 +46,9 @@ internal class DefaultRoomService @Inject constructor(private val monarchy: Mona
|
|||||||
|
|
||||||
override fun createRoom(createRoomParams: CreateRoomParams, callback: MatrixCallback<String>): Cancelable {
|
override fun createRoom(createRoomParams: CreateRoomParams, callback: MatrixCallback<String>): Cancelable {
|
||||||
return createRoomTask
|
return createRoomTask
|
||||||
.configureWith(createRoomParams)
|
.configureWith(createRoomParams) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,8 +75,9 @@ internal class DefaultRoomService @Inject constructor(private val monarchy: Mona
|
|||||||
|
|
||||||
override fun joinRoom(roomId: String, viaServers: List<String>, callback: MatrixCallback<Unit>): Cancelable {
|
override fun joinRoom(roomId: String, viaServers: List<String>, callback: MatrixCallback<Unit>): Cancelable {
|
||||||
return joinRoomTask
|
return joinRoomTask
|
||||||
.configureWith(JoinRoomTask.Params(roomId, viaServers))
|
.configureWith(JoinRoomTask.Params(roomId, viaServers)) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -44,8 +44,10 @@ internal class DefaultMembershipService @Inject constructor(private val roomId:
|
|||||||
|
|
||||||
override fun loadRoomMembersIfNeeded(matrixCallback: MatrixCallback<Unit>): Cancelable {
|
override fun loadRoomMembersIfNeeded(matrixCallback: MatrixCallback<Unit>): Cancelable {
|
||||||
val params = LoadRoomMembersTask.Params(roomId, Membership.LEAVE)
|
val params = LoadRoomMembersTask.Params(roomId, Membership.LEAVE)
|
||||||
return loadRoomMembersTask.configureWith(params)
|
return loadRoomMembersTask
|
||||||
.dispatchTo(matrixCallback)
|
.configureWith(params) {
|
||||||
|
this.callback = matrixCallback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,22 +79,28 @@ internal class DefaultMembershipService @Inject constructor(private val roomId:
|
|||||||
|
|
||||||
override fun invite(userId: String, callback: MatrixCallback<Unit>): Cancelable {
|
override fun invite(userId: String, callback: MatrixCallback<Unit>): Cancelable {
|
||||||
val params = InviteTask.Params(roomId, userId)
|
val params = InviteTask.Params(roomId, userId)
|
||||||
return inviteTask.configureWith(params)
|
return inviteTask
|
||||||
.dispatchTo(callback)
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun join(viaServers: List<String>, callback: MatrixCallback<Unit>): Cancelable {
|
override fun join(viaServers: List<String>, callback: MatrixCallback<Unit>): Cancelable {
|
||||||
val params = JoinRoomTask.Params(roomId, viaServers)
|
val params = JoinRoomTask.Params(roomId, viaServers)
|
||||||
return joinTask.configureWith(params)
|
return joinTask
|
||||||
.dispatchTo(callback)
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun leave(callback: MatrixCallback<Unit>): Cancelable {
|
override fun leave(callback: MatrixCallback<Unit>): Cancelable {
|
||||||
val params = LeaveRoomTask.Params(roomId)
|
val params = LeaveRoomTask.Params(roomId)
|
||||||
return leaveRoomTask.configureWith(params)
|
return leaveRoomTask
|
||||||
.dispatchTo(callback)
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -37,17 +37,29 @@ internal class DefaultReadService @Inject constructor(private val roomId: String
|
|||||||
|
|
||||||
override fun markAllAsRead(callback: MatrixCallback<Unit>) {
|
override fun markAllAsRead(callback: MatrixCallback<Unit>) {
|
||||||
val params = SetReadMarkersTask.Params(roomId, markAllAsRead = true)
|
val params = SetReadMarkersTask.Params(roomId, markAllAsRead = true)
|
||||||
setReadMarkersTask.configureWith(params).dispatchTo(callback).executeBy(taskExecutor)
|
setReadMarkersTask
|
||||||
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun setReadReceipt(eventId: String, callback: MatrixCallback<Unit>) {
|
override fun setReadReceipt(eventId: String, callback: MatrixCallback<Unit>) {
|
||||||
val params = SetReadMarkersTask.Params(roomId, fullyReadEventId = null, readReceiptEventId = eventId)
|
val params = SetReadMarkersTask.Params(roomId, fullyReadEventId = null, readReceiptEventId = eventId)
|
||||||
setReadMarkersTask.configureWith(params).dispatchTo(callback).executeBy(taskExecutor)
|
setReadMarkersTask
|
||||||
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun setReadMarker(fullyReadEventId: String, callback: MatrixCallback<Unit>) {
|
override fun setReadMarker(fullyReadEventId: String, callback: MatrixCallback<Unit>) {
|
||||||
val params = SetReadMarkersTask.Params(roomId, fullyReadEventId = fullyReadEventId, readReceiptEventId = null)
|
val params = SetReadMarkersTask.Params(roomId, fullyReadEventId = fullyReadEventId, readReceiptEventId = null)
|
||||||
setReadMarkersTask.configureWith(params).dispatchTo(callback).executeBy(taskExecutor)
|
setReadMarkersTask
|
||||||
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -55,13 +67,13 @@ internal class DefaultReadService @Inject constructor(private val roomId: String
|
|||||||
var isEventRead = false
|
var isEventRead = false
|
||||||
monarchy.doWithRealm {
|
monarchy.doWithRealm {
|
||||||
val readReceipt = ReadReceiptEntity.where(it, roomId, credentials.userId).findFirst()
|
val readReceipt = ReadReceiptEntity.where(it, roomId, credentials.userId).findFirst()
|
||||||
?: return@doWithRealm
|
?: return@doWithRealm
|
||||||
val liveChunk = ChunkEntity.findLastLiveChunkFromRoom(it, roomId)
|
val liveChunk = ChunkEntity.findLastLiveChunkFromRoom(it, roomId)
|
||||||
?: return@doWithRealm
|
?: return@doWithRealm
|
||||||
val readReceiptIndex = liveChunk.timelineEvents.find(readReceipt.eventId)?.root?.displayIndex
|
val readReceiptIndex = liveChunk.timelineEvents.find(readReceipt.eventId)?.root?.displayIndex
|
||||||
?: Int.MIN_VALUE
|
?: Int.MIN_VALUE
|
||||||
val eventToCheckIndex = liveChunk.timelineEvents.find(eventId)?.root?.displayIndex
|
val eventToCheckIndex = liveChunk.timelineEvents.find(eventId)?.root?.displayIndex
|
||||||
?: Int.MAX_VALUE
|
?: Int.MAX_VALUE
|
||||||
isEventRead = eventToCheckIndex <= readReceiptIndex
|
isEventRead = eventToCheckIndex <= readReceiptIndex
|
||||||
}
|
}
|
||||||
return isEventRead
|
return isEventRead
|
||||||
|
@ -80,28 +80,30 @@ internal class DefaultRelationService @Inject constructor(private val context: C
|
|||||||
reaction,
|
reaction,
|
||||||
myUserId
|
myUserId
|
||||||
)
|
)
|
||||||
findReactionEventForUndoTask.configureWith(params)
|
val callback = object : MatrixCallback<FindReactionEventForUndoTask.Result> {
|
||||||
.enableRetry()
|
override fun onSuccess(data: FindReactionEventForUndoTask.Result) {
|
||||||
.dispatchTo(object : MatrixCallback<FindReactionEventForUndoTask.Result> {
|
if (data.redactEventId == null) {
|
||||||
override fun onSuccess(data: FindReactionEventForUndoTask.Result) {
|
Timber.w("Cannot find reaction to undo (not yet synced?)")
|
||||||
if (data.redactEventId == null) {
|
//TODO?
|
||||||
Timber.w("Cannot find reaction to undo (not yet synced?)")
|
}
|
||||||
//TODO?
|
data.redactEventId?.let { toRedact ->
|
||||||
}
|
|
||||||
data.redactEventId?.let { toRedact ->
|
|
||||||
|
|
||||||
val redactEvent = eventFactory.createRedactEvent(roomId, toRedact, null).also {
|
val redactEvent = eventFactory.createRedactEvent(roomId, toRedact, null).also {
|
||||||
saveLocalEcho(it)
|
saveLocalEcho(it)
|
||||||
}
|
|
||||||
val redactWork = createRedactEventWork(redactEvent, toRedact, null)
|
|
||||||
|
|
||||||
TimelineSendEventWorkCommon.postWork(context, roomId, redactWork)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
val redactWork = createRedactEventWork(redactEvent, toRedact, null)
|
||||||
.executeBy(taskExecutor)
|
|
||||||
|
|
||||||
|
TimelineSendEventWorkCommon.postWork(context, roomId, redactWork)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
findReactionEventForUndoTask
|
||||||
|
.configureWith(params) {
|
||||||
|
this.retryCount = Int.MAX_VALUE
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO duplicate with send service?
|
//TODO duplicate with send service?
|
||||||
@ -167,8 +169,10 @@ internal class DefaultRelationService @Inject constructor(private val context: C
|
|||||||
|
|
||||||
override fun fetchEditHistory(eventId: String, callback: MatrixCallback<List<Event>>) {
|
override fun fetchEditHistory(eventId: String, callback: MatrixCallback<List<Event>>) {
|
||||||
val params = FetchEditHistoryTask.Params(roomId, cryptoService.isRoomEncrypted(roomId), eventId)
|
val params = FetchEditHistoryTask.Params(roomId, cryptoService.isRoomEncrypted(roomId), eventId)
|
||||||
fetchEditHistoryTask.configureWith(params)
|
fetchEditHistoryTask
|
||||||
.dispatchTo(callback)
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,8 +51,10 @@ internal class DefaultStateService @Inject constructor(private val roomId: Strin
|
|||||||
))
|
))
|
||||||
|
|
||||||
|
|
||||||
sendStateTask.configureWith(params)
|
sendStateTask
|
||||||
.dispatchTo(callback)
|
.configureWith(params) {
|
||||||
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ import im.vector.matrix.android.internal.database.query.findIncludingEvent
|
|||||||
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
|
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
|
||||||
import im.vector.matrix.android.internal.database.query.where
|
import im.vector.matrix.android.internal.database.query.where
|
||||||
import im.vector.matrix.android.internal.database.query.whereInRoom
|
import im.vector.matrix.android.internal.database.query.whereInRoom
|
||||||
|
import im.vector.matrix.android.internal.task.TaskConstraints
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.configureWith
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import im.vector.matrix.android.internal.util.Debouncer
|
import im.vector.matrix.android.internal.util.Debouncer
|
||||||
@ -133,7 +134,7 @@ internal class DefaultTimeline(
|
|||||||
builtEventsIdMap[eventId]?.let { builtIndex ->
|
builtEventsIdMap[eventId]?.let { builtIndex ->
|
||||||
//Update the relation of existing event
|
//Update the relation of existing event
|
||||||
builtEvents[builtIndex]?.let { te ->
|
builtEvents[builtIndex]?.let { te ->
|
||||||
builtEvents[builtIndex] = eventEntity.asDomain()
|
builtEvents[builtIndex] = eventEntity.asDomain()
|
||||||
hasChanged = true
|
hasChanged = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -209,7 +210,7 @@ internal class DefaultTimeline(
|
|||||||
override fun pendingEventCount(): Int {
|
override fun pendingEventCount(): Int {
|
||||||
var count = 0
|
var count = 0
|
||||||
Realm.getInstance(realmConfiguration).use {
|
Realm.getInstance(realmConfiguration).use {
|
||||||
count = RoomEntity.where(it,roomId).findFirst()?.sendingTimelineEvents?.count() ?: 0
|
count = RoomEntity.where(it, roomId).findFirst()?.sendingTimelineEvents?.count() ?: 0
|
||||||
}
|
}
|
||||||
return count
|
return count
|
||||||
}
|
}
|
||||||
@ -217,7 +218,7 @@ internal class DefaultTimeline(
|
|||||||
override fun failedToDeliverEventCount(): Int {
|
override fun failedToDeliverEventCount(): Int {
|
||||||
var count = 0
|
var count = 0
|
||||||
Realm.getInstance(realmConfiguration).use {
|
Realm.getInstance(realmConfiguration).use {
|
||||||
count = RoomEntity.where(it,roomId).findFirst()?.sendingTimelineEvents?.filter {
|
count = RoomEntity.where(it, roomId).findFirst()?.sendingTimelineEvents?.filter {
|
||||||
it.root?.sendState?.hasFailed() ?: false
|
it.root?.sendState?.hasFailed() ?: false
|
||||||
}?.count() ?: 0
|
}?.count() ?: 0
|
||||||
}
|
}
|
||||||
@ -405,24 +406,27 @@ internal class DefaultTimeline(
|
|||||||
limit = limit)
|
limit = limit)
|
||||||
|
|
||||||
Timber.v("Should fetch $limit items $direction")
|
Timber.v("Should fetch $limit items $direction")
|
||||||
cancelableBag += paginationTask.configureWith(params)
|
cancelableBag += paginationTask
|
||||||
.enableRetry()
|
.configureWith(params) {
|
||||||
.dispatchTo(object : MatrixCallback<TokenChunkEventPersistor.Result> {
|
this.retryCount = Int.MAX_VALUE
|
||||||
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
|
this.constraints = TaskConstraints(connectedToNetwork = true)
|
||||||
if (data == TokenChunkEventPersistor.Result.SUCCESS) {
|
this.callback = object : MatrixCallback<TokenChunkEventPersistor.Result> {
|
||||||
Timber.v("Success fetching $limit items $direction from pagination request")
|
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
|
||||||
} else {
|
if (data == TokenChunkEventPersistor.Result.SUCCESS) {
|
||||||
// Database won't be updated, so we force pagination request
|
Timber.v("Success fetching $limit items $direction from pagination request")
|
||||||
BACKGROUND_HANDLER.post {
|
} else {
|
||||||
executePaginationTask(direction, limit)
|
// Database won't be updated, so we force pagination request
|
||||||
|
BACKGROUND_HANDLER.post {
|
||||||
|
executePaginationTask(direction, limit)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
override fun onFailure(failure: Throwable) {
|
||||||
Timber.v("Failure fetching $limit items $direction from pagination request")
|
Timber.v("Failure fetching $limit items $direction from pagination request")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ package im.vector.matrix.android.internal.session.signout
|
|||||||
import im.vector.matrix.android.api.MatrixCallback
|
import im.vector.matrix.android.api.MatrixCallback
|
||||||
import im.vector.matrix.android.api.session.signout.SignOutService
|
import im.vector.matrix.android.api.session.signout.SignOutService
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.toConfigurableTask
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
internal class DefaultSignOutService @Inject constructor(private val signOutTask: SignOutTask,
|
internal class DefaultSignOutService @Inject constructor(private val signOutTask: SignOutTask,
|
||||||
@ -27,8 +27,9 @@ internal class DefaultSignOutService @Inject constructor(private val signOutTask
|
|||||||
|
|
||||||
override fun signOut(callback: MatrixCallback<Unit>) {
|
override fun signOut(callback: MatrixCallback<Unit>) {
|
||||||
signOutTask
|
signOutTask
|
||||||
.toConfigurableTask()
|
.configureWith {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,58 +104,57 @@ open class SyncService : Service() {
|
|||||||
} else {
|
} else {
|
||||||
Timber.v("Execute sync request with timeout 0")
|
Timber.v("Execute sync request with timeout 0")
|
||||||
val params = SyncTask.Params(TIME_OUT)
|
val params = SyncTask.Params(TIME_OUT)
|
||||||
cancelableTask = syncTask.configureWith(params)
|
cancelableTask = syncTask
|
||||||
.callbackOn(TaskThread.SYNC)
|
.configureWith(params) {
|
||||||
.executeOn(TaskThread.SYNC)
|
callbackThread = TaskThread.SYNC
|
||||||
.dispatchTo(object : MatrixCallback<Unit> {
|
executionThread = TaskThread.SYNC
|
||||||
override fun onSuccess(data: Unit) {
|
callback = object : MatrixCallback<Unit> {
|
||||||
cancelableTask = null
|
override fun onSuccess(data: Unit) {
|
||||||
if (!once) {
|
cancelableTask = null
|
||||||
timer.schedule(object : TimerTask() {
|
if (!once) {
|
||||||
override fun run() {
|
timer.schedule(object : TimerTask() {
|
||||||
doSync()
|
override fun run() {
|
||||||
}
|
doSync()
|
||||||
}, NEXT_BATCH_DELAY)
|
}
|
||||||
} else {
|
}, NEXT_BATCH_DELAY)
|
||||||
//stop
|
} else {
|
||||||
stopMe()
|
//stop
|
||||||
|
stopMe()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onFailure(failure: Throwable) {
|
||||||
|
Timber.e(failure)
|
||||||
|
cancelableTask = null
|
||||||
|
if (failure is Failure.NetworkConnection
|
||||||
|
&& failure.cause is SocketTimeoutException) {
|
||||||
|
// Timeout are not critical
|
||||||
|
timer.schedule(object : TimerTask() {
|
||||||
|
override fun run() {
|
||||||
|
doSync()
|
||||||
|
}
|
||||||
|
}, 5_000L)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (failure !is Failure.NetworkConnection
|
||||||
|
|| failure.cause is JsonEncodingException) {
|
||||||
|
// Wait 10s before retrying
|
||||||
|
timer.schedule(object : TimerTask() {
|
||||||
|
override fun run() {
|
||||||
|
doSync()
|
||||||
|
}
|
||||||
|
}, 5_000L)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (failure is Failure.ServerError
|
||||||
|
&& (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) {
|
||||||
|
// No token or invalid token, stop the thread
|
||||||
|
stopSelf()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
override fun onFailure(failure: Throwable) {
|
|
||||||
Timber.e(failure)
|
|
||||||
cancelableTask = null
|
|
||||||
if (failure is Failure.NetworkConnection
|
|
||||||
&& failure.cause is SocketTimeoutException) {
|
|
||||||
// Timeout are not critical
|
|
||||||
timer.schedule(object : TimerTask() {
|
|
||||||
override fun run() {
|
|
||||||
doSync()
|
|
||||||
}
|
|
||||||
}, 5_000L)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (failure !is Failure.NetworkConnection
|
|
||||||
|| failure.cause is JsonEncodingException) {
|
|
||||||
// Wait 10s before retrying
|
|
||||||
timer.schedule(object : TimerTask() {
|
|
||||||
override fun run() {
|
|
||||||
doSync()
|
|
||||||
}
|
|
||||||
}, 5_000L)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (failure is Failure.ServerError
|
|
||||||
&& (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) {
|
|
||||||
// No token or invalid token, stop the thread
|
|
||||||
stopSelf()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,40 +102,42 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||||||
Timber.v("[$this] Execute sync request with timeout $DEFAULT_LONG_POOL_TIMEOUT")
|
Timber.v("[$this] Execute sync request with timeout $DEFAULT_LONG_POOL_TIMEOUT")
|
||||||
val latch = CountDownLatch(1)
|
val latch = CountDownLatch(1)
|
||||||
val params = SyncTask.Params(DEFAULT_LONG_POOL_TIMEOUT)
|
val params = SyncTask.Params(DEFAULT_LONG_POOL_TIMEOUT)
|
||||||
cancelableTask = syncTask.configureWith(params)
|
|
||||||
.callbackOn(TaskThread.SYNC)
|
cancelableTask = syncTask.configureWith(params) {
|
||||||
.executeOn(TaskThread.SYNC)
|
this.callbackThread = TaskThread.SYNC
|
||||||
.dispatchTo(object : MatrixCallback<Unit> {
|
this.executionThread = TaskThread.SYNC
|
||||||
override fun onSuccess(data: Unit) {
|
this.callback = object : MatrixCallback<Unit> {
|
||||||
latch.countDown()
|
|
||||||
|
override fun onSuccess(data: Unit) {
|
||||||
|
latch.countDown()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onFailure(failure: Throwable) {
|
||||||
|
if (failure is Failure.NetworkConnection
|
||||||
|
&& failure.cause is SocketTimeoutException) {
|
||||||
|
// Timeout are not critical
|
||||||
|
Timber.v("Timeout")
|
||||||
|
} else {
|
||||||
|
Timber.e(failure)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFailure(failure: Throwable) {
|
if (failure !is Failure.NetworkConnection
|
||||||
if (failure is Failure.NetworkConnection
|
|| failure.cause is JsonEncodingException) {
|
||||||
&& failure.cause is SocketTimeoutException) {
|
// Wait 10s before retrying
|
||||||
// Timeout are not critical
|
sleep(RETRY_WAIT_TIME_MS)
|
||||||
Timber.v("Timeout")
|
|
||||||
} else {
|
|
||||||
Timber.e(failure)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (failure !is Failure.NetworkConnection
|
|
||||||
|| failure.cause is JsonEncodingException) {
|
|
||||||
// Wait 10s before retrying
|
|
||||||
sleep(RETRY_WAIT_TIME_MS)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (failure is Failure.ServerError
|
|
||||||
&& (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) {
|
|
||||||
// No token or invalid token, stop the thread
|
|
||||||
updateStateTo(SyncState.KILLING)
|
|
||||||
}
|
|
||||||
|
|
||||||
latch.countDown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
if (failure is Failure.ServerError
|
||||||
|
&& (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) {
|
||||||
|
// No token or invalid token, stop the thread
|
||||||
|
updateStateTo(SyncState.KILLING)
|
||||||
|
}
|
||||||
|
latch.countDown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
|
|
||||||
latch.await()
|
latch.await()
|
||||||
if (state is SyncState.RUNNING) {
|
if (state is SyncState.RUNNING) {
|
||||||
updateStateTo(SyncState.RUNNING(afterPause = false))
|
updateStateTo(SyncState.RUNNING(afterPause = false))
|
||||||
|
@ -61,7 +61,7 @@ internal class DefaultUserService @Inject constructor(private val monarchy: Mona
|
|||||||
|
|
||||||
override fun getUser(userId: String): User? {
|
override fun getUser(userId: String): User? {
|
||||||
val userEntity = monarchy.fetchCopied { UserEntity.where(it, userId).findFirst() }
|
val userEntity = monarchy.fetchCopied { UserEntity.where(it, userId).findFirst() }
|
||||||
?: return null
|
?: return null
|
||||||
|
|
||||||
return userEntity.asDomain()
|
return userEntity.asDomain()
|
||||||
}
|
}
|
||||||
@ -113,8 +113,9 @@ internal class DefaultUserService @Inject constructor(private val monarchy: Mona
|
|||||||
callback: MatrixCallback<List<User>>): Cancelable {
|
callback: MatrixCallback<List<User>>): Cancelable {
|
||||||
val params = SearchUserTask.Params(limit, search, excludedUserIds)
|
val params = SearchUserTask.Params(limit, search, excludedUserIds)
|
||||||
return searchUserTask
|
return searchUserTask
|
||||||
.configureWith(params)
|
.configureWith(params) {
|
||||||
.dispatchTo(callback)
|
this.callback = callback
|
||||||
|
}
|
||||||
.executeBy(taskExecutor)
|
.executeBy(taskExecutor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,54 +18,63 @@ package im.vector.matrix.android.internal.task
|
|||||||
|
|
||||||
import im.vector.matrix.android.api.MatrixCallback
|
import im.vector.matrix.android.api.MatrixCallback
|
||||||
import im.vector.matrix.android.api.util.Cancelable
|
import im.vector.matrix.android.api.util.Cancelable
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
internal fun <PARAMS, RESULT> Task<PARAMS, RESULT>.configureWith(params: PARAMS): ConfigurableTask<PARAMS, RESULT> {
|
internal fun <PARAMS, RESULT> Task<PARAMS, RESULT>.configureWith(params: PARAMS, init: (ConfigurableTask.Builder<PARAMS, RESULT>.() -> Unit) = {}): ConfigurableTask<PARAMS, RESULT> {
|
||||||
return ConfigurableTask(this, params)
|
return ConfigurableTask.Builder(this, params).apply(init).build()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
internal fun <RESULT> Task<Unit, RESULT>.configureWith(init: (ConfigurableTask.Builder<Unit, RESULT>.() -> Unit) = {}): ConfigurableTask<Unit, RESULT> {
|
||||||
* Convert a Task to a ConfigurableTask without parameter
|
return configureWith(Unit, init)
|
||||||
*/
|
|
||||||
internal fun <RESULT> Task<Unit, RESULT>.toConfigurableTask(): ConfigurableTask<Unit, RESULT> {
|
|
||||||
return ConfigurableTask(this, Unit)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
internal data class ConfigurableTask<PARAMS, RESULT>(
|
internal data class ConfigurableTask<PARAMS, RESULT>(
|
||||||
val task: Task<PARAMS, RESULT>,
|
val task: Task<PARAMS, RESULT>,
|
||||||
val params: PARAMS,
|
val params: PARAMS,
|
||||||
val callbackThread: TaskThread = TaskThread.MAIN,
|
val id: UUID,
|
||||||
val executionThread: TaskThread = TaskThread.IO,
|
val callbackThread: TaskThread,
|
||||||
val retryCount: Int = 0,
|
val executionThread: TaskThread,
|
||||||
val callback: MatrixCallback<RESULT> = object : MatrixCallback<RESULT> {}
|
val constraints: TaskConstraints,
|
||||||
|
val retryCount: Int,
|
||||||
|
val callback: MatrixCallback<RESULT>
|
||||||
|
|
||||||
) : Task<PARAMS, RESULT> {
|
) : Task<PARAMS, RESULT> {
|
||||||
|
|
||||||
|
|
||||||
|
class Builder<PARAMS, RESULT>(
|
||||||
|
private val task: Task<PARAMS, RESULT>,
|
||||||
|
private val params: PARAMS,
|
||||||
|
var id: UUID = UUID.randomUUID(),
|
||||||
|
var callbackThread: TaskThread = TaskThread.MAIN,
|
||||||
|
var executionThread: TaskThread = TaskThread.IO,
|
||||||
|
var constraints: TaskConstraints = TaskConstraints(),
|
||||||
|
var retryCount: Int = 0,
|
||||||
|
var callback: MatrixCallback<RESULT> = object : MatrixCallback<RESULT> {}
|
||||||
|
) {
|
||||||
|
|
||||||
|
fun build() = ConfigurableTask(
|
||||||
|
task = task,
|
||||||
|
params = params,
|
||||||
|
id = id,
|
||||||
|
callbackThread = callbackThread,
|
||||||
|
executionThread = executionThread,
|
||||||
|
constraints = constraints,
|
||||||
|
retryCount = retryCount,
|
||||||
|
callback = callback
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
override suspend fun execute(params: PARAMS): RESULT {
|
override suspend fun execute(params: PARAMS): RESULT {
|
||||||
return task.execute(params)
|
return task.execute(params)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun callbackOn(thread: TaskThread): ConfigurableTask<PARAMS, RESULT> {
|
|
||||||
return copy(callbackThread = thread)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun executeOn(thread: TaskThread): ConfigurableTask<PARAMS, RESULT> {
|
|
||||||
return copy(executionThread = thread)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun dispatchTo(matrixCallback: MatrixCallback<RESULT>): ConfigurableTask<PARAMS, RESULT> {
|
|
||||||
return copy(callback = matrixCallback)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun enableRetry(retryCount: Int = Int.MAX_VALUE): ConfigurableTask<PARAMS, RESULT> {
|
|
||||||
return copy(retryCount = retryCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun executeBy(taskExecutor: TaskExecutor): Cancelable {
|
fun executeBy(taskExecutor: TaskExecutor): Cancelable {
|
||||||
return taskExecutor.execute(this)
|
return taskExecutor.execute(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun toString(): String {
|
override fun toString(): String {
|
||||||
return task.javaClass.name
|
return "${task.javaClass.name} with ID: $id"
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,22 @@
|
|||||||
|
/*
|
||||||
|
|
||||||
|
* Copyright 2019 New Vector Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
|
||||||
|
*/
|
||||||
|
package im.vector.matrix.android.internal.task
|
||||||
|
|
||||||
|
data class TaskConstraints(
|
||||||
|
val connectedToNetwork: Boolean = false
|
||||||
|
)
|
@ -21,6 +21,7 @@ import im.vector.matrix.android.api.util.Cancelable
|
|||||||
import im.vector.matrix.android.api.util.CancelableBag
|
import im.vector.matrix.android.api.util.CancelableBag
|
||||||
import im.vector.matrix.android.internal.di.MatrixScope
|
import im.vector.matrix.android.internal.di.MatrixScope
|
||||||
import im.vector.matrix.android.internal.extensions.foldToCallback
|
import im.vector.matrix.android.internal.extensions.foldToCallback
|
||||||
|
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||||
import im.vector.matrix.android.internal.util.CancelableCoroutine
|
import im.vector.matrix.android.internal.util.CancelableCoroutine
|
||||||
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
||||||
import kotlinx.coroutines.GlobalScope
|
import kotlinx.coroutines.GlobalScope
|
||||||
@ -32,7 +33,8 @@ import javax.inject.Inject
|
|||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
@MatrixScope
|
@MatrixScope
|
||||||
internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers) {
|
internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers,
|
||||||
|
private val networkConnectivityChecker: NetworkConnectivityChecker) {
|
||||||
|
|
||||||
private val cancelableBag = CancelableBag()
|
private val cancelableBag = CancelableBag()
|
||||||
|
|
||||||
@ -41,8 +43,13 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
|
|||||||
val job = GlobalScope.launch(task.callbackThread.toDispatcher()) {
|
val job = GlobalScope.launch(task.callbackThread.toDispatcher()) {
|
||||||
val resultOrFailure = runCatching {
|
val resultOrFailure = runCatching {
|
||||||
withContext(task.executionThread.toDispatcher()) {
|
withContext(task.executionThread.toDispatcher()) {
|
||||||
Timber.v("Executing $task on ${Thread.currentThread().name}")
|
Timber.v("Enqueue task $task")
|
||||||
retry(task.retryCount) {
|
retry(task.retryCount) {
|
||||||
|
if (task.constraints.connectedToNetwork) {
|
||||||
|
Timber.v("Waiting network for $task")
|
||||||
|
networkConnectivityChecker.waitUntilConnected()
|
||||||
|
}
|
||||||
|
Timber.v("Execute task $task on ${Thread.currentThread().name}")
|
||||||
task.execute(task.params)
|
task.execute(task.params)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -74,6 +81,7 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
|
|||||||
try {
|
try {
|
||||||
return block()
|
return block()
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
Timber.v("Retry task after $currentDelay ms")
|
||||||
delay(currentDelay)
|
delay(currentDelay)
|
||||||
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
|
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user