forked from GitHub-Mirror/riotX-android
Merge pull request #299 from vector-im/feature/dix_concurrent_sync
Fix / Push worker could launch concurrent syncs
This commit is contained in:
commit
baaf493cb4
@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.sync
|
|||||||
|
|
||||||
import arrow.core.Try
|
import arrow.core.Try
|
||||||
import im.vector.matrix.android.internal.crypto.CryptoManager
|
import im.vector.matrix.android.internal.crypto.CryptoManager
|
||||||
import im.vector.matrix.android.internal.session.SessionScope
|
|
||||||
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
@ -29,9 +29,9 @@ import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
|||||||
import im.vector.matrix.android.internal.task.Task
|
import im.vector.matrix.android.internal.task.Task
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
internal interface SyncTask : Task<SyncTask.Params, SyncResponse> {
|
internal interface SyncTask : Task<SyncTask.Params, Unit> {
|
||||||
|
|
||||||
data class Params(val token: String?, var timeout: Long = 30_000L)
|
data class Params(var timeout: Long = 30_000L)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,15 +39,17 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
|
|||||||
private val credentials: Credentials,
|
private val credentials: Credentials,
|
||||||
private val filterRepository: FilterRepository,
|
private val filterRepository: FilterRepository,
|
||||||
private val syncResponseHandler: SyncResponseHandler,
|
private val syncResponseHandler: SyncResponseHandler,
|
||||||
private val sessionParamsStore: SessionParamsStore
|
private val sessionParamsStore: SessionParamsStore,
|
||||||
|
private val syncTokenStore: SyncTokenStore
|
||||||
) : SyncTask {
|
) : SyncTask {
|
||||||
|
|
||||||
|
|
||||||
override suspend fun execute(params: SyncTask.Params): Try<SyncResponse> {
|
override suspend fun execute(params: SyncTask.Params): Try<Unit> {
|
||||||
val requestParams = HashMap<String, String>()
|
val requestParams = HashMap<String, String>()
|
||||||
var timeout = 0L
|
var timeout = 0L
|
||||||
if (params.token != null) {
|
val token = syncTokenStore.getLastToken()
|
||||||
requestParams["since"] = params.token
|
if (token != null) {
|
||||||
|
requestParams["since"] = token
|
||||||
timeout = params.timeout
|
timeout = params.timeout
|
||||||
}
|
}
|
||||||
requestParams["timeout"] = timeout.toString()
|
requestParams["timeout"] = timeout.toString()
|
||||||
@ -65,9 +67,9 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
|
|||||||
// Transmit the throwable
|
// Transmit the throwable
|
||||||
throwable.failure()
|
throwable.failure()
|
||||||
}.flatMap { syncResponse ->
|
}.flatMap { syncResponse ->
|
||||||
syncResponseHandler.handleResponse(syncResponse, params.token, false)
|
syncResponseHandler.handleResponse(syncResponse, token, false)
|
||||||
|
}.map {
|
||||||
|
syncTokenStore.saveToken(it.nextBatch)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
@ -26,8 +26,6 @@ import im.vector.matrix.android.api.failure.MatrixError
|
|||||||
import im.vector.matrix.android.api.util.Cancelable
|
import im.vector.matrix.android.api.util.Cancelable
|
||||||
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||||
import im.vector.matrix.android.internal.session.sync.SyncTask
|
import im.vector.matrix.android.internal.session.sync.SyncTask
|
||||||
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
|
|
||||||
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.TaskThread
|
import im.vector.matrix.android.internal.task.TaskThread
|
||||||
import im.vector.matrix.android.internal.task.configureWith
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
@ -35,9 +33,6 @@ import timber.log.Timber
|
|||||||
import java.net.SocketTimeoutException
|
import java.net.SocketTimeoutException
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
private const val DEFAULT_LONG_POOL_TIMEOUT = 10_000L
|
|
||||||
private const val BACKGROUND_LONG_POOL_TIMEOUT = 0L
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Can execute periodic sync task.
|
* Can execute periodic sync task.
|
||||||
* An IntentService is used in conjunction with the AlarmManager and a Broadcast Receiver
|
* An IntentService is used in conjunction with the AlarmManager and a Broadcast Receiver
|
||||||
@ -49,7 +44,6 @@ open class SyncService : Service() {
|
|||||||
private var mIsSelfDestroyed: Boolean = false
|
private var mIsSelfDestroyed: Boolean = false
|
||||||
private var cancelableTask: Cancelable? = null
|
private var cancelableTask: Cancelable? = null
|
||||||
|
|
||||||
private lateinit var syncTokenStore: SyncTokenStore
|
|
||||||
private lateinit var syncTask: SyncTask
|
private lateinit var syncTask: SyncTask
|
||||||
private lateinit var networkConnectivityChecker: NetworkConnectivityChecker
|
private lateinit var networkConnectivityChecker: NetworkConnectivityChecker
|
||||||
private lateinit var taskExecutor: TaskExecutor
|
private lateinit var taskExecutor: TaskExecutor
|
||||||
@ -57,18 +51,12 @@ open class SyncService : Service() {
|
|||||||
|
|
||||||
var timer = Timer()
|
var timer = Timer()
|
||||||
|
|
||||||
var nextBatchDelay = 0L
|
|
||||||
var timeout = 10_000L
|
|
||||||
|
|
||||||
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
|
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
|
||||||
Timber.i("onStartCommand ${intent}")
|
Timber.i("onStartCommand ${intent}")
|
||||||
nextBatchDelay = 60_000L
|
|
||||||
timeout = 0
|
|
||||||
intent?.let {
|
intent?.let {
|
||||||
val userId = it.getStringExtra(EXTRA_USER_ID)
|
val userId = it.getStringExtra(EXTRA_USER_ID)
|
||||||
val sessionComponent = Matrix.getInstance(applicationContext).sessionManager.getSessionComponent(userId)
|
val sessionComponent = Matrix.getInstance(applicationContext).sessionManager.getSessionComponent(userId)
|
||||||
?: return@let
|
?: return@let
|
||||||
syncTokenStore = sessionComponent.syncTokenStore()
|
|
||||||
syncTask = sessionComponent.syncTask()
|
syncTask = sessionComponent.syncTask()
|
||||||
networkConnectivityChecker = sessionComponent.networkConnectivityChecker()
|
networkConnectivityChecker = sessionComponent.networkConnectivityChecker()
|
||||||
taskExecutor = sessionComponent.taskExecutor()
|
taskExecutor = sessionComponent.taskExecutor()
|
||||||
@ -105,7 +93,6 @@ open class SyncService : Service() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun doSync(once: Boolean = false) {
|
fun doSync(once: Boolean = false) {
|
||||||
var nextBatch = syncTokenStore.getLastToken()
|
|
||||||
if (!networkConnectivityChecker.isConnected()) {
|
if (!networkConnectivityChecker.isConnected()) {
|
||||||
Timber.v("Sync is Paused. Waiting...")
|
Timber.v("Sync is Paused. Waiting...")
|
||||||
//TODO Retry in ?
|
//TODO Retry in ?
|
||||||
@ -113,24 +100,22 @@ open class SyncService : Service() {
|
|||||||
override fun run() {
|
override fun run() {
|
||||||
doSync()
|
doSync()
|
||||||
}
|
}
|
||||||
}, 5_000L)
|
}, NO_NETWORK_DELAY)
|
||||||
} else {
|
} else {
|
||||||
Timber.v("Execute sync request with token $nextBatch and timeout $timeout")
|
Timber.v("Execute sync request with timeout 0")
|
||||||
val params = SyncTask.Params(nextBatch, timeout)
|
val params = SyncTask.Params(TIME_OUT)
|
||||||
cancelableTask = syncTask.configureWith(params)
|
cancelableTask = syncTask.configureWith(params)
|
||||||
.callbackOn(TaskThread.CALLER)
|
.callbackOn(TaskThread.CALLER)
|
||||||
.executeOn(TaskThread.CALLER)
|
.executeOn(TaskThread.CALLER)
|
||||||
.dispatchTo(object : MatrixCallback<SyncResponse> {
|
.dispatchTo(object : MatrixCallback<Unit> {
|
||||||
override fun onSuccess(data: SyncResponse) {
|
override fun onSuccess(data: Unit) {
|
||||||
cancelableTask = null
|
cancelableTask = null
|
||||||
nextBatch = data.nextBatch
|
|
||||||
syncTokenStore.saveToken(nextBatch)
|
|
||||||
if (!once) {
|
if (!once) {
|
||||||
timer.schedule(object : TimerTask() {
|
timer.schedule(object : TimerTask() {
|
||||||
override fun run() {
|
override fun run() {
|
||||||
doSync()
|
doSync()
|
||||||
}
|
}
|
||||||
}, nextBatchDelay)
|
}, NEXT_BATCH_DELAY)
|
||||||
} else {
|
} else {
|
||||||
//stop
|
//stop
|
||||||
stopMe()
|
stopMe()
|
||||||
@ -180,6 +165,10 @@ open class SyncService : Service() {
|
|||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val EXTRA_USER_ID = "EXTRA_USER_ID"
|
const val EXTRA_USER_ID = "EXTRA_USER_ID"
|
||||||
|
|
||||||
|
const val TIME_OUT = 0L
|
||||||
|
const val NEXT_BATCH_DELAY = 60_000L
|
||||||
|
const val NO_NETWORK_DELAY = 5_000L
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -25,10 +25,7 @@ import im.vector.matrix.android.api.failure.MatrixError
|
|||||||
import im.vector.matrix.android.api.session.sync.SyncState
|
import im.vector.matrix.android.api.session.sync.SyncState
|
||||||
import im.vector.matrix.android.api.util.Cancelable
|
import im.vector.matrix.android.api.util.Cancelable
|
||||||
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||||
import im.vector.matrix.android.internal.session.SessionScope
|
|
||||||
import im.vector.matrix.android.internal.session.sync.SyncTask
|
import im.vector.matrix.android.internal.session.sync.SyncTask
|
||||||
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
|
|
||||||
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
|
||||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.task.TaskThread
|
import im.vector.matrix.android.internal.task.TaskThread
|
||||||
import im.vector.matrix.android.internal.task.configureWith
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
@ -44,7 +41,6 @@ private const val DEFAULT_LONG_POOL_DELAY = 0L
|
|||||||
|
|
||||||
internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
||||||
private val networkConnectivityChecker: NetworkConnectivityChecker,
|
private val networkConnectivityChecker: NetworkConnectivityChecker,
|
||||||
private val syncTokenStore: SyncTokenStore,
|
|
||||||
private val backgroundDetectionObserver: BackgroundDetectionObserver,
|
private val backgroundDetectionObserver: BackgroundDetectionObserver,
|
||||||
private val taskExecutor: TaskExecutor
|
private val taskExecutor: TaskExecutor
|
||||||
) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
|
) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
|
||||||
@ -52,7 +48,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||||||
private var state: SyncState = SyncState.IDLE
|
private var state: SyncState = SyncState.IDLE
|
||||||
private var liveState = MutableLiveData<SyncState>()
|
private var liveState = MutableLiveData<SyncState>()
|
||||||
private val lock = Object()
|
private val lock = Object()
|
||||||
private var nextBatch = syncTokenStore.getLastToken()
|
|
||||||
private var cancelableTask: Cancelable? = null
|
private var cancelableTask: Cancelable? = null
|
||||||
|
|
||||||
init {
|
init {
|
||||||
@ -62,8 +57,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||||||
fun restart() = synchronized(lock) {
|
fun restart() = synchronized(lock) {
|
||||||
if (state is SyncState.PAUSED) {
|
if (state is SyncState.PAUSED) {
|
||||||
Timber.v("Resume sync...")
|
Timber.v("Resume sync...")
|
||||||
// Retrieve the last token, it may have been deleted in case of a clear cache
|
|
||||||
nextBatch = syncTokenStore.getLastToken()
|
|
||||||
updateStateTo(SyncState.RUNNING(catchingUp = true))
|
updateStateTo(SyncState.RUNNING(catchingUp = true))
|
||||||
lock.notify()
|
lock.notify()
|
||||||
}
|
}
|
||||||
@ -100,16 +93,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||||||
lock.wait()
|
lock.wait()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Timber.v("Execute sync request with token $nextBatch and timeout $DEFAULT_LONG_POOL_TIMEOUT")
|
Timber.v("Execute sync request with timeout $DEFAULT_LONG_POOL_TIMEOUT")
|
||||||
val latch = CountDownLatch(1)
|
val latch = CountDownLatch(1)
|
||||||
val params = SyncTask.Params(nextBatch, DEFAULT_LONG_POOL_TIMEOUT)
|
val params = SyncTask.Params(DEFAULT_LONG_POOL_TIMEOUT)
|
||||||
cancelableTask = syncTask.configureWith(params)
|
cancelableTask = syncTask.configureWith(params)
|
||||||
.callbackOn(TaskThread.CALLER)
|
.callbackOn(TaskThread.CALLER)
|
||||||
.executeOn(TaskThread.CALLER)
|
.executeOn(TaskThread.CALLER)
|
||||||
.dispatchTo(object : MatrixCallback<SyncResponse> {
|
.dispatchTo(object : MatrixCallback<Unit> {
|
||||||
override fun onSuccess(data: SyncResponse) {
|
override fun onSuccess(data: Unit) {
|
||||||
nextBatch = data.nextBatch
|
|
||||||
syncTokenStore.saveToken(nextBatch)
|
|
||||||
latch.countDown()
|
latch.countDown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,18 +18,16 @@ package im.vector.matrix.android.internal.session.sync.job
|
|||||||
import android.content.Context
|
import android.content.Context
|
||||||
import androidx.work.*
|
import androidx.work.*
|
||||||
import com.squareup.moshi.JsonClass
|
import com.squareup.moshi.JsonClass
|
||||||
import im.vector.matrix.android.api.failure.Failure
|
import im.vector.matrix.android.api.MatrixCallback
|
||||||
import im.vector.matrix.android.api.failure.MatrixError
|
import im.vector.matrix.android.api.util.Cancelable
|
||||||
import im.vector.matrix.android.internal.auth.SessionParamsStore
|
import im.vector.matrix.android.internal.session.sync.SyncTask
|
||||||
import im.vector.matrix.android.internal.network.executeRequest
|
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||||
import im.vector.matrix.android.internal.session.filter.FilterRepository
|
import im.vector.matrix.android.internal.task.TaskThread
|
||||||
import im.vector.matrix.android.internal.session.sync.SyncAPI
|
import im.vector.matrix.android.internal.task.configureWith
|
||||||
import im.vector.matrix.android.internal.session.sync.SyncResponseHandler
|
|
||||||
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
|
|
||||||
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
|
|
||||||
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
|
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
|
||||||
import im.vector.matrix.android.internal.worker.getSessionComponent
|
import im.vector.matrix.android.internal.worker.getSessionComponent
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
@ -47,12 +45,10 @@ internal class SyncWorker(context: Context,
|
|||||||
val automaticallyRetry: Boolean = false
|
val automaticallyRetry: Boolean = false
|
||||||
)
|
)
|
||||||
|
|
||||||
@Inject lateinit var syncAPI: SyncAPI
|
@Inject
|
||||||
@Inject lateinit var filterRepository: FilterRepository
|
lateinit var syncTask: SyncTask
|
||||||
@Inject lateinit var syncResponseHandler: SyncResponseHandler
|
@Inject
|
||||||
@Inject lateinit var sessionParamsStore: SessionParamsStore
|
lateinit var taskExecutor: TaskExecutor
|
||||||
@Inject lateinit var syncTokenStore: SyncTokenStore
|
|
||||||
|
|
||||||
|
|
||||||
override suspend fun doWork(): Result {
|
override suspend fun doWork(): Result {
|
||||||
Timber.i("Sync work starting")
|
Timber.i("Sync work starting")
|
||||||
@ -60,37 +56,34 @@ internal class SyncWorker(context: Context,
|
|||||||
val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
|
val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
|
||||||
sessionComponent.inject(this)
|
sessionComponent.inject(this)
|
||||||
|
|
||||||
val requestParams = HashMap<String, String>()
|
|
||||||
requestParams["timeout"] = params.timeout.toString()
|
|
||||||
requestParams["filter"] = filterRepository.getFilter()
|
|
||||||
val token = syncTokenStore.getLastToken()?.also { requestParams["since"] = it }
|
|
||||||
Timber.i("Sync work last token $token")
|
|
||||||
|
|
||||||
return executeRequest<SyncResponse> {
|
val latch = CountDownLatch(1)
|
||||||
apiCall = syncAPI.sync(requestParams)
|
val taskParams = SyncTask.Params(0)
|
||||||
}.fold(
|
cancelableTask = syncTask.configureWith(taskParams)
|
||||||
{
|
.callbackOn(TaskThread.CALLER)
|
||||||
if (it is Failure.ServerError
|
.executeOn(TaskThread.CALLER)
|
||||||
&& it.error.code == MatrixError.UNKNOWN_TOKEN) {
|
.dispatchTo(object : MatrixCallback<Unit> {
|
||||||
sessionParamsStore.delete(params.userId)
|
override fun onSuccess(data: Unit) {
|
||||||
Result.failure()
|
latch.countDown()
|
||||||
} else {
|
|
||||||
Timber.i("Sync work failed $it")
|
|
||||||
Result.retry()
|
|
||||||
}
|
}
|
||||||
},
|
|
||||||
{
|
override fun onFailure(failure: Throwable) {
|
||||||
Timber.i("Sync work success next batch ${it.nextBatch}")
|
Timber.e(failure)
|
||||||
if (!isStopped) {
|
latch.countDown()
|
||||||
syncResponseHandler.handleResponse(it, token, false)
|
|
||||||
syncTokenStore.saveToken(it.nextBatch)
|
|
||||||
}
|
}
|
||||||
if (params.automaticallyRetry) Result.retry() else Result.success()
|
|
||||||
}
|
})
|
||||||
)
|
.executeBy(taskExecutor)
|
||||||
|
|
||||||
|
latch.await()
|
||||||
|
return Result.success()
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
|
||||||
|
|
||||||
|
private var cancelableTask: Cancelable? = null
|
||||||
|
|
||||||
fun requireBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0) {
|
fun requireBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0) {
|
||||||
val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, false))
|
val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, false))
|
||||||
val workRequest = OneTimeWorkRequestBuilder<SyncWorker>()
|
val workRequest = OneTimeWorkRequestBuilder<SyncWorker>()
|
||||||
@ -116,6 +109,7 @@ internal class SyncWorker(context: Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun stopAnyBackgroundSync(context: Context) {
|
fun stopAnyBackgroundSync(context: Context) {
|
||||||
|
cancelableTask?.cancel()
|
||||||
WorkManager.getInstance(context).cancelUniqueWork("BG_SYNCP")
|
WorkManager.getInstance(context).cancelUniqueWork("BG_SYNCP")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user