From d3ce4c491ced778513c448ab3453a582b4af3fd0 Mon Sep 17 00:00:00 2001 From: ganfra Date: Tue, 6 Aug 2019 11:45:06 +0200 Subject: [PATCH] Clean code after review --- .../internal/database/AsyncTransaction.kt | 69 +++---------------- .../network/NetworkConnectivityChecker.kt | 6 +- .../session/room/send/EncryptEventWorker.kt | 38 ++++------ .../android/internal/task/ConfigurableTask.kt | 7 +- .../android/internal/task/TaskExecutor.kt | 19 ++--- .../matrix/android/internal/util/Monarchy.kt | 2 +- .../internal/util/SuspendMatrixCallback.kt | 35 ++++++++++ 7 files changed, 67 insertions(+), 109 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/SuspendMatrixCallback.kt diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/AsyncTransaction.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/AsyncTransaction.kt index a4ff07f3..e1beefc2 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/AsyncTransaction.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/AsyncTransaction.kt @@ -17,74 +17,23 @@ package im.vector.matrix.android.internal.database import io.realm.Realm import io.realm.RealmConfiguration -import io.realm.internal.OsSharedRealm -import kotlinx.coroutines.CancellableContinuation -import kotlinx.coroutines.suspendCancellableCoroutine -import timber.log.Timber -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.Future -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.isActive +import kotlinx.coroutines.withContext -private object AsyncTransactionThreadHolder { - val EXECUTOR: ExecutorService by lazy { - Executors.newSingleThreadExecutor() - } - -} - -private class AsyncTransactionRunnable(private val continuation: CancellableContinuation, - private val realmConfiguration: RealmConfiguration, - private val transaction: (realm: Realm) -> Unit) : Runnable { - - override fun run() { - if (Thread.currentThread().isInterrupted) { - return - } - var versionID: OsSharedRealm.VersionID? = null - var exception: Throwable? = null - - val bgRealm = Realm.getInstance(realmConfiguration) +suspend fun awaitTransaction(config: RealmConfiguration, transaction: suspend (realm: Realm) -> Unit) = withContext(Dispatchers.IO) { + Realm.getInstance(config).use { bgRealm -> bgRealm.beginTransaction() try { transaction(bgRealm) - if (Thread.currentThread().isInterrupted) { - return + if (isActive) { + bgRealm.commitTransaction() } - bgRealm.commitTransaction() - versionID = bgRealm.sharedRealm.versionID - } catch (e: Throwable) { - exception = e } finally { - try { - if (bgRealm.isInTransaction) { - bgRealm.cancelTransaction() - } - } finally { - bgRealm.close() + if (bgRealm.isInTransaction) { + bgRealm.cancelTransaction() } } - val backgroundException = exception - val backgroundVersionID = versionID - when { - backgroundVersionID != null -> continuation.resume(Unit) - backgroundException != null -> continuation.resumeWithException(backgroundException) - } } - -} - -suspend fun awaitTransaction(realmConfiguration: RealmConfiguration, transaction: (realm: Realm) -> Unit) { - return suspendCancellableCoroutine { continuation -> - var futureTask: Future<*>? = null - continuation.invokeOnCancellation { - Timber.v("Cancel database transaction") - futureTask?.cancel(true) - } - val runnable = AsyncTransactionRunnable(continuation, realmConfiguration, transaction) - futureTask = AsyncTransactionThreadHolder.EXECUTOR.submit(runnable) - } - } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt index 05dbe4c0..412cb73c 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/network/NetworkConnectivityChecker.kt @@ -36,20 +36,20 @@ internal class NetworkConnectivityChecker @Inject constructor(context: Context) .build(context) private val merlinsBeard = MerlinsBeard.Builder().build(context) - private val listeners = ArrayList() + private val listeners = Collections.synchronizedList(ArrayList()) init { merlin.bind() merlin.registerDisconnectable { Timber.v("On Disconnect") - val localListeners = Collections.synchronizedList(listeners) + val localListeners = listeners.toList() localListeners.forEach { it.onDisconnect() } } merlin.registerConnectable { Timber.v("On Connect") - val localListeners = Collections.synchronizedList(listeners) + val localListeners = listeners.toList() localListeners.forEach { it.onConnect() } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/send/EncryptEventWorker.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/send/EncryptEventWorker.kt index 409a43f1..031ceb16 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/send/EncryptEventWorker.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/send/EncryptEventWorker.kt @@ -20,17 +20,16 @@ import android.content.Context import androidx.work.CoroutineWorker import androidx.work.WorkerParameters import com.squareup.moshi.JsonClass -import im.vector.matrix.android.api.MatrixCallback import im.vector.matrix.android.api.failure.Failure import im.vector.matrix.android.api.session.crypto.CryptoService import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.room.send.SendState import im.vector.matrix.android.internal.crypto.model.MXEncryptEventContentResult +import im.vector.matrix.android.internal.util.awaitCallback import im.vector.matrix.android.internal.worker.SessionWorkerParams import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.getSessionComponent import timber.log.Timber -import java.util.concurrent.CountDownLatch import javax.inject.Inject internal class EncryptEventWorker(context: Context, params: WorkerParameters) @@ -71,44 +70,30 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters) } localEchoUpdater.updateSendState(localEvent.eventId, SendState.ENCRYPTING) - // TODO Better async handling - val latch = CountDownLatch(1) - - var result: MXEncryptEventContentResult? = null - var error: Throwable? = null val localMutableContent = HashMap(localEvent.content) params.keepKeys?.forEach { localMutableContent.remove(it) } + var error: Throwable? = null + var result: MXEncryptEventContentResult? = null try { - crypto.encryptEventContent(localMutableContent, localEvent.type, params.roomId, object : MatrixCallback { - override fun onSuccess(data: MXEncryptEventContentResult) { - result = data - latch.countDown() - } - - override fun onFailure(failure: Throwable) { - error = failure - latch.countDown() - } - }) - } catch (e: Throwable) { - error = e - latch.countDown() + result = awaitCallback { + crypto.encryptEventContent(localMutableContent, localEvent.type, params.roomId, it) + } + } catch (throwable: Throwable) { + error = throwable } - latch.await() - if (result != null) { - val modifiedContent = HashMap(result?.eventContent) + val modifiedContent = HashMap(result.eventContent) params.keepKeys?.forEach { toKeep -> localEvent.content?.get(toKeep)?.let { //put it back in the encrypted thing modifiedContent[toKeep] = it } } - val safeResult = result!!.copy(eventContent = modifiedContent) + val safeResult = result.copy(eventContent = modifiedContent) val encryptedEvent = localEvent.copy( type = safeResult.eventType, content = safeResult.eventContent @@ -122,7 +107,8 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters) } localEchoUpdater.updateSendState(localEvent.eventId, sendState) //always return success, or the chain will be stuck for ever! - val nextWorkerParams = SendEventWorker.Params(params.userId, params.roomId, localEvent, error?.localizedMessage ?: "Error") + val nextWorkerParams = SendEventWorker.Params(params.userId, params.roomId, localEvent, error?.localizedMessage + ?: "Error") return Result.success(WorkerParamsFactory.toData(nextWorkerParams)) } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt index d937810e..6896fe68 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/ConfigurableTask.kt @@ -40,7 +40,7 @@ internal data class ConfigurableTask( val retryCount: Int, val callback: MatrixCallback -) : Task { +) : Task by task { class Builder( @@ -66,11 +66,6 @@ internal data class ConfigurableTask( ) } - - override suspend fun execute(params: PARAMS): RESULT { - return task.execute(params) - } - fun executeBy(taskExecutor: TaskExecutor): Cancelable { return taskExecutor.execute(this) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt index 41905b5b..c3f08b15 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/task/TaskExecutor.kt @@ -18,16 +18,12 @@ package im.vector.matrix.android.internal.task import im.vector.matrix.android.api.util.Cancelable -import im.vector.matrix.android.api.util.CancelableBag import im.vector.matrix.android.internal.di.MatrixScope import im.vector.matrix.android.internal.extensions.foldToCallback import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.util.CancelableCoroutine import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext +import kotlinx.coroutines.* import timber.log.Timber import javax.inject.Inject import kotlin.coroutines.EmptyCoroutineContext @@ -36,11 +32,11 @@ import kotlin.coroutines.EmptyCoroutineContext internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers, private val networkConnectivityChecker: NetworkConnectivityChecker) { - private val cancelableBag = CancelableBag() + private val executorScope = CoroutineScope(SupervisorJob()) fun execute(task: ConfigurableTask): Cancelable { - val job = GlobalScope.launch(task.callbackThread.toDispatcher()) { + val job = executorScope.launch(task.callbackThread.toDispatcher()) { val resultOrFailure = runCatching { withContext(task.executionThread.toDispatcher()) { Timber.v("Enqueue task $task") @@ -60,14 +56,11 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers } .foldToCallback(task.callback) } - return CancelableCoroutine(job).also { - cancelableBag += it - } + return CancelableCoroutine(job) } - fun cancelAll() = synchronized(this) { - cancelableBag.cancel() - } + fun cancelAll() = executorScope.coroutineContext.cancelChildren() + private suspend fun retry( times: Int = Int.MAX_VALUE, diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/Monarchy.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/Monarchy.kt index be9b0b06..fc999922 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/Monarchy.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/Monarchy.kt @@ -22,7 +22,7 @@ import io.realm.Realm import io.realm.RealmModel import java.util.concurrent.atomic.AtomicReference -internal suspend fun Monarchy.awaitTransaction(transaction: (realm: Realm) -> Unit) { +internal suspend fun Monarchy.awaitTransaction(transaction: suspend (realm: Realm) -> Unit) { awaitTransaction(realmConfiguration, transaction) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/SuspendMatrixCallback.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/SuspendMatrixCallback.kt new file mode 100644 index 00000000..801578ac --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/util/SuspendMatrixCallback.kt @@ -0,0 +1,35 @@ +/* + + * Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + + */ +package im.vector.matrix.android.internal.util + +import im.vector.matrix.android.api.MatrixCallback +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +suspend inline fun awaitCallback(crossinline callback: (MatrixCallback) -> Unit) = suspendCoroutine { cont -> + callback(object : MatrixCallback { + override fun onFailure(failure: Throwable) { + cont.resumeWithException(failure) + } + + override fun onSuccess(data: T) { + cont.resume(data) + } + }) +} \ No newline at end of file