
Conversation

@fweilun
Contributor

@fweilun fweilun commented Jul 9, 2025

Closes: #52943

Draft PR.

This PR adds an EcsCannotPullContainerError exception to handle scenarios where ECS tasks fail to start due to image pull issues (e.g., CannotPullContainerError).

Test added: ✅ test_should_retry_eni_false_for_pull_failure
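For context, a minimal sketch of the mechanism this PR describes (based on the snippets quoted in the review below; the base class and the standalone helper are illustrative assumptions, not the PR's exact diff):

class EcsCannotPullContainerError(Exception):
    """Raised when an ECS task stops because its container image cannot be pulled."""


def raise_if_cannot_pull(task: dict) -> None:
    # Hypothetical helper for illustration; in the PR the check lives inside
    # the operator's _check_success_task and feeds the should_retry predicate,
    # which returns False for this exception so the task fails fast.
    stopped_reason = task.get("stoppedReason", "")
    if "CannotPullContainerError" in stopped_reason:
        raise EcsCannotPullContainerError(f"The task failed to start due to: {stopped_reason}")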

@fweilun fweilun requested review from eladkal and o-nikolas as code owners July 9, 2025 09:43
@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Jul 9, 2025
@fweilun fweilun changed the title Fix ecs retry condition Fix ecs/EcsRunTaskOperator retry condition Jul 9, 2025
@jason810496 jason810496 self-requested a review July 9, 2025 09:49
Member

@jason810496 jason810496 left a comment


Nice! Thanks for the PR.

How about adding a new test to check whether _check_success_task raises the new EcsCannotPullContainerError exception?

Here are some good references for adding a test case against _check_success_task:

@mock.patch.object(EcsBaseOperator, "client")
@mock.patch("airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher")
def test_check_success_tasks_raises_cloudwatch_logs(self, log_fetcher_mock, client_mock):
    self.ecs.arn = "arn"
    self.ecs.task_log_fetcher = log_fetcher_mock
    log_fetcher_mock.get_last_log_messages.return_value = ["1", "2", "3", "4", "5"]
    client_mock.describe_tasks.return_value = {
        "tasks": [{"containers": [{"name": "foo", "lastStatus": "STOPPED", "exitCode": 1}]}]
    }
    with pytest.raises(Exception) as ctx:
        self.ecs._check_success_task()
    assert str(ctx.value) == (
        "This task is not in success state - last 10 logs from Cloudwatch:\n1\n2\n3\n4\n5"
    )
    client_mock.describe_tasks.assert_called_once_with(cluster="c", tasks=["arn"])

Comment on lines 34 to 39
def should_retry(exception: Exception):
    """Check if exception is related to ECS resource quota (CPU, MEM)."""
    if isinstance(exception, EcsCannotPullContainerError):
        return False

    if isinstance(exception, EcsOperatorError):
Member


IMO, letting EcsCannotPullContainerError fail fast instead of retrying should be fine, right?

Based on the documentation (CannotPullContainer task errors in Amazon ECS), it's more of a user configuration error than system instability.

cc @o-nikolas , @eladkal

Member


I think there are cases suitable for retry with a reasonable wait time, e.g.:

ERROR: toomanyrequests: Too Many Requests or You have reached your pull rate limit.
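If retrying some pull failures is worthwhile, one hedged option (everything here is illustrative, not existing provider code) is to match on the failure text rather than treating every CannotPullContainerError the same:

# Sketch: retry only pull failures that look transient (e.g. registry rate limiting)
# and fail fast on everything else. The substrings are examples, not an exhaustive list.
TRANSIENT_PULL_ERRORS = ("toomanyrequests", "pull rate limit")


def should_retry_pull_failure(stopped_reason: str) -> bool:
    reason = stopped_reason.lower()
    if "cannotpullcontainererror" not in reason:
        return False
    return any(marker in reason for marker in TRANSIENT_PULL_ERRORS)


# A Docker Hub rate-limit message would be retried; a missing image would not.
assert should_retry_pull_failure(
    "CannotPullContainerError: toomanyrequests: You have reached your pull rate limit."
)
assert not should_retry_pull_failure("CannotPullContainerError: manifest unknown")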

@fweilun
Contributor Author

fweilun commented Jul 9, 2025

Nice! Thanks for the PR.

How about adding a new test to check whether _check_success_task raises the new EcsCannotPullContainerError exception?


Done! Test case added.



def should_retry(exception: Exception):
    """Check if exception is related to ECS resource quota (CPU, MEM)."""
Contributor

@dominikhei dominikhei Jul 9, 2025


Just a tiny nit, but maybe adjust the docstring to incorporate the new behavior?
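For example, the wording could cover both branches (this is only a suggestion, not the PR's actual text):

def should_retry(exception: Exception):
    """Check whether the exception is retryable.

    Resource-quota failures (CPU, MEM) are retried; EcsCannotPullContainerError,
    i.e. image pull failures, is not, so those tasks fail fast.
    """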

Contributor

@o-nikolas o-nikolas left a comment


Hey, thanks for this PR. The ECS operator is very popular, so it gets a lot of users and a lot of niche failure modes come up.

Some things I'm confused or wary about with this PR:

  1. Wasn't #52943 about more than just container pull errors? It's about any configuration error that prevents the container from starting up initially, rather than a failure within the runtime (i.e. Airflow) code. Why are we focusing on just one case here?
  2. We seem to be leaning towards a complicated control flow with custom exceptions within the ECS operator. I'm not sure things really need to be this complicated.
  3. Why are we updating a retry function that was meant to just be for ENIs with container pull exception handling? Does this retry need to be more generic? Do we need to rethink things?

Member

@Lee-W Lee-W left a comment


Looks good IMO, but would like to wait for the existing comments to be resolved


@Lee-W
Member

Lee-W commented Jul 10, 2025

Also, CI needs to be fixed.

@fweilun
Contributor Author

fweilun commented Jul 10, 2025

I wonder whether removing the retry mechanism might make more sense here.
#53083

@jason810496
Member

I wonder whether removing the retry mechanism might make more sense here. #53083

+0 for leaving retries to Airflow's task-level retry instead of retrying at the _check_success_task function level. Both work; I have no strong opinion either way.

It seems simpler. However, as mentioned in #53083 (review), removing the function-level retry mechanism will introduce a breaking change.

IMO, if we go for removing the function-level retry mechanism (#53083), then we might need to close this one.

Or, if we decide to retain the function-level retry for compatibility, I think adding the CannotPullContainerError string to the should_retry_eni function

def should_retry_eni(exception: Exception):
    """Check if exception is related to ENI (Elastic Network Interfaces)."""
    if isinstance(exception, EcsTaskFailToStart):
        return any(
            eni_reason in exception.message
            for eni_reason in ["network interface provisioning", "ResourceInitializationError"]
        )

and removing the following explicit error handling along with the EcsCannotPullContainerError exception

          if "CannotPullContainerError" in task.get("stoppedReason", ""):
                raise EcsCannotPullContainerError(
                    f"The task failed to start due to: {task.get('stoppedReason', '')}"
                )

might be simpler and more generic.

Then we can go through the possible stoppedReason errors and add any missing retryable error (CannotPullContainerError in this case) to the ["network interface provisioning", "ResourceInitializationError"] list to make it more robust (maybe make this list an enum or a global constant).
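A minimal sketch of that constant-based approach (the constant name and the tuple contents are illustrative assumptions; the stand-in EcsTaskFailToStart is defined here only so the snippet is self-contained):

# Retryable stoppedReason fragments collected in one place instead of an
# inline list; CannotPullContainerError is the newly added entry.
RETRYABLE_STOPPED_REASONS: tuple[str, ...] = (
    "network interface provisioning",
    "ResourceInitializationError",
    "CannotPullContainerError",
)


class EcsTaskFailToStart(Exception):
    # Stand-in for the provider's exception, shown here only so the sketch runs.
    def __init__(self, message: str):
        self.message = message
        super().__init__(message)


def should_retry_eni(exception: Exception) -> bool:
    """Check if the exception's stopped reason is one of the retryable ones."""
    if isinstance(exception, EcsTaskFailToStart):
        return any(reason in exception.message for reason in RETRYABLE_STOPPED_REASONS)
    return False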

@o-nikolas
Contributor


Just to +1 again (full reply here): I think we should make every effort to simplify this one. If the mechanism is truly broken, then we can just mark it as a bug fix and we don't need to worry about breaking changes. It's just a matter of determining whether it's broken in all/most cases, which perhaps needs confirming by @vladimirbinshtok (the original requester). Once we have that, we can go with this PR.

@jason810496
Member

jason810496 commented Jul 14, 2025

Sorry @fweilun, we have to close this one as #53083 will resolve the issue.
Big thanks for your help!


Labels

area:providers provider:amazon AWS/Amazon - related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Airflow task is marked as succeeded on the EcsTaskFailToStart exception

5 participants