Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4612.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Workaround to fetch all the pending toDevice events from a Synapse homeserver
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.logger.LoggerTag
Expand Down Expand Up @@ -71,6 +72,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private var isStarted = false
private var isTokenValid = true
private var retryNoNetworkTask: TimerTask? = null
private var previousSyncResponseHasToDevice = false

private val activeCallListObserver = Observer<MutableList<MxCall>> { activeCalls ->
if (activeCalls.isEmpty() && backgroundDetectionObserver.isInBackground) {
Expand Down Expand Up @@ -171,12 +173,15 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
if (state !is SyncState.Running) {
updateStateTo(SyncState.Running(afterPause = true))
}
// No timeout after a pause
val timeout = state.let { if (it is SyncState.Running && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT }
val timeout = when {
previousSyncResponseHasToDevice -> 0L /* Force timeout to 0 */
state.let { it is SyncState.Running && it.afterPause } -> 0L /* No timeout after a pause */
else -> DEFAULT_LONG_POOL_TIMEOUT
}
Timber.tag(loggerTag.value).d("Execute sync request with timeout $timeout")
val params = SyncTask.Params(timeout, SyncPresence.Online)
val sync = syncScope.launch {
doSync(params)
previousSyncResponseHasToDevice = doSync(params)
}
runBlocking {
sync.join()
Expand All @@ -203,10 +208,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
}
}

private suspend fun doSync(params: SyncTask.Params) {
try {
/**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(params: SyncTask.Params): Boolean {
return try {
val syncResponse = syncTask.execute(params)
_syncFlow.emit(syncResponse)
syncResponse.toDevice?.events?.isNotEmpty().orFalse()
} catch (failure: Throwable) {
if (failure is Failure.NetworkConnection) {
canReachServer = false
Expand All @@ -229,6 +238,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
delay(RETRY_WAIT_TIME_MS)
}
}
false
} finally {
state.let {
if (it is SyncState.Running && it.afterPause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import androidx.work.BackoffPolicy
import androidx.work.ExistingWorkPolicy
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.network.NetworkConnectivityChecker
Expand All @@ -34,8 +35,8 @@ import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject

private const val DEFAULT_LONG_POOL_TIMEOUT = 6L
private const val DEFAULT_DELAY_TIMEOUT = 30_000L
private const val DEFAULT_LONG_POOL_TIMEOUT_SECONDS = 6L
private const val DEFAULT_DELAY_MILLIS = 30_000L

/**
* Possible previous worker: None
Expand All @@ -48,9 +49,12 @@ internal class SyncWorker(context: Context,
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT,
val delay: Long = DEFAULT_DELAY_TIMEOUT,
// In seconds
val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT_SECONDS,
// In milliseconds
val delay: Long = DEFAULT_DELAY_MILLIS,
val periodic: Boolean = false,
val forceImmediate: Boolean = false,
override val lastFailureMessage: String? = null
) : SessionWorkerParams

Expand All @@ -67,13 +71,26 @@ internal class SyncWorker(context: Context,
Timber.i("Sync work starting")

return runCatching {
doSync(params.timeout)
doSync(if (params.forceImmediate) 0 else params.timeout)
}.fold(
{
{ hasToDeviceEvents ->
Result.success().also {
if (params.periodic) {
// we want to schedule another one after delay
automaticallyBackgroundSync(workManagerProvider, params.sessionId, params.timeout, params.delay)
// we want to schedule another one after a delay, or immediately if hasToDeviceEvents
automaticallyBackgroundSync(
workManagerProvider = workManagerProvider,
sessionId = params.sessionId,
serverTimeoutInSeconds = params.timeout,
delayInSeconds = params.delay,
forceImmediate = hasToDeviceEvents
)
} else if (hasToDeviceEvents) {
// Previous response has toDevice events, request an immediate sync request
requireBackgroundSync(
workManagerProvider = workManagerProvider,
sessionId = params.sessionId,
serverTimeoutInSeconds = 0
)
}
}
},
Expand All @@ -94,16 +111,29 @@ internal class SyncWorker(context: Context,
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}

private suspend fun doSync(timeout: Long) {
/**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(timeout: Long): Boolean {
val taskParams = SyncTask.Params(timeout * 1000, SyncPresence.Offline)
syncTask.execute(taskParams)
val syncResponse = syncTask.execute(taskParams)
return syncResponse.toDevice?.events?.isNotEmpty().orFalse()
}

companion object {
private const val BG_SYNC_WORK_NAME = "BG_SYNCP"

fun requireBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0) {
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, 0L, false))
fun requireBackgroundSync(workManagerProvider: WorkManagerProvider,
sessionId: String,
serverTimeoutInSeconds: Long = 0) {
val data = WorkerParamsFactory.toData(
Params(
sessionId = sessionId,
timeout = serverTimeoutInSeconds,
delay = 0L,
periodic = false
)
)
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
Expand All @@ -113,13 +143,24 @@ internal class SyncWorker(context: Context,
.enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
}

fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0, delayInSeconds: Long = 30) {
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, delayInSeconds, true))
fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider,
sessionId: String,
serverTimeoutInSeconds: Long = 0,
delayInSeconds: Long = 30,
forceImmediate: Boolean = false) {
val data = WorkerParamsFactory.toData(
Params(
sessionId = sessionId,
timeout = serverTimeoutInSeconds,
delay = delayInSeconds,
forceImmediate = forceImmediate
)
)
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setInputData(data)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
.setInitialDelay(delayInSeconds, TimeUnit.SECONDS)
.setInitialDelay(if (forceImmediate) 0 else delayInSeconds, TimeUnit.SECONDS)
.build()
// Avoid risking multiple chains of syncs by replacing the existing chain
workManagerProvider.workManager
Expand Down