Flow Runs in AWS ECS Push Work Pool are stuck in LATE. Very, very slowly trickling to PENDING. #18877

@monda-nicktorba

Bug summary

We use one AWS ECS Push Work Pool per customer to submit runs. At small volumes, they all seem to work great.

However, one of our customers submits hundreds of jobs to run each night. The concurrency limit on this work pool is always at least 20, and we do not use any concurrency limits on the deployments or queues that would block this. Most mornings, we see that a majority of the runs are 3+ hours late. Upon monitoring, the issue is that there are very long stretches where all runs sit in the LATE state.

They do not update to PENDING. I do not think there is an ECS issue here: we have seen infrastructure-related errors, like hitting the TaskDefinition registration rate limit, but this is not that. It seems like Prefect Cloud does not submit the runs to AWS ECS, despite there being no active runs in the work pool.

I think this may be related to #18429 (comment), but after I posted in the community Slack, @zzstoatzz said it would be best to open an issue here.

Below are the configs for the work pool and the deployment. Please let me know what other info would be helpful. I redacted the name of the work pool since it is a customer name, but I can send it via Slack DM so the team can check the logs for all the late runs over the last week.
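
To put numbers on the lateness described above, here is a minimal sketch of how the delay per run could be summarized. It assumes each run is reduced to a pair of timestamps: its expected start time and the time it entered PENDING (or `None` if it is still LATE). The `summarize_lateness` helper and the sample timestamps are hypothetical and only illustrate the calculation; how the timestamps are pulled from Prefect is not shown.

```python
from datetime import datetime, timedelta, timezone


def summarize_lateness(runs, now, threshold=timedelta(hours=3)):
    """Summarize how long runs waited before leaving LATE.

    runs: iterable of (expected_start, pending_at) pairs; pending_at is
          None when the run has not reached PENDING yet.
    now:  reference time used for runs that are still waiting.
    """
    delays = []
    for expected_start, pending_at in runs:
        end = pending_at if pending_at is not None else now
        # Clamp at zero so runs picked up early do not count as negative delay.
        delays.append(max(end - expected_start, timedelta(0)))
    return {
        "total": len(delays),
        "over_threshold": sum(1 for d in delays if d >= threshold),
        "max_delay": max(delays, default=timedelta(0)),
    }


# Hypothetical sample data, not taken from the affected work pool.
base = datetime(2025, 9, 7, 2, 0, tzinfo=timezone.utc)
runs = [
    (base, base + timedelta(minutes=1)),            # picked up promptly
    (base, base + timedelta(hours=3, minutes=30)),  # picked up 3.5 hours late
    (base, None),                                   # still LATE
]
summary = summarize_lateness(runs, now=base + timedelta(hours=5))
print(summary)
```

A summary like this, collected per night, would make it easier to compare the affected work pool against the low-volume pools that behave normally.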

Deployment Config

❯ prefect deployment inspect data-product-refresh/product_refresh_deployment-deployment_name-on-demand-ecs


19:02:22.087 | DEBUG   | prefect.profiles - Using profile 'default'
19:02:22.585 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/accountid/workspaces/workspaceid/
{
    'id': '3e7b2937-483e-4b46-adfc-70d902f482fc',
    'created': '2025-08-29T17:32:30.447546Z',
    'updated': '2025-09-02T20:10:49.820974Z',
    'name': 'product_refresh_deployment-deployment_name-on-demand-ecs',
    'version': None,
    'version_id': '068b74f4-9e54-7a62-8000-010c5c76a89d',
    'version_info': {'type': 'prefect:simple', 'version': ''},
    'branch': None,
    'base': None,
    'root': None,
    'description': None,
    'flow_id': 'f638faad-8701-406e-bf6e-f16aa73901de',
    'concurrency_limit': None,
    'global_concurrency_limit': None,
    'concurrency_options': None,
    'paused': False,
    'schedules': [],
    'job_variables': {'tenant': 'deployment_name', 'deployment_name': 'product_refresh_deployment-deployment_name-on-demand-ecs'},
    'parameters': {},
    'pull_steps': [
        {
            'prefect.deployments.steps.git_clone': {
                'branch': 'main',
                'repository': 'https://github.com/gitproject.git\t',
                'credentials': '{{ prefect.blocks.github-credentials.production-github-credential }}',
                'include_submodules': False
            }
        }
    ],
    'tags': [],
    'labels': {'prefect.flow.id': 'f638faad-8701-406e-bf6e-f16aa73901de'},
    'work_queue_name': 'on-demand',
    'last_polled': None,
    'parameter_openapi_schema': {},
    'path': None,
    'entrypoint': 'amplify_prefect/flows/pflow_product_refresh.py:data_product_refresh',
    'storage_document_id': '3c790c60-497c-4f46-8f3d-af6739052410',
    'infrastructure_document_id': None,
    'created_by': {'id': 'id', 'type': 'USER', 'display_value': 'nick'},
    'updated_by': {'id': 'id', 'type': 'USER', 'display_value': 'nick'},
    'work_queue_id': None,
    'enforce_parameter_schema': False,
    'work_pool_name': 'deployment_name-ecs',
    'status': 'READY',
    'automations': []
}

Work Pool Config

❯ prefect work-pool inspect "workpool_name-ecs"
19:01:26.574 | DEBUG   | prefect.profiles - Using profile 'default'
19:01:27.054 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/accountid/workspaces/workspaceid/
WorkPool(
    id='id',
    created=DateTime(2025, 8, 29, 17, 32, 29, 953615, tzinfo=Timezone('UTC')),
    updated=DateTime(2025, 9, 7, 7, 33, 30, 438582, tzinfo=Timezone('UTC')),
    name='workpool_name-ecs',
    type='ecs:push',
    base_job_template={
        'variables': {
            'type': 'object',
            'properties': {
                'cpu': {
                    'type': 'integer',
                    'title': 'CPU',
                    'description': 'The amount of CPU to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of 1024 will be used 
unless present on the task definition.'
                },
                'env': {
                    'type': 'object',
                    'title': 'Environment Variables',
                    'default': {'ENVIRONMENT': 'production'},
                    'description': 'Environment variables to provide to the task run. These variables are set on the Prefect container at task runtime. These will not be set on the task 
definition.',
                    'additionalProperties': {'type': 'string'}
                },
                'name': {'type': 'string', 'title': 'Name', 'description': 'Name given to infrastructure created by a worker.'},
                'image': {
                    'type': 'string',
                    'title': 'Image',
                    'default': '143690486798.dkr.ecr.us-east-1.amazonaws.com/amplifydata-production-prefect-ecs:prod-3ee85cdb66a014df5c9acad85c014831a76cd9e4-b348',
                    'description': 'The image to use for the Prefect container in the task. If this value is not null, it will override the value in the task definition. This value defaults to a
Prefect base image matching your local versions.'
                },
                'family': {
                    'type': 'string',
                    'title': 'Family',
                    'default': 'production_workpool_name-ecs',
                    'description': 'A family for the task definition. If not provided, it will be inferred from the task definition. If the task definition does not have a family, the name will 
be generated. When flow and deployment metadata is available, the generated name will include their names. Values for this field will be slugified to match AWS character requirements.'
                },
                'labels': {'type': 'object', 'title': 'Labels', 'description': 'Labels applied to infrastructure created by a worker.', 'additionalProperties': {'type': 'string'}},
                'memory': {
                    'type': 'integer',
                    'title': 'Memory',
                    'description': 'The amount of memory to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of 2048 will be used 
unless present on the task definition.'
                },
                'vpc_id': {
                    'type': 'string',
                    'title': 'VPC ID',
                    'description': "The AWS VPC to link the task run to. This is only applicable when using the 'awsvpc' network mode for your task. FARGATE tasks require this network  mode, but
for EC2 tasks the default network mode is 'bridge'. If using the 'awsvpc' network mode and this field is null, your default VPC will be used. If no default VPC can be found, the task run will 
fail."
                },
                'cluster': {
                    'type': 'string',
                    'title': 'Cluster',
                    'default': 'arn:aws:ecs:us-east-1:id:cluster/production-workpool_name-prefect-worker-cluster',
                    'description': 'The ECS cluster to run the task in. An ARN or name may be provided. If not provided, the default cluster will be used.'
                },
                'command': {
                    'type': 'string',
                    'title': 'Command',
                    'description': 'The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker.'
                },
                'launch_type': {
                    'enum': ['FARGATE', 'EC2', 'EXTERNAL', 'FARGATE_SPOT'],
                    'type': 'string',
                    'title': 'Launch Type',
                    'default': 'FARGATE',
                    'description': "The type of ECS task run infrastructure that should be used. Note that 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure the proper 
capacity provider strategy if set here."
                },
                'stream_output': {
                    'type': 'boolean',
                    'title': 'Stream Output',
                    'description': 'If enabled, logs will be streamed from the Prefect container to the local console. Unless you have configured AWS CloudWatch logs manually on your task 
definition, this requires the same prerequisites outlined in `configure_cloudwatch_logs`.'
                },
                'task_role_arn': {
                    'type': 'string',
                    'title': 'Task Role ARN',
                    'default': 'arn:aws:iam:::role/production-workpool_name-task-role',
                    'description': 'A role to attach to the task run. This controls the permissions of the task while it is running.'
                },
                'container_name': {
                    'type': 'string',
                    'title': 'Container Name',
                    'description': 'The name of the container flow run orchestration will occur in. If not specified, a default value of prefect will be used and if that is not found in the task
definition the first container will be used.'
                },
                'aws_credentials': {
                    'allOf': [{'$ref': '#/definitions/AwsCredentials'}],
                    'title': 'AWS Credentials',
                    'default': {'$ref': {'block_document_id': '47290b6a-85c9-4a71-820f-ba8b44890131'}},
                    'description': "The AWS credentials to use to connect to ECS. If not provided, credentials will be inferred from the local environment following AWS's boto client's rules."
                },
                'ephemeral_storage': {
                    'type': 'integer',
                    'title': 'Ephemeral Storage (GiB)',
                    'default': 21,
                    'maximum': 200,
                    'minimum': 21,
                    'description': 'Total ephemeral storage for the task (Fargate only). Defaults to 20 GiB; set 21–200 to increase.'
                },
                'execution_role_arn': {
                    'type': 'string',
                    'title': 'Execution Role ARN',
                    'default': 'arn:aws:iam::id:role/production-workpool_name-execution-role',
                    'description': 'An execution role to use for the task. This controls the permissions of the task when it is launching. If this value is not null, it will override the value 
in the task definition. An execution role must be provided to capture logs from the container.'
                },
                'task_definition_arn': {
                    'type': 'string',
                    'title': 'Task Definition Arn',
                    'description': 'An identifier for an existing task definition to use. If set, options that require changes to the task definition will be ignored. All contents of the task 
definition in the job configuration will be ignored.'
                },
                'network_configuration': {
                    'type': 'object',
                    'title': 'Network Configuration',
                    'description': "When `network_configuration` is supplied it will override ECS Worker'sawsvpcConfiguration that defined in the ECS task executing your workload. See the [AWS 
documentation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ecs-service-awsvpcconfiguration.html) for available options."
                },
                'cloudwatch_logs_prefix': {
                    'type': 'string',
                    'title': 'Cloudwatch Logs Prefix',
                    'description': 'When `configure_cloudwatch_logs` is enabled, this setting may be used to set a prefix for the log group. If not provided, the default prefix will be 
`prefect-logs_<work_pool_name>_<deployment_id>`. If `awslogs-stream-prefix` is present in `Cloudwatch logs options` this setting will be ignored.'
                },
                'cloudwatch_logs_options': {
                    'type': 'object',
                    'title': 'Cloudwatch Logs Options',
                    'default': {'awslogs-group': '/ecs/production/workpool_name-ecs-log-group', 'awslogs-region': 'us-east-1', 'awslogs-stream-prefix': 'ecs/workpool_name'},
                    'description': 'When `configure_cloudwatch_logs` is enabled, this setting may be used to pass additional options to the CloudWatch logs configuration or override the default 
options. See the [AWS documentation](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html#create_awslogs_logdriver_options) for available options. ',
                    'additionalProperties': {'type': 'string'}
                },
                'task_watch_poll_interval': {
                    'type': 'number',
                    'title': 'Task Watch Poll Interval',
                    'default': 5,
                    'description': 'The amount of time to wait between AWS API calls while monitoring the state of an ECS task.'
                },
                'configure_cloudwatch_logs': {
                    'type': 'boolean',
                    'title': 'Configure Cloudwatch Logs',
                    'default': True,
                    'description': 'If enabled, the Prefect container will be configured to send its output to the AWS CloudWatch logs service. This functionality requires an execution role with
logs:CreateLogStream, logs:CreateLogGroup, and logs:PutLogEvents permissions. The default for this field is `False` unless `stream_output` is set.'
                },
                'capacity_provider_strategy': {
                    'type': 'array',
                    'items': {'$ref': '#/definitions/CapacityProvider'},
                    'title': 'Capacity Provider Strategy',
                    'description': 'The capacity provider strategy to use when running the task. If a capacity provider strategy is specified, the selected launch type will be ignored.'
                },
                'task_start_timeout_seconds': {
                    'type': 'integer',
                    'title': 'Task Start Timeout Seconds',
                    'default': 300,
                    'description': 'The amount of time to watch for the start of the ECS task before marking it as failed. The task must enter a RUNNING state to be considered started.'
                },
                'auto_deregister_task_definition': {
                    'type': 'boolean',
                    'title': 'Auto Deregister Task Definition',
                    'default': False,
                    'description': 'If enabled, any task definitions that are created by this block will be deregistered. Existing task definitions linked by ARN will never be deregistered. 
Deregistering a task definition does not remove it from your AWS account, instead it will be marked as INACTIVE.'
                },
                'match_latest_revision_in_family': {
                    'type': 'boolean',
                    'title': 'Match Latest Revision In Family',
                    'default': True,
                    'description': 'If enabled, the most recent active revision in the task definition family will be compared against the desired ECS task configuration. If they are equal, the 
existing task definition will be used instead of registering a new one. If no family is specified the default family "prefect" will be used.'
                }
            },
            'definitions': {
                'AwsCredentials': {
                    'type': 'object',
                    'title': 'AwsCredentials',
                    'properties': {
                        'region_name': {'type': 'string', 'title': 'Region Name', 'description': 'The AWS Region where you want to create new connections.'},
                        'profile_name': {'type': 'string', 'title': 'Profile Name', 'description': 'The profile to use when creating your session.'},
                        'aws_access_key_id': {'type': 'string', 'title': 'AWS Access Key ID', 'description': 'A specific AWS access key ID.'},
                        'aws_session_token': {
                            'type': 'string',
                            'title': 'AWS Session Token',
                            'description': 'The session key for your AWS account. This is only needed when you are using temporary credentials.'
                        },
                        'aws_client_parameters': {
                            'allOf': [{'$ref': '#/definitions/AwsClientParameters'}],
                            'title': 'AWS Client Parameters',
                            'description': 'Extra parameters to initialize the Client.'
                        },
                        'aws_secret_access_key': {
                            'type': 'string',
                            'title': 'AWS Access Key Secret',
                            'format': 'password',
                            'writeOnly': True,
                            'description': 'A specific AWS secret access key.'
                        }
                    },
                    'description': 'Block used to manage authentication with AWS. AWS authentication is\nhandled via the `boto3` module. Refer to the\n[boto3 
docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html)\nfor more info about the possible credential configurations.',
                    'secret_fields': ['aws_secret_access_key'],
                    'block_type_slug': 'aws-credentials',
                    'block_schema_references': {}
                },
                'CapacityProvider': {
                    'type': 'object',
                    'title': 'CapacityProvider',
                    'required': ['capacityProvider', 'weight', 'base'],
                    'properties': {
                        'base': {'type': 'integer', 'title': 'Base'},
                        'weight': {'type': 'integer', 'title': 'Weight'},
                        'capacityProvider': {'type': 'string', 'title': 'Capacityprovider'}
                    },
                    'description': 'The capacity provider strategy to use when running the task.'
                },
                'AwsClientParameters': {
                    'type': 'object',
                    'title': 'AwsClientParameters',
                    'properties': {
                        'config': {'type': 'object', 'title': 'Botocore Config', 'description': 'Advanced configuration for Botocore clients.'},
                        'verify': {
                            'anyOf': [{'type': 'boolean'}, {'type': 'string', 'format': 'file-path'}],
                            'title': 'Verify',
                            'default': True,
                            'description': 'Whether or not to verify SSL certificates.'
                        },
                        'use_ssl': {'type': 'boolean', 'title': 'Use SSL', 'default': True, 'description': 'Whether or not to use SSL.'},
                        'api_version': {'type': 'string', 'title': 'API Version', 'description': 'The API version to use.'},
                        'endpoint_url': {'type': 'string', 'title': 'Endpoint URL', 'description': 'The complete URL to use for the constructed client.'},
                        'verify_cert_path': {'type': 'string', 'title': 'Certificate Authority Bundle File Path', 'format': 'file-path', 'description': 'Path to the CA cert bundle to use.'}
                    },
                    'description': 'Model used to manage extra parameters that you can pass when you initialize\nthe Client. If you want to find more information, see\n[boto3 
docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html)\nfor more info about the possible client configurations.\n\nAttributes:\n    api_version: The API 
version to use. By default, botocore will\n        use the latest API version when creating a client. You only need\n        to specify this parameter if you want to use a previous API version\n
of the client.\n    use_ssl: Whether or not to use SSL. By default, SSL is used.\n        Note that not all services support non-ssl connections.\n    verify: Whether or not to verify SSL 
certificates. By default\n        SSL certificates are verified. If False, SSL will still be used\n        (unless use_ssl is False), but SSL certificates\n        will not be verified. Passing 
a file path to this is deprecated.\n    verify_cert_path: A filename of the CA cert bundle to\n        use. You can specify this argument if you want to use a\n        different CA cert bundle 
than the one used by botocore.\n    endpoint_url: The complete URL to use for the constructed\n        client. Normally, botocore will automatically construct the\n        appropriate URL to use
when communicating with a service. You\n        can specify a complete URL (including the "http/https" scheme)\n        to override this behavior. If this value is provided,\n        then 
``use_ssl`` is ignored.\n    config: Advanced configuration for Botocore clients. See\n        [botocore docs](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html)\n
for more details.'
                }
            },
            'description': 'Variables for templating an ECS job.'
        },
        'job_configuration': {
            'env': '{{ env }}',
            'name': '{{ name }}',
            'labels': '{{ labels }}',
            'vpc_id': '{{ vpc_id }}',
            'cluster': '{{ cluster }}',
            'command': '{{ command }}',
            'stream_output': '{{ stream_output }}',
            'container_name': '{{ container_name }}',
            'aws_credentials': '{{ aws_credentials }}',
            'task_definition': {
                'cpu': '{{ cpu }}',
                'family': '{{ family }}',
                'memory': '{{ memory }}',
                'ephemeralStorage': {'sizeInGiB': '{{ ephemeral_storage }}'},
                'executionRoleArn': '{{ execution_role_arn }}',
                'containerDefinitions': [{'name': '{{ container_name }}', 'image': '{{ image }}'}]
            },
            'task_run_request': {
                'tags': '{{ labels }}',
                'cluster': '{{ cluster }}',
                'overrides': {
                    'cpu': '{{ cpu }}',
                    'memory': '{{ memory }}',
                    'taskRoleArn': '{{ task_role_arn }}',
                    'containerOverrides': [{'cpu': '{{ cpu }}', 'name': '{{ container_name }}', 'memory': '{{ memory }}', 'command': '{{ command }}', 'environment': '{{ env }}'}]
                },
                'launchType': '{{ launch_type }}',
                'taskDefinition': '{{ task_definition_arn }}',
                'capacityProviderStrategy': '{{ capacity_provider_strategy }}'
            },
            'network_configuration': '{{ network_configuration }}',
            'cloudwatch_logs_prefix': '{{ cloudwatch_logs_prefix }}',
            'cloudwatch_logs_options': '{{ cloudwatch_logs_options }}',
            'task_watch_poll_interval': '{{ task_watch_poll_interval }}',
            'configure_cloudwatch_logs': '{{ configure_cloudwatch_logs }}',
            'task_start_timeout_seconds': '{{ task_start_timeout_seconds }}',
            'auto_deregister_task_definition': '{{ auto_deregister_task_definition }}',
            'match_latest_revision_in_family': '{{ match_latest_revision_in_family }}'
        }
    },
    concurrency_limit=50,
    status=WorkPoolStatus.READY,
    storage_configuration=WorkPoolStorageConfiguration(),
    default_queue_id='id'
)
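
For readers unfamiliar with the layout above, the following is a simplified sketch of how the `{{ ... }}` placeholders in `job_configuration` relate to the `variables` schema: defaults from the schema are merged with per-deployment overrides, and each placeholder is replaced by the resulting value. This is an illustration only, not Prefect's actual implementation; the `resolve` helper, the trimmed-down template, and the `cpu` override are assumptions made for the example.

```python
import re

# Matches a string that consists solely of one placeholder, e.g. "{{ cpu }}".
PLACEHOLDER = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")
MISSING = object()  # sentinel for placeholders that have no value


def resolve(template, values):
    """Recursively substitute whole-string placeholders; drop unset ones."""
    if isinstance(template, dict):
        resolved = {key: resolve(item, values) for key, item in template.items()}
        return {key: item for key, item in resolved.items() if item is not MISSING}
    if isinstance(template, list):
        resolved = [resolve(item, values) for item in template]
        return [item for item in resolved if item is not MISSING]
    if isinstance(template, str):
        match = PLACEHOLDER.match(template)
        if match:
            return values.get(match.group(1), MISSING)
    return template


# A small subset of the schema and template shown in the work pool output.
schema_properties = {
    "cpu": {"type": "integer"},  # no default in the pasted schema
    "launch_type": {"type": "string", "default": "FARGATE"},
    "ephemeral_storage": {"type": "integer", "default": 21},
    "task_start_timeout_seconds": {"type": "integer", "default": 300},
}
job_configuration = {
    "task_definition": {
        "cpu": "{{ cpu }}",
        "ephemeralStorage": {"sizeInGiB": "{{ ephemeral_storage }}"},
    },
    "task_run_request": {"launchType": "{{ launch_type }}"},
    "task_start_timeout_seconds": "{{ task_start_timeout_seconds }}",
}

defaults = {
    name: spec["default"]
    for name, spec in schema_properties.items()
    if "default" in spec
}
job_variables = {"cpu": 2048}  # hypothetical override, not from this deployment
rendered = resolve(job_configuration, {**defaults, **job_variables})
print(rendered)
```

In this simplified model, overrides win over schema defaults, and a placeholder with neither a default nor an override is omitted from the rendered configuration.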

Version info

❯ prefect version                                                                                      
19:12:25.163 | DEBUG   | prefect.profiles - Using profile 'default'
Version:             3.4.4
API version:         0.8.4
Python version:      3.10.18
Git commit:          0367d7aa
Built:               Thu, May 29, 2025 09:37 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
Pydantic version:    2.11.7

Additional context

No response

Labels: bug (Something isn't working), cloud (Related to Prefect Cloud)
