Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate public endpoint Set Task Instances State to FastAPI #44246

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

omkar-foss
Copy link
Collaborator

closes: #43752
related: #42370

This migrates the Set Task Instances State API from api_connexion to api_fastapi.

@boring-cyborg boring-cyborg bot added the area:UI Related to UI/UX. For Frontend Developers. label Nov 21, 2024
@omkar-foss omkar-foss self-assigned this Nov 21, 2024
@omkar-foss omkar-foss added the legacy api Whether legacy API changes should be allowed in PR label Nov 21, 2024
tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
)
task_instances_router = AirflowRouter(tags=["Task Instance"], prefix="/dags/{dag_id}")
task_instances_prefix = "/dagRuns/{dag_run_id}/taskInstances"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we make task_instances_prefix all capitals to indicate constant?

"task_instances": [
{
"dag_id": "example_python_operator",
"dag_run_id": "TEST_DAG_RUN_ID",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"dag_run_id": "TEST_DAG_RUN_ID",
"dag_run_id": run_id,

@mock.patch("airflow.models.dag.DAG.set_task_instance_state")
def test_should_assert_call_mocked_api(self, mock_set_task_instance_state, test_client, session):
self.create_task_instances(session)
run_id = "TEST_DAG_RUN_ID"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
run_id = "TEST_DAG_RUN_ID"
run_id = "TEST_DAG_RUN_ID"
task_id = "print_the_context"

Copy link
Collaborator

@rawwar rawwar Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also,we denote them in capital letters to indicate that they are constants.

session.query(TaskInstance)
.join(TaskInstance.dag_run)
.options(contains_eager(TaskInstance.dag_run))
.filter(TaskInstance.task_id == "print_the_context")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.filter(TaskInstance.task_id == "print_the_context")
.filter(TaskInstance.task_id == task_id)

"/public/dags/example_python_operator/updateTaskInstancesState",
json={
"dry_run": True,
"task_id": "print_the_context",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"task_id": "print_the_context",
"task_id": task_id",

"dag_id": "example_python_operator",
"dag_run_id": "TEST_DAG_RUN_ID",
"logical_date": "2020-01-01T00:00:00Z",
"task_id": "print_the_context",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"task_id": "print_the_context",
"task_id": task_id,

Copy link
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need that anymore. Isn't it already handled by the more generic PATCH task instance now ?

"/updateTaskInstancesState",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST]),
)
def set_task_instances_state(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, I think it should be transformed as for dag_run into patch_dag_run.

A PATCH that can take only a state for now, but it will be easier to extend. It should also allow PATCHING the note.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed you already have a PR opened for that.

include_downstream: bool
include_future: bool
include_past: bool
new_state: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to include editing the Task Instance note at the same time like we do for changing a dag run.

task_id: str
dag_run_id: str = Field(validation_alias="run_id")
dag_id: str
logical_date: datetime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logical_date is no longer unique. Is it still necessary for serializing? @dstandish

Comment on lines +162 to +165
include_upstream: bool
include_downstream: bool
include_future: bool
include_past: bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make all of these False by default

@bbovenzi
Copy link
Contributor

I think we should only merge one of this and #44223 and include setting a note and all the include_* filters to update other task instance states at the same time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:UI Related to UI/UX. For Frontend Developers. legacy api Whether legacy API changes should be allowed in PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AIP-84 Set Task Instances State
4 participants