Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4612.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Workaround to fetch all the pending toDevice events from a Synapse homeserver
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,17 @@ internal class DefaultCryptoService @Inject constructor(
val currentCount = syncResponse.deviceOneTimeKeysCount.signedCurve25519 ?: 0
oneTimeKeysUploader.updateOneTimeKeyCount(currentCount)
}
if (isStarted()) {
// There is a limit of to_device events returned per sync.
// If we are in a case of such limited to_device sync we can't try to generate/upload
// new otk now, because there might be some pending olm pre-key to_device messages that would fail if we rotate
// the old otk too early. In this case we want to wait for the pending to_device before doing anything
// As per spec:
// If there is a large queue of send-to-device messages, the server should limit the number sent in each /sync response.
// 100 messages is recommended as a reasonable limit.
// The limit is not part of the spec, so it's probably safer to handle that when there are no more to_device ( so we are sure
// that there are no pending to_device
val toDevices = syncResponse.toDevice?.events.orEmpty()
if (isStarted() && toDevices.isEmpty()) {
// Make sure we process to-device messages before generating new one-time-keys #2782
deviceListManager.refreshOutdatedDeviceLists()
oneTimeKeysUploader.maybeUploadOneTimeKeys()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.logger.LoggerTag
Expand Down Expand Up @@ -71,6 +72,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private var isStarted = false
private var isTokenValid = true
private var retryNoNetworkTask: TimerTask? = null
private var previousSyncResponseHasToDevice = false

private val activeCallListObserver = Observer<MutableList<MxCall>> { activeCalls ->
if (activeCalls.isEmpty() && backgroundDetectionObserver.isInBackground) {
Expand Down Expand Up @@ -171,12 +173,15 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
if (state !is SyncState.Running) {
updateStateTo(SyncState.Running(afterPause = true))
}
// No timeout after a pause
val timeout = state.let { if (it is SyncState.Running && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT }
val timeout = when {
previousSyncResponseHasToDevice -> 0L /* Force timeout to 0 */
state.let { it is SyncState.Running && it.afterPause } -> 0L /* No timeout after a pause */
else -> DEFAULT_LONG_POOL_TIMEOUT
}
Timber.tag(loggerTag.value).d("Execute sync request with timeout $timeout")
val params = SyncTask.Params(timeout, SyncPresence.Online)
val sync = syncScope.launch {
doSync(params)
previousSyncResponseHasToDevice = doSync(params)
}
runBlocking {
sync.join()
Expand All @@ -203,10 +208,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
}
}

private suspend fun doSync(params: SyncTask.Params) {
try {
/**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(params: SyncTask.Params): Boolean {
return try {
val syncResponse = syncTask.execute(params)
_syncFlow.emit(syncResponse)
syncResponse.toDevice?.events?.isNotEmpty().orFalse()
} catch (failure: Throwable) {
if (failure is Failure.NetworkConnection) {
canReachServer = false
Expand All @@ -229,6 +238,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
delay(RETRY_WAIT_TIME_MS)
}
}
false
} finally {
state.let {
if (it is SyncState.Running && it.afterPause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import androidx.work.BackoffPolicy
import androidx.work.ExistingWorkPolicy
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.network.NetworkConnectivityChecker
Expand All @@ -34,8 +35,8 @@ import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject

private const val DEFAULT_LONG_POOL_TIMEOUT = 6L
private const val DEFAULT_DELAY_TIMEOUT = 30_000L
private const val DEFAULT_LONG_POOL_TIMEOUT_SECONDS = 6L
private const val DEFAULT_DELAY_MILLIS = 30_000L

/**
* Possible previous worker: None
Expand All @@ -48,9 +49,12 @@ internal class SyncWorker(context: Context,
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT,
val delay: Long = DEFAULT_DELAY_TIMEOUT,
// In seconds
val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT_SECONDS,
// In milliseconds
val delay: Long = DEFAULT_DELAY_MILLIS,
val periodic: Boolean = false,
val forceImmediate: Boolean = false,
override val lastFailureMessage: String? = null
) : SessionWorkerParams

Expand All @@ -67,13 +71,26 @@ internal class SyncWorker(context: Context,
Timber.i("Sync work starting")

return runCatching {
doSync(params.timeout)
doSync(if (params.forceImmediate) 0 else params.timeout)
}.fold(
{
{ hasToDeviceEvents ->
Result.success().also {
if (params.periodic) {
// we want to schedule another one after delay
automaticallyBackgroundSync(workManagerProvider, params.sessionId, params.timeout, params.delay)
// we want to schedule another one after a delay, or immediately if hasToDeviceEvents
automaticallyBackgroundSync(
workManagerProvider = workManagerProvider,
sessionId = params.sessionId,
serverTimeoutInSeconds = params.timeout,
delayInSeconds = params.delay,
forceImmediate = hasToDeviceEvents
)
} else if (hasToDeviceEvents) {
// Previous response has toDevice events, request an immediate sync request
requireBackgroundSync(
workManagerProvider = workManagerProvider,
sessionId = params.sessionId,
serverTimeoutInSeconds = 0
)
}
}
},
Expand All @@ -94,16 +111,29 @@ internal class SyncWorker(context: Context,
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}

private suspend fun doSync(timeout: Long) {
/**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(timeout: Long): Boolean {
val taskParams = SyncTask.Params(timeout * 1000, SyncPresence.Offline)
syncTask.execute(taskParams)
val syncResponse = syncTask.execute(taskParams)
return syncResponse.toDevice?.events?.isNotEmpty().orFalse()
}

companion object {
private const val BG_SYNC_WORK_NAME = "BG_SYNCP"

fun requireBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0) {
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, 0L, false))
fun requireBackgroundSync(workManagerProvider: WorkManagerProvider,
sessionId: String,
serverTimeoutInSeconds: Long = 0) {
val data = WorkerParamsFactory.toData(
Params(
sessionId = sessionId,
timeout = serverTimeoutInSeconds,
delay = 0L,
periodic = false
)
)
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
Expand All @@ -113,13 +143,24 @@ internal class SyncWorker(context: Context,
.enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
}

fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0, delayInSeconds: Long = 30) {
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, delayInSeconds, true))
fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider,
sessionId: String,
serverTimeoutInSeconds: Long = 0,
delayInSeconds: Long = 30,
forceImmediate: Boolean = false) {
val data = WorkerParamsFactory.toData(
Params(
sessionId = sessionId,
timeout = serverTimeoutInSeconds,
delay = delayInSeconds,
forceImmediate = forceImmediate
)
)
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setInputData(data)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
.setInitialDelay(delayInSeconds, TimeUnit.SECONDS)
.setInitialDelay(if (forceImmediate) 0 else delayInSeconds, TimeUnit.SECONDS)
.build()
// Avoid risking multiple chains of syncs by replacing the existing chain
workManagerProvider.workManager
Expand Down