From 9e77be794cfe985bff8d078af2e115f40f4bd000 Mon Sep 17 00:00:00 2001 From: Dana Date: Thu, 5 Oct 2023 12:44:13 +0300 Subject: [PATCH 1/2] this is the fisr step in debugging why the upload finisher gets executed multiple times by different worker pods --- tasks/upload_finisher.py | 51 ++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index a4521996f..72915c035 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -1,8 +1,7 @@ import logging import re -from copy import deepcopy -import sentry_sdk +from redis.exceptions import LockError from shared.celery_config import ( compute_comparison_task_name, notify_task_name, @@ -79,22 +78,40 @@ async def run_async( commit = commits.first() assert commit, "Commit not found in database." redis_connection = get_redis_connection() - with redis_connection.lock(lock_name, timeout=60 * 5, blocking_timeout=5): - commit_yaml = UserYaml(commit_yaml) - db_session.commit() - commit.notified = False - db_session.commit() - result = await self.finish_reports_processing( - db_session, - commit, - commit_yaml, - processing_results, - report_code, - checkpoints, + try: + with redis_connection.lock(lock_name, timeout=60 * 5, blocking_timeout=5): + commit_yaml = UserYaml(commit_yaml) + db_session.commit() + commit.notified = False + db_session.commit() + result = await self.finish_reports_processing( + db_session, + commit, + commit_yaml, + processing_results, + report_code, + checkpoints, + ) + save_commit_measurements(commit) + self.invalidate_caches(redis_connection, commit) + log.info( + "Finished upload_finisher task", + extra=dict( + repoid=repoid, + commit=commitid, + parent_task=self.request.parent_id, + ), + ) + return result + except LockError: + log.warning( + "Unable to acquire lock for key %s. Retrying", + lock_name, + extra=dict( + commit=commitid, + repoid=repoid, + ), ) - save_commit_measurements(commit) - self.invalidate_caches(redis_connection, commit) - return result async def finish_reports_processing( self, From c95df1f25e5f6545132682887241d38a140ab584 Mon Sep 17 00:00:00 2001 From: Dana Date: Mon, 9 Oct 2023 16:17:59 +0300 Subject: [PATCH 2/2] modifying comment --- tasks/upload_finisher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 72915c035..46f26ec64 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -105,7 +105,7 @@ async def run_async( return result except LockError: log.warning( - "Unable to acquire lock for key %s. Retrying", + "Unable to acquire lock for key %s.", lock_name, extra=dict( commit=commitid,