Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

include limit and offset in request body schema for List task instances (batch) endpoint #43479

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
ti_query = base_query.options(
joinedload(TI.rendered_task_instance_fields), joinedload(TI.task_instance_note)
)
ti_query = ti_query.offset(data["page_offset"]).limit(data["page_limit"])
# using execute because we want the SlaMiss entity. Scalars don't return None for missing entities
task_instances = session.execute(ti_query).all()

Expand Down
9 changes: 9 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5097,6 +5097,15 @@ components:
ListTaskInstanceForm:
type: object
properties:
page_offset:
type: integer
minimum: 0
description: The number of items to skip before starting to collect the result set.
page_limit:
type: integer
minimum: 1
default: 100
description: The numbers of items to return.
dag_ids:
type: array
items:
Expand Down
7 changes: 7 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,13 @@ export interface components {
end_date_lte?: string;
};
ListTaskInstanceForm: {
/** @description The number of items to skip before starting to collect the result set. */
page_offset?: number;
/**
* @description The numbers of items to return.
* @default 100
*/
page_limit?: number;
/**
* @description Return objects with specific DAG IDs.
* The value can be repeated to retrieve multiple matching values (OR condition).
Expand Down
50 changes: 50 additions & 0 deletions tests/api_connexion/endpoints/test_task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,56 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se
assert response.status_code == 400
assert expected in response.json["detail"]

def test_should_respond_200_for_pagination(self, session):
dag_id = "example_python_operator"

self.create_task_instances(
session,
task_instances=[
{"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(10)
],
dag_id=dag_id,
)

# First 5 items
response_batch1 = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
environ_overrides={"REMOTE_USER": "test"},
json={"page_limit": 5, "page_offset": 0},
)
assert response_batch1.status_code == 200, response_batch1.json
num_entries_batch1 = len(response_batch1.json["task_instances"])
assert num_entries_batch1 == 5
assert len(response_batch1.json["task_instances"]) == 5

# 5 items after that
response_batch2 = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
environ_overrides={"REMOTE_USER": "test"},
json={"page_limit": 5, "page_offset": 5},
)
assert response_batch2.status_code == 200, response_batch2.json
num_entries_batch2 = len(response_batch2.json["task_instances"])
assert num_entries_batch2 > 0
assert len(response_batch2.json["task_instances"]) > 0

# Match
ti_count = 9
assert response_batch1.json["total_entries"] == response_batch2.json["total_entries"] == ti_count
assert (num_entries_batch1 + num_entries_batch2) == ti_count
assert response_batch1 != response_batch2

# default limit and offset
response_batch3 = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
environ_overrides={"REMOTE_USER": "test"},
json={},
)

num_entries_batch3 = len(response_batch3.json["task_instances"])
assert num_entries_batch3 == ti_count
assert len(response_batch3.json["task_instances"]) == ti_count


class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
@pytest.mark.parametrize(
Expand Down