changelog.d/6836.bugfix (1 addition, 0 deletions)
@@ -0,0 +1 @@
Fixes uncaught exceptions in the SyncWorker causing the worker to become stuck in the failure state
@@ -53,7 +53,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
@Inject lateinit var timelineSendEventWorkCommon: TimelineSendEventWorkCommon
@Inject lateinit var localEchoRepository: LocalEchoRepository

override fun doOnError(params: Params): Result {
override fun doOnError(params: Params, failureMessage: String): Result {
params.localEchoIds.forEach { localEchoIds ->
localEchoRepository.updateSendState(
eventId = localEchoIds.eventId,
@@ -63,7 +63,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
)
}

return super.doOnError(params)
return super.doOnError(params, failureMessage)
}

override fun injectWith(injector: SessionComponent) {
@@ -30,6 +30,7 @@ import org.matrix.android.sdk.internal.session.sync.SyncTask
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.startChain
import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject
@@ -136,6 +137,7 @@ internal class SyncWorker(context: Context, workerParameters,
.setConstraints(WorkManagerProvider.workConstraints)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
.setInputData(data)
.startChain(true)
Contributor Author

this line is the fix; it matches the multiple event worker by ignoring previous results when appending new workers

.build()
workManagerProvider.workManager
.enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
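A rough sketch of what the startChain(true) fix flagged above presumably does, assuming (as the NoMerge discussion further down suggests) that it installs a first-input-wins InputMerger so a previous worker's result payload cannot clobber the new request data. This is an illustration with hypothetical names, not the SDK's actual implementation:

import androidx.work.Data
import androidx.work.InputMerger
import androidx.work.OneTimeWorkRequest

// Illustrative first-input-wins merger (the SDK's real NoMerger may differ).
internal class NoMergerSketch : InputMerger() {
    override fun merge(inputs: MutableList<Data>): Data = inputs.first()
}

// Hypothetical shape of the extension: only opt out of merging when starting a chain.
internal fun OneTimeWorkRequest.Builder.startChainSketch(startChain: Boolean): OneTimeWorkRequest.Builder {
    if (startChain) {
        setInputMerger(NoMergerSketch::class.java)
    }
    return this
}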
@@ -55,14 +55,16 @@ internal abstract class SessionSafeCoroutineWorker<PARAM : SessionWorkerParams>(

// Make sure to inject before handling error as you may need some dependencies to process them.
injectWith(sessionComponent)
if (params.lastFailureMessage != null) {
// Forward error to the next workers
doOnError(params)
} else {
doSafeWork(params)

when (val lastFailureMessage = params.lastFailureMessage) {
null -> doSafeWork(params)
else -> {
// Forward error to the next workers
doOnError(params, lastFailureMessage)
}
}
} catch (throwable: Throwable) {
buildErrorResult(params, throwable.localizedMessage ?: "error")
buildErrorResult(params, "${throwable::class.java.name}: ${throwable.localizedMessage ?: "N/A error message"}")
Contributor Author

added extra details about the error to help debug in the future

}
}

@@ -89,10 +91,10 @@ internal abstract class SessionSafeCoroutineWorker<PARAM : SessionWorkerParams>(
* This is called when the input parameters are correct, but contain an error from the previous worker.
*/
@CallSuper
open fun doOnError(params: PARAM): Result {
open fun doOnError(params: PARAM, failureMessage: String): Result {
// Forward the error
return Result.success(inputData)
Contributor Author

previously we were always using the inputData, which contained the lastFailureMessage, which meant the next time the work ran the data would be restored with the pre-existing failure, causing the worker to never execute doSafeWork

the fix is to consume the message and use the updated state for the result (so that it can be persisted without the lastFailureMessage)
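A hypothetical illustration of that idea (not the code in this diff, and ignoring the base class's @CallSuper contract for brevity), assuming a concrete worker whose Params is a data class with a nullable lastFailureMessage:

// Hypothetical: rebuild the output data from params with the failure cleared,
// instead of echoing inputData (which still carries lastFailureMessage).
override fun doOnError(params: Params, failureMessage: String): Result {
    val cleanedParams = params.copy(lastFailureMessage = null)
    return Result.success(WorkerParamsFactory.toData(cleanedParams))
}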

Member

OK, so we do not forward the error anymore to the next worker, since it has been removed from the params parameter, right? (maybe update the comment above). The original idea to transmit the error was to ensure that the last worker of the chain will handle it, without breaking the whole worker chain forever.

It's still strange to me that the lastFailureMessage gets restored when we start a new work request.

Contributor Author (@ouchadam), Aug 16, 2022

I was also surprised by this; I think it's caused by the lastFailureMessage creating a merge conflict in the input data

// requireBackgroundSync
val data = WorkerParamsFactory.toData(
        Params(
                sessionId = sessionId,
                timeout = serverTimeoutInSeconds,
                delay = 0L,
                periodic = false,
                random = UUID.randomUUID().toString()  // create a unique id for each work request
        )
)

Logging the work request data and the inputData on job execution

// No forced crash - with random

requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"7675"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"7675"}, }
requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"107a"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"107a"}, }
requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"c502"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"c502"}, }
requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"a22b"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"a22b"}, }


// Force crash

requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"49b1"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"49b1"}, }
requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"fca1"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"49b1","lastFailureMessage":"error"}, }
requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"b7e6"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"49b1","lastFailureMessage":"error"}, }


// Force crash removed

requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"97c9"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"49b1","lastFailureMessage":"error"}, }
requireBackground sync params: Data {WORKER_PARAMS_JSON : {"random":"3e36"}, }
doWork input: Data {WORKER_PARAMS_JSON : {"random":"49b1","lastFailureMessage":"error"}, }

notice that as soon as the lastFailureMessage is set, the request data gets ignored, which I assume is caused by the unspecified ordering of the default OverwritingInputMerger: https://developer.android.com/reference/androidx/work/InputMerger
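To make the suspected collision concrete, here is a minimal sketch (the key and values mirror the logs above; this is an illustration, not code from the PR). Both inputs carry the same WORKER_PARAMS_JSON key, so whichever one the default merger visits last simply wins:

import androidx.work.OverwritingInputMerger
import androidx.work.workDataOf

fun demonstrateOverwrite() {
    // Fresh request payload vs. a previous run's result payload, same key in both
    val freshRequest = workDataOf("WORKER_PARAMS_JSON" to """{"random":"fca1"}""")
    val staleResult = workDataOf("WORKER_PARAMS_JSON" to """{"random":"49b1","lastFailureMessage":"error"}""")
    // OverwritingInputMerger keeps the last value seen for a key; the ordering of the
    // inputs list is not under the caller's control, so the stale payload can win.
    val merged = OverwritingInputMerger().merge(listOf(freshRequest, staleResult))
    println(merged.getString("WORKER_PARAMS_JSON"))
}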

Contributor Author

after a bit more investigation, the default behaviour is:

  • CoroutineWorker/ListenableWorker do not call stop when doWork completes, which means the worker often already "exists", causing ExistingWorkPolicy.APPEND_OR_REPLACE to use the append flow
  • When appending, Result payloads merge with the input payloads (eg Result.success(foobar)); in our case that means overwriting the JSON key and replacing the original request payload with the worker result
  • Successful SyncWorkers provide no payload (Result.success()), hence why they receive the new request payloads

another solution is to set an InputMerger which always favours the latest input specifically for the SyncWorker

import androidx.work.Data
import androidx.work.InputMerger

// Skip merging entirely: keep a single input so a stale result payload
// cannot overwrite the request data
class Merger : InputMerger() {
    override fun merge(inputs: MutableList<Data>): Data {
        return inputs.first()
    }
}
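Presumably it would be wired up with the standard androidx.work API, something like this sketch (not necessarily how this SDK actually builds its sync request):

import androidx.work.OneTimeWorkRequestBuilder

val workRequest = OneTimeWorkRequestBuilder<SyncWorker>()
        .setInputMerger(Merger::class.java)
        .setInputData(data)
        .build()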

I'm not aware of the historical details around the other workers but as they're based on CoroutineWorker/ListenableWorker they may also suffer from the same issue of becoming stuck in the failure state if they use append policies

Member

> another solution is to set an InputMerger which always favours the latest input specifically for the SyncWorker

This should be far less confusing IMHO. The API of the work manager is so strange...

Member

Yes, if there are chained workers we need to cancel using this mechanism, otherwise workers will be stuck if we return a Failed state. This worker API is so bad...

Member (@ganfra), Aug 17, 2022

I think it will break some flows if we consume the failure: as we always return Success, if there is a chain of workers, the next one will run when it shouldn't...
We should probably let the Worker implementation decide if it wants to consume the error or not, instead of doing that in SessionSafeCoroutineWorker
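One hypothetical shape for that suggestion (purely a sketch, not anything in this PR): the base class keeps forwarding by default, and a concrete worker opts in to consuming the failure.

import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters

abstract class OptInErrorWorker(context: Context, params: WorkerParameters) :
        CoroutineWorker(context, params) {

    // A concrete worker overrides this when it wants to swallow the failure.
    open val consumesError: Boolean = false

    protected fun onInputError(): Result {
        return if (consumesError) {
            Result.success() // drop the stale failure so the next run can do real work
        } else {
            Result.success(inputData) // keep forwarding the failure payload down the chain
        }
    }
}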

Contributor Author

in those cases, should we be using Result.Failure to cancel the chain? My understanding is that APPEND_OR_REPLACE would allow the chain to be recreated the next time a request comes in

Contributor Author

I'll rework the PR to make use of the chain extension (NoMerge); we can think about the worker flows as a separate ticket (as other workers may be getting stuck)

Contributor Author

updated 1fd1a4e

.also { Timber.e("Work cancelled due to input error from parent") }
.also { Timber.e("Work cancelled due to input error from parent: $failureMessage") }
Contributor Author

includes the failure message in the error log

}

companion object {