From 458a2ac1db72eca776f38c4fe2a69d8d6cf68a14 Mon Sep 17 00:00:00 2001 From: Bowrna Date: Fri, 6 May 2022 11:59:02 +0530 Subject: [PATCH] clear specific dag run TI --- .../endpoints/task_instance_endpoint.py | 32 ++++++++++++++++++- airflow/api_connexion/openapi/v1.yaml | 29 +++++++++++++++-- .../schemas/task_instance_schema.py | 11 +++++++ airflow/www/static/js/types/api-generated.ts | 28 ++++++++++++++-- .../schemas/test_task_instance_schema.py | 31 ++++++++++++++++++ 5 files changed, 125 insertions(+), 6 deletions(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 6cc3e784e62a3..910df4c929f2e 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -437,7 +437,37 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) -> reset_dag_runs = data.pop('reset_dag_runs') dry_run = data.pop('dry_run') # We always pass dry_run here, otherwise this would try to confirm on the terminal! - task_instances = dag.clear(dry_run=True, dag_bag=get_airflow_app().dag_bag, **data) + dag_run_id = data.pop('dag_run_id', None) + future = data.pop('include_future', False) + past = data.pop('include_past', False) + downstream = data.pop('include_downstream', False) + upstream = data.pop('include_upstream', False) + if dag_run_id is not None: + dag_run: Optional[DR] = ( + session.query(DR).filter(DR.dag_id == dag_id, DR.run_id == dag_run_id).one_or_none() + ) + if dag_run is None: + error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}' + raise NotFound(error_message) + data['start_date'] = dag_run.logical_date + data['end_date'] = dag_run.logical_date + if past: + data['start_date'] = None + if future: + data['end_date'] = None + task_ids = data.pop('task_ids', None) + if task_ids is not None: + task_id = [task[0] if isinstance(task, tuple) else task for task in task_ids] + dag = dag.partial_subset( + task_ids_or_regex=task_id, + include_downstream=downstream, + include_upstream=upstream, + ) + + if len(dag.task_dict) > 1: + # If we had upstream/downstream etc then also include those! + task_ids.extend(tid for tid in dag.task_dict if tid != task_id) + task_instances = dag.clear(dry_run=True, dag_bag=get_airflow_app().dag_bag, task_ids=task_ids, **data) if not dry_run: clear_task_instances( task_instances.all(), diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 58dad1bc1dbe9..60faaa6c42495 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -569,7 +569,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/ClearTaskInstance' + $ref: '#/components/schemas/ClearTaskInstances' responses: '200': @@ -3701,7 +3701,7 @@ components: type: boolean default: true - ClearTaskInstance: + ClearTaskInstances: type: object properties: dry_run: @@ -3753,6 +3753,31 @@ components: description: Set state of DAG runs to RUNNING. type: boolean + dag_run_id: + type: string + description: The DagRun ID for this task instance + nullable: true + + include_upstream: + description: If set to true, upstream tasks are also affected. + type: boolean + default: false + + include_downstream: + description: If set to true, downstream tasks are also affected. + type: boolean + default: false + + include_future: + description: If set to True, also tasks from future DAG Runs are affected. + type: boolean + default: false + + include_past: + description: If set to True, also tasks from past DAG Runs are affected. + type: boolean + default: false + UpdateTaskInstancesState: type: object properties: diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 74824dbaf87c6..26851b02f63c5 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -119,6 +119,11 @@ class ClearTaskInstanceFormSchema(Schema): include_parentdag = fields.Boolean(load_default=False) reset_dag_runs = fields.Boolean(load_default=False) task_ids = fields.List(fields.String(), validate=validate.Length(min=1)) + dag_run_id = fields.Str(load_default=None) + include_upstream = fields.Boolean(load_default=False) + include_downstream = fields.Boolean(load_default=False) + include_future = fields.Boolean(load_default=False) + include_past = fields.Boolean(load_default=False) @validates_schema def validate_form(self, data, **kwargs): @@ -128,6 +133,12 @@ def validate_form(self, data, **kwargs): if data["start_date"] and data["end_date"]: if data["start_date"] > data["end_date"]: raise ValidationError("end_date is sooner than start_date") + if data["start_date"] and data["end_date"] and data["dag_run_id"]: + raise ValidationError("Exactly one of dag_run_id or (start_date and end_date) must be provided") + if data["start_date"] and data["dag_run_id"]: + raise ValidationError("Exactly one of dag_run_id or start_date must be provided") + if data["end_date"] and data["dag_run_id"]: + raise ValidationError("Exactly one of dag_run_id or end_date must be provided") class SetTaskInstanceStateFormSchema(Schema): diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 64c70f633d2e6..c015b1fda2f13 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1577,7 +1577,7 @@ export interface components { */ dry_run?: boolean; }; - ClearTaskInstance: { + ClearTaskInstances: { /** * @description If set, don't actually run this operation. The response will contain a list of task instances * planned to be cleaned, but not modified in any way. @@ -1617,6 +1617,28 @@ export interface components { include_parentdag?: boolean; /** @description Set state of DAG runs to RUNNING. */ reset_dag_runs?: boolean; + /** @description The DagRun ID for this task instance */ + dag_run_id?: string | null; + /** + * @description If set to true, upstream tasks are also affected. + * @default false + */ + include_upstream?: boolean; + /** + * @description If set to true, downstream tasks are also affected. + * @default false + */ + include_downstream?: boolean; + /** + * @description If set to True, also tasks from future DAG Runs are affected. + * @default false + */ + include_future?: boolean; + /** + * @description If set to True, also tasks from past DAG Runs are affected. + * @default false + */ + include_past?: boolean; }; UpdateTaskInstancesState: { /** @@ -2451,7 +2473,7 @@ export interface operations { /** Parameters of action */ requestBody: { content: { - "application/json": components["schemas"]["ClearTaskInstance"]; + "application/json": components["schemas"]["ClearTaskInstances"]; }; }; }; @@ -4095,7 +4117,7 @@ export type ConfigSection = CamelCasedPropertiesDeep; export type VersionInfo = CamelCasedPropertiesDeep; export type ClearDagRun = CamelCasedPropertiesDeep; -export type ClearTaskInstance = CamelCasedPropertiesDeep; +export type ClearTaskInstances = CamelCasedPropertiesDeep; export type UpdateTaskInstancesState = CamelCasedPropertiesDeep; export type ListDagRunsForm = CamelCasedPropertiesDeep; export type ListTaskInstanceForm = CamelCasedPropertiesDeep; diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py b/tests/api_connexion/schemas/test_task_instance_schema.py index cef9c7a357951..cd8c3a4e2013f 100644 --- a/tests/api_connexion/schemas/test_task_instance_schema.py +++ b/tests/api_connexion/schemas/test_task_instance_schema.py @@ -177,6 +177,37 @@ class TestClearTaskInstanceFormSchema(unittest.TestCase): } ] ), + ( + [ + { + "dry_run": False, + "reset_dag_runs": True, + "dag_run_id": "scheduled__2022-06-19T00:00:00+00:00", + "start_date": "2022-08-03T00:00:00+00:00", + } + ] + ), + ( + [ + { + "dry_run": False, + "reset_dag_runs": True, + "dag_run_id": "scheduled__2022-06-19T00:00:00+00:00", + "end_date": "2022-08-03T00:00:00+00:00", + } + ] + ), + ( + [ + { + "dry_run": False, + "reset_dag_runs": True, + "dag_run_id": "scheduled__2022-06-19T00:00:00+00:00", + "end_date": "2022-08-04T00:00:00+00:00", + "start_date": "2022-08-03T00:00:00+00:00", + } + ] + ), ] ) def test_validation_error(self, payload):