Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4099,42 +4099,42 @@ def test_catchup_works_correctly(self, dag_maker):
) > (timezone.utcnow() - timedelta(days=2))


@pytest.mark.xfail(reason="Work out where this goes")
def test_task_with_upstream_skip_process_task_instances():
@pytest.mark.need_serialized_dag
def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
"""
Test if _process_task_instances puts a task instance into SKIPPED state if any of its
Test if _schedule_dag_run puts a task instance into SKIPPED state if any of its
upstream tasks are skipped according to TriggerRuleDep.
"""
clear_db_runs()
with DAG(
dag_id='test_task_with_upstream_skip_dag', start_date=DEFAULT_DATE, schedule_interval=None
) as dag:
with dag_maker(
dag_id='test_task_with_upstream_skip_process_task_instances',
start_date=DEFAULT_DATE,
session=session,
):
dummy1 = EmptyOperator(task_id='dummy1')
dummy2 = EmptyOperator(task_id="dummy2")
dummy3 = EmptyOperator(task_id="dummy3")
[dummy1, dummy2] >> dummy3

# dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, execution_date=DEFAULT_DATE)
dr = dag_maker.create_dagrun(state=State.RUNNING)
assert dr is not None

with create_session() as session:
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
# Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
tis[dummy1.task_id].state = State.SKIPPED
tis[dummy2.task_id].state = State.SUCCESS
assert tis[dummy3.task_id].state == State.NONE
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
# Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
tis[dummy1.task_id].state = State.SKIPPED
tis[dummy2.task_id].state = State.SUCCESS
assert tis[dummy3.task_id].state == State.NONE
session.flush()

# dag_runs = DagRun.find(dag_id='test_task_with_upstream_skip_dag')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potiuk
I tried to get rid of xfail test in tests/jobs/test_scheduler_job.py::test_task_with_upstream_skip_process_task_instances but in this function the comment says it expects dummy3 to be skipped. But it is returned as None. Can you tell me the expected behaviour?

I tried to check similar place in code where they try to skip task and find out the state of downstream task but I couldn't find many.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really sure, I am afraid - those are pretty deep dive tests and they might require qutie a bit deeper knowledge (and time to look it up).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes i took time to look but I am confused on this part. Let me try again

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potiuk Could i post this in slack and see if i could get some help in fixing this issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can try :) but likely in this case there are very few people who know it deeply enough to be able to help (and likely they will have to simply solve it rather than help :) - @ephraimbuddy , myself, @ashb, @uranusjr are the likely candidates :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I think I picked up the wrong issue to fix.

# dag_file_processor._process_task_instances(dag, dag_runs=dag_runs)

with create_session() as session:
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
assert tis[dummy1.task_id].state == State.SKIPPED
assert tis[dummy2.task_id].state == State.SUCCESS
# dummy3 should be skipped because dummy1 is skipped.
assert tis[dummy3.task_id].state == State.SKIPPED
scheduler_job = SchedulerJob(subdir=os.devnull)
scheduler_job._schedule_dag_run(dr, session)
session.flush()
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
assert tis[dummy1.task_id].state == State.SKIPPED
assert tis[dummy2.task_id].state == State.SUCCESS
# dummy3 should be skipped because dummy1 is skipped.
assert tis[dummy3.task_id].state == State.SKIPPED


class TestSchedulerJobQueriesCount:
Expand Down