Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,37 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
reset_dag_runs = data.pop('reset_dag_runs')
dry_run = data.pop('dry_run')
# We always pass dry_run here, otherwise this would try to confirm on the terminal!
task_instances = dag.clear(dry_run=True, dag_bag=get_airflow_app().dag_bag, **data)
dag_run_id = data.pop('dag_run_id', None)
future = data.pop('include_future', False)
past = data.pop('include_past', False)
downstream = data.pop('include_downstream', False)
upstream = data.pop('include_upstream', False)
if dag_run_id is not None:
dag_run: Optional[DR] = (
session.query(DR).filter(DR.dag_id == dag_id, DR.run_id == dag_run_id).one_or_none()
)
if dag_run is None:
error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
raise NotFound(error_message)
data['start_date'] = dag_run.logical_date
data['end_date'] = dag_run.logical_date
if past:
data['start_date'] = None
if future:
data['end_date'] = None
task_ids = data.pop('task_ids', None)
if task_ids is not None:
task_id = [task[0] if isinstance(task, tuple) else task for task in task_ids]
dag = dag.partial_subset(
task_ids_or_regex=task_id,
include_downstream=downstream,
include_upstream=upstream,
)

if len(dag.task_dict) > 1:
# If we had upstream/downstream etc then also include those!
task_ids.extend(tid for tid in dag.task_dict if tid != task_id)
task_instances = dag.clear(dry_run=True, dag_bag=get_airflow_app().dag_bag, task_ids=task_ids, **data)
if not dry_run:
clear_task_instances(
task_instances.all(),
Expand Down
29 changes: 27 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/ClearTaskInstance'
$ref: '#/components/schemas/ClearTaskInstances'

responses:
'200':
Expand Down Expand Up @@ -3701,7 +3701,7 @@ components:
type: boolean
default: true

ClearTaskInstance:
ClearTaskInstances:
type: object
properties:
dry_run:
Expand Down Expand Up @@ -3753,6 +3753,31 @@ components:
description: Set state of DAG runs to RUNNING.
type: boolean

dag_run_id:
type: string
description: The DagRun ID for this task instance
nullable: true

include_upstream:
description: If set to true, upstream tasks are also affected.
type: boolean
default: false

include_downstream:
description: If set to true, downstream tasks are also affected.
type: boolean
default: false

include_future:
description: If set to True, also tasks from future DAG Runs are affected.
type: boolean
default: false

include_past:
description: If set to True, also tasks from past DAG Runs are affected.
type: boolean
default: false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbovenzi @ephraimbuddy I have added the params but when I hit these params to test the API, I get an error saying Unknown field. Have anyone faced this issue? Can you tell me where I am going wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbovenzi @ephraimbuddy Could anyone help me in solving this issue? It would be very helpful for me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I am not too familiar with this side of things, but I don't seem to see dry_run in this file. Could that be it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you need to add these to the Marshmallow schema (you already added dag_run_id but not the others).

UpdateTaskInstancesState:
type: object
properties:
Expand Down
11 changes: 11 additions & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ class ClearTaskInstanceFormSchema(Schema):
include_parentdag = fields.Boolean(load_default=False)
reset_dag_runs = fields.Boolean(load_default=False)
task_ids = fields.List(fields.String(), validate=validate.Length(min=1))
dag_run_id = fields.Str(load_default=None)
include_upstream = fields.Boolean(load_default=False)
include_downstream = fields.Boolean(load_default=False)
include_future = fields.Boolean(load_default=False)
include_past = fields.Boolean(load_default=False)

@validates_schema
def validate_form(self, data, **kwargs):
Expand All @@ -128,6 +133,12 @@ def validate_form(self, data, **kwargs):
if data["start_date"] and data["end_date"]:
if data["start_date"] > data["end_date"]:
raise ValidationError("end_date is sooner than start_date")
if data["start_date"] and data["end_date"] and data["dag_run_id"]:
raise ValidationError("Exactly one of dag_run_id or (start_date and end_date) must be provided")
if data["start_date"] and data["dag_run_id"]:
raise ValidationError("Exactly one of dag_run_id or start_date must be provided")
if data["end_date"] and data["dag_run_id"]:
raise ValidationError("Exactly one of dag_run_id or end_date must be provided")


class SetTaskInstanceStateFormSchema(Schema):
Expand Down
28 changes: 25 additions & 3 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ export interface components {
*/
dry_run?: boolean;
};
ClearTaskInstance: {
ClearTaskInstances: {
/**
* @description If set, don't actually run this operation. The response will contain a list of task instances
* planned to be cleaned, but not modified in any way.
Expand Down Expand Up @@ -1617,6 +1617,28 @@ export interface components {
include_parentdag?: boolean;
/** @description Set state of DAG runs to RUNNING. */
reset_dag_runs?: boolean;
/** @description The DagRun ID for this task instance */
dag_run_id?: string | null;
/**
* @description If set to true, upstream tasks are also affected.
* @default false
*/
include_upstream?: boolean;
/**
* @description If set to true, downstream tasks are also affected.
* @default false
*/
include_downstream?: boolean;
/**
* @description If set to True, also tasks from future DAG Runs are affected.
* @default false
*/
include_future?: boolean;
/**
* @description If set to True, also tasks from past DAG Runs are affected.
* @default false
*/
include_past?: boolean;
};
UpdateTaskInstancesState: {
/**
Expand Down Expand Up @@ -2451,7 +2473,7 @@ export interface operations {
/** Parameters of action */
requestBody: {
content: {
"application/json": components["schemas"]["ClearTaskInstance"];
"application/json": components["schemas"]["ClearTaskInstances"];
};
};
};
Expand Down Expand Up @@ -4095,7 +4117,7 @@ export type ConfigSection = CamelCasedPropertiesDeep<components['schemas']['Conf
export type Config = CamelCasedPropertiesDeep<components['schemas']['Config']>;
export type VersionInfo = CamelCasedPropertiesDeep<components['schemas']['VersionInfo']>;
export type ClearDagRun = CamelCasedPropertiesDeep<components['schemas']['ClearDagRun']>;
export type ClearTaskInstance = CamelCasedPropertiesDeep<components['schemas']['ClearTaskInstance']>;
export type ClearTaskInstances = CamelCasedPropertiesDeep<components['schemas']['ClearTaskInstances']>;
export type UpdateTaskInstancesState = CamelCasedPropertiesDeep<components['schemas']['UpdateTaskInstancesState']>;
export type ListDagRunsForm = CamelCasedPropertiesDeep<components['schemas']['ListDagRunsForm']>;
export type ListTaskInstanceForm = CamelCasedPropertiesDeep<components['schemas']['ListTaskInstanceForm']>;
Expand Down
31 changes: 31 additions & 0 deletions tests/api_connexion/schemas/test_task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,37 @@ class TestClearTaskInstanceFormSchema(unittest.TestCase):
}
]
),
(
[
{
"dry_run": False,
"reset_dag_runs": True,
"dag_run_id": "scheduled__2022-06-19T00:00:00+00:00",
"start_date": "2022-08-03T00:00:00+00:00",
}
]
),
(
[
{
"dry_run": False,
"reset_dag_runs": True,
"dag_run_id": "scheduled__2022-06-19T00:00:00+00:00",
"end_date": "2022-08-03T00:00:00+00:00",
}
]
),
(
[
{
"dry_run": False,
"reset_dag_runs": True,
"dag_run_id": "scheduled__2022-06-19T00:00:00+00:00",
"end_date": "2022-08-04T00:00:00+00:00",
"start_date": "2022-08-03T00:00:00+00:00",
}
]
),
]
)
def test_validation_error(self, payload):
Expand Down