Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4612.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Workaround to fetch all the pending toDevice events from a Synapse homeserver
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.logger.LoggerTag
Expand Down Expand Up @@ -71,6 +72,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private var isStarted = false
private var isTokenValid = true
private var retryNoNetworkTask: TimerTask? = null
private var previousSyncResponseHasToDevice = false

private val activeCallListObserver = Observer<MutableList<MxCall>> { activeCalls ->
if (activeCalls.isEmpty() && backgroundDetectionObserver.isInBackground) {
Expand Down Expand Up @@ -172,11 +174,16 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
updateStateTo(SyncState.Running(afterPause = true))
}
// No timeout after a pause
val timeout = state.let { if (it is SyncState.Running && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT }
val timeout = when {
previousSyncResponseHasToDevice -> 0L
.also { Timber.tag(loggerTag.value).d("Force timeout to 0") }
state.let { it is SyncState.Running && it.afterPause } -> 0L
else -> DEFAULT_LONG_POOL_TIMEOUT
}
Timber.tag(loggerTag.value).d("Execute sync request with timeout $timeout")
val params = SyncTask.Params(timeout, SyncPresence.Online)
val sync = syncScope.launch {
doSync(params)
previousSyncResponseHasToDevice = doSync(params)
}
runBlocking {
sync.join()
Expand All @@ -203,10 +210,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
}
}

private suspend fun doSync(params: SyncTask.Params) {
try {
/**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(params: SyncTask.Params): Boolean {
return try {
val syncResponse = syncTask.execute(params)
_syncFlow.emit(syncResponse)
syncResponse.toDevice?.events?.isNotEmpty().orFalse()
} catch (failure: Throwable) {
if (failure is Failure.NetworkConnection) {
canReachServer = false
Expand All @@ -229,6 +240,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
delay(RETRY_WAIT_TIME_MS)
}
}
false
} finally {
state.let {
if (it is SyncState.Running && it.afterPause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import androidx.work.BackoffPolicy
import androidx.work.ExistingWorkPolicy
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.network.NetworkConnectivityChecker
Expand Down Expand Up @@ -51,6 +52,7 @@ internal class SyncWorker(context: Context,
val timeout: Long = DEFAULT_LONG_POOL_TIMEOUT,
val delay: Long = DEFAULT_DELAY_TIMEOUT,
val periodic: Boolean = false,
val forceImmediate: Boolean = false,
override val lastFailureMessage: String? = null
) : SessionWorkerParams

Expand All @@ -67,13 +69,16 @@ internal class SyncWorker(context: Context,
Timber.i("Sync work starting")

return runCatching {
doSync(params.timeout)
doSync(if (params.forceImmediate) 0 else params.timeout)
}.fold(
{
{ hasToDeviceEvents ->
Result.success().also {
if (params.periodic) {
// we want to schedule another one after delay
automaticallyBackgroundSync(workManagerProvider, params.sessionId, params.timeout, params.delay)
// we want to schedule another one after a delay, or immediately if hasToDeviceEvents
automaticallyBackgroundSync(workManagerProvider, params.sessionId, params.timeout, params.delay, forceImmediate = hasToDeviceEvents)
} else if (hasToDeviceEvents) {
// Previous response has toDevice events, request an immediate sync request
requireBackgroundSync(workManagerProvider, params.sessionId, 0)
Copy link
Member Author

@bmarty bmarty Dec 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a workaround actually, and we should always do that, even it the bug is fixed on Synapse, WDYT @BillCarsonFr ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes (workaround is to set timeout to 0, which is probably default here ?)

We should check if we try to redecrypt existing notifications after the catchup sync?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks.

We should check if we try to redecrypt existing notifications after the catchup sync?

To be honest I did not do any test, I'm just coding it. Not sure how I can reproduce the test conditions

}
}
},
Expand All @@ -94,9 +99,13 @@ internal class SyncWorker(context: Context,
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
}

private suspend fun doSync(timeout: Long) {
/**
* Will return true if the sync response contains some toDevice events.
*/
private suspend fun doSync(timeout: Long): Boolean {
val taskParams = SyncTask.Params(timeout * 1000, SyncPresence.Offline)
syncTask.execute(taskParams)
val syncResponse = syncTask.execute(taskParams)
return syncResponse.toDevice?.events?.isNotEmpty().orFalse()
}

companion object {
Expand All @@ -113,13 +122,17 @@ internal class SyncWorker(context: Context,
.enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
}

fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0, delayInSeconds: Long = 30) {
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, delayInSeconds, true))
fun automaticallyBackgroundSync(workManagerProvider: WorkManagerProvider,
sessionId: String,
serverTimeout: Long = 0,
delayInSeconds: Long = 30,
forceImmediate: Boolean = false) {
val data = WorkerParamsFactory.toData(Params(sessionId, serverTimeout, delayInSeconds, forceImmediate))
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setConstraints(WorkManagerProvider.workConstraints)
.setInputData(data)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
.setInitialDelay(delayInSeconds, TimeUnit.SECONDS)
.setInitialDelay(if (forceImmediate) 0 else delayInSeconds, TimeUnit.SECONDS)
.build()
// Avoid risking multiple chains of syncs by replacing the existing chain
workManagerProvider.workManager
Expand Down