-
Notifications
You must be signed in to change notification settings - Fork 15.9k
Port dag.test to Task SDK
#50300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Port dag.test to Task SDK
#50300
Conversation
5f00cac to
2257904
Compare
0fdd739 to
98c1675
Compare
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, no major concerns, a few qns here and there
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Show resolved
Hide resolved
|
Oh i see you are using dag.test: https://github.com/apache/airflow/blob/main/devel-common/src/tests_common/test_utils/system_tests.py#L35-L75 |
Do you have more logs? Let's chat on slack and we can debug |
closes #45549 Key changes: - Moves `dag.test` implementation to Task SDK, leveraging the existing in-process execution infrastructure - Adds `JWTBearerTIPathDep` for proper task instance path validation - Updates `InProcessExecutionAPI` to support task instance validation - Removes legacy `dag.test` implementation from DAG class The changes ensure that `dag.test` uses the same execution path as regular task execution. (cherry picked from commit ef2da6f)
Follow-up of apache#50300 for running a single task.
Follow-up of apache#50300 for running a single task.
Follow-up of apache#50300 for running a single task.
Follow-up of apache#50300 for running a single task.
Follow-up of apache#50300 for running a single task.
Follow-up of apache#50300 for running a single task.
|
I think this PR introduces a bug for deferrable operators. If you look at AWS dashboard, all system tests in deferrable mode are failing. And they are failing since this PR has been merged. More detailsHere is the DAG I am using: with DAG(
dag_id=DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
tags=["example"],
catchup=False,
) as dag:
# This context contains the ENV_ID and any env variables requested when the
# task was built above. Access the info as you would any other TaskFlow task.
env_id = "722404908466step"
role_arn = "arn:aws:iam::722404908466:role/RoleSysTest_example_step_functions"
state_machine_arn = create_state_machine(env_id, role_arn)
# [START howto_operator_step_function_start_execution]
start_execution = StepFunctionStartExecutionOperator(
task_id="start_execution", state_machine_arn=state_machine_arn, deferrable=True
)
# [END howto_operator_step_function_start_execution]
execution_arn = start_execution.output
# [START howto_sensor_step_function_execution]
wait_for_execution = StepFunctionExecutionSensor(
task_id="wait_for_execution", execution_arn=execution_arn
)
# [END howto_sensor_step_function_execution]
wait_for_execution.poke_interval = 1
chain(
# TEST SETUP
state_machine_arn,
# TEST BODY
start_execution,
wait_for_execution,
)When I run this DAG with I can see this in logs: When executed in deferrable mode, the task Any chance someone knows why this happens? I'll look into the code but somebody has something in top of his mind, that will be faster :) |
Follow-up of apache#50300 for running a single task.
Fixed in #51182 |
closes #45549 Key changes: - Moves `dag.test` implementation to Task SDK, leveraging the existing in-process execution infrastructure - Adds `JWTBearerTIPathDep` for proper task instance path validation - Updates `InProcessExecutionAPI` to support task instance validation - Removes legacy `dag.test` implementation from DAG class The changes ensure that `dag.test` uses the same execution path as regular task execution. (cherry picked from commit ef2da6f)
Follow-up of apache#50300 for running a single task.
Follow-up of apache/airflow#50300 for running a single task. (cherry picked from commit 3a9858e7360129808274de1b8419f7abb7b6b9cb) GitOrigin-RevId: 6fc6c250c916aff8b8c06d4ce8d5232c9483f37e
Follow-up of apache/airflow#50300 for running a single task. GitOrigin-RevId: 3a9858e7360129808274de1b8419f7abb7b6b9cb


closes #45549
DAGs used to test
Filename:
example_simplest_dag.pyResult:
Async DAG:
Filename:
example_time_delta_sensor_async.pyResult:

Testing
I will add some more tests, but the existing tests (that use dag.test) are sufficient and catch most of the issues. Longer-term we should have Task SDK Integration/system tests that allow accessing DB. The current Task SDK unit test does not allow any DB interaction so tests can't go there.
TODO
Support
use_executorondag.test. It is used in system tests (based on Enable task sdk for system tests #48699)Handle DAG level callbacks
Handle
fail_fast, which currently relies on the DAG being present in theserialized_dagtable.Since we skip
task_runner.parseindag.test(), the DAG is not auto-serialized.airflow/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Lines 374 to 378 in 952a004
Possible fix to add the following in
dag.test():Future Enhancements
ti.run&TI._run_raw_taskimplementation to use this code so we can nuke a lot of code in models.TaskInstance to completely remove a separate execution pathlogs, API server logs etcI will do (1) in the next few weeks as that is a big change too (affecting all providers) and (1) and (2) will be deferred for later.