Task with ALL_DONE trigger rule gets skipped #40539
Replies: 4 comments
-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so — no need to wait for approval.
-
from airflow import DAG
from airflow.utils.state import TaskInstanceState

report_id = 99999
stageConfig1 = {}
stageConfig2 = {}
test_array = []
test_array.append(stageConfig)
tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

def get_day(**kwargs):
def branch(ti):
def move_file_and_info():
move_file = PythonOperator(
def bq_insert_call():
def df_task_call():
prev_task = None
-
How is this related to the image you shared and to your code? If you still believe there is a bug, please provide a clear description of the issue and a minimal reproducible example.
-
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.6.3
What happened?
A PythonOperator task with trigger_rule=TriggerRule.ALL_DONE gets skipped.
What you think should happen instead?
The task should trigger once all upstream tasks have completed, whether each one failed, succeeded, or was skipped.
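For reference, the expected behavior can be sketched without Airflow. This is a hypothetical pure-Python simulation (illustrative names, not Airflow's actual scheduler code): a task with ALL_DONE becomes runnable once every upstream task has reached a terminal state, regardless of which one, while the default ALL_SUCCESS rule only runs the task when every upstream succeeded.

```python
# Hypothetical simulation of trigger-rule evaluation; function and state
# names are illustrative, not Airflow internals.
def should_run(trigger_rule, upstream_states):
    terminal = {"success", "failed", "skipped"}
    # No rule fires until every upstream task is in a terminal state.
    if not all(s in terminal for s in upstream_states):
        return False
    if trigger_rule == "all_done":
        # ALL_DONE: run no matter how the upstream tasks finished.
        return True
    if trigger_rule == "all_success":
        # Default rule: run only if every upstream task succeeded.
        return all(s == "success" for s in upstream_states)
    raise ValueError(f"unhandled rule: {trigger_rule}")

# A branch skipped two of three upstream tasks:
states = ["success", "skipped", "skipped"]
print(should_run("all_done", states))     # True  - ALL_DONE still runs
print(should_run("all_success", states))  # False - default rule does not
```

Under this model, an ALL_DONE task downstream of a branch should run even when the non-selected branches were skipped, which is why the observed skip looks like a bug.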
How to reproduce
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule

# used to factorize the code and avoid repetition
report_id = 99999
stageConfig = {}
stageConfig['name'] = 'aaaa'
stageConfig['address'] = '11111'
stageConfig1 = {}
stageConfig1['name'] = 'bbbb'
stageConfig1['address'] = '22222'
stageConfig2 = {}
stageConfig2['name'] = 'cccc'
stageConfig2['address'] = '33333'
test_array = []
test_array.append(stageConfig)
test_array.append(stageConfig1)
test_array.append(stageConfig2)
tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'Weekday',
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
    schedule="*/5 * * * *")

# returns the week day (monday, tuesday, etc.)
def get_day(**kwargs):
    kwargs['ti'].xcom_push(key='day', value=datetime.now().weekday())

# returns the task id of the task to launch (task_for_monday, task_for_tuesday, etc.)
def branch(ti):
    print("$$$$$$$$$$")
    # print(ti.upstream_task_ids)  # Not available
    print(ti.task.get_direct_relatives(True))
    print(ti.task.get_direct_relative_ids(True))
    upstream_task_list = []
    upstream_task_id_set = ti.task.get_direct_relative_ids(True)

def move_file_and_info():
    print("move_file_and_info")

move_file = PythonOperator(
    task_id='move_file',
    trigger_rule=TriggerRule.ALL_DONE,
    python_callable=move_file_and_info,
    provide_context=True,
    dag=dag
)

def bq_insert_call():
    print("bq_insert....")

def df_task_call():
    print("df_task....")

# One dummy operator for each week day, all branched to the fork
# prev_task = fork
prev_task = None
for index, day in enumerate(test_array):
    bq_insert = PythonOperator(task_id='task_bq_insert' + day['name'], dag=dag, python_callable=bq_insert_call, provide_context=True,)
    t = PythonOperator(task_id='task_for_' + day['name'], dag=dag, python_callable=df_task_call, provide_context=True,)
Operating System
GCP managed service
Versions of Apache Airflow Providers
GCP
Deployment
Official Apache Airflow Helm Chart
Deployment details
NA
Anything else?
NA
Are you willing to submit PR?
Code of Conduct