diff --git a/tasks/base.py b/tasks/base.py index ba59a891f..aac792baa 100644 --- a/tasks/base.py +++ b/tasks/base.py @@ -58,11 +58,15 @@ def apply_async(self, args=None, kwargs=None, **options): "soft_time_limit": extra_config.get("soft_timelimit", None), } options = {**options, **celery_compatible_config} + + opt_headers = options.pop("headers", {}) + opt_headers = opt_headers if opt_headers is not None else {} + # Pass current time in task headers so we can emit a metric of # how long the task was in the queue for current_time = datetime.now() headers = { - **options.pop("headers", {}), + **opt_headers, "created_timestamp": current_time.isoformat(), } return super().apply_async(args=args, kwargs=kwargs, headers=headers, **options) diff --git a/tasks/upload.py b/tasks/upload.py index 633ef1591..2787524c1 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -46,6 +46,16 @@ CHUNK_SIZE = 3 +def _prepare_kwargs_for_retry(repoid, commitid, report_code, kwargs): + kwargs.update( + { + "repoid": repoid, + "commitid": commitid, + "report_code": report_code, + } + ) + + class UploadTask(BaseCodecovTask): """The first of a series of tasks designed to process an `upload` made by the user @@ -166,7 +176,8 @@ async def run_async( ), ), ) - self.retry(countdown=60, args=args, kwargs=kwargs) + _prepare_kwargs_for_retry(repoid, commitid, report_code, kwargs) + self.retry(countdown=60, kwargs=kwargs) try: with redis_connection.lock( lock_name, @@ -216,9 +227,8 @@ async def run_async( commit=commitid, repoid=repoid, countdown=int(retry_countdown) ), ) - self.retry( - max_retries=3, countdown=retry_countdown, args=args, kwargs=kwargs - ) + _prepare_kwargs_for_retry(repoid, commitid, report_code, kwargs) + self.retry(max_retries=3, countdown=retry_countdown, kwargs=kwargs) async def run_async_within_lock( self, @@ -265,7 +275,8 @@ async def run_async_within_lock( repoid=repoid, commit=commitid, countdown=retry_countdown ), ) - self.retry(countdown=retry_countdown, args=args, kwargs=kwargs) + _prepare_kwargs_for_retry(repoid, commitid, report_code, kwargs) + self.retry(countdown=retry_countdown, kwargs=kwargs) try: checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs) @@ -343,7 +354,8 @@ async def run_async_within_lock( "Commit not yet ready to build its initial report. Retrying in 60s.", extra=dict(repoid=commit.repoid, commit=commit.commitid), ) - self.retry(countdown=60, args=args, kwargs=kwargs) + _prepare_kwargs_for_retry(repoid, commitid, report_code, kwargs) + self.retry(countdown=60, kwargs=kwargs) argument_list = [] for arguments in self.lists_of_arguments(redis_connection, repoid, commitid): normalized_arguments = self.normalize_upload_arguments(