Skip to content

Commit

Permalink
Temporarily change BigQueryTablePartitionExistenceSensors to run in…
Browse files Browse the repository at this point in the history
… reschedule mode (bug 1932180) (#6542)

* Temporarily change `BigQueryTablePartitionExistenceSensor`s to run in reschedule mode.

There's a bug in those sensors when they run in deferrable mode where the secondary queries they run to detect partitions fail, which is being fixed in apache/airflow#44225.

* Fix `test_dag_with_bigquery_table_sensors`.
  • Loading branch information
sean-rose authored Nov 20, 2024
1 parent 635e3f4 commit 19237a9
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
5 changes: 3 additions & 2 deletions bigquery_etl/query_scheduling/templates/airflow_dag.j2
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
table_id={{ table_name | format_repr }},
partition_id={{ table_partition_sensor_task.partition_id | format_repr }},
gcp_conn_id='google_cloud_shared_prod',
deferrable=True,
deferrable=False,
mode='reschedule',
{% if table_partition_sensor_task.poke_interval != None -%}
poke_interval={{ table_partition_sensor_task.poke_interval | format_timedelta | format_repr }},
{% else -%}
poke_interval=datetime.timedelta(minutes=5),
poke_interval=datetime.timedelta(minutes=15),
{% endif -%}
{% if table_partition_sensor_task.timeout != None -%}
timeout={{ table_partition_sensor_task.timeout | format_timedelta | format_repr }},
Expand Down
3 changes: 2 additions & 1 deletion tests/data/dags/test_dag_with_bigquery_table_sensors
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ with DAG(
table_id="baz",
partition_id="{{ ds_nodash }}",
gcp_conn_id="google_cloud_shared_prod",
deferrable=True,
deferrable=False,
mode="reschedule",
poke_interval=datetime.timedelta(seconds=900),
timeout=datetime.timedelta(seconds=21600),
retries=3,
Expand Down

1 comment on commit 19237a9

@dataops-ci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Integration report for "Temporarily change BigQueryTablePartitionExistenceSensors to run in reschedule mode (bug 1932180) (#6542)"

sql.diff

Click to expand!
diff -bur --no-dereference --new-file /tmp/workspace/main-generated-sql/dags/bqetl_google_search_console.py /tmp/workspace/generated-sql/dags/bqetl_google_search_console.py
--- /tmp/workspace/main-generated-sql/dags/bqetl_google_search_console.py	2024-11-20 21:32:32.000000000 +0000
+++ /tmp/workspace/generated-sql/dags/bqetl_google_search_console.py	2024-11-20 21:34:12.000000000 +0000
@@ -67,8 +67,9 @@
             table_id="searchdata_url_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -81,8 +82,9 @@
             table_id="searchdata_url_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -95,8 +97,9 @@
             table_id="searchdata_url_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -109,8 +112,9 @@
             table_id="searchdata_url_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -123,8 +127,9 @@
             table_id="searchdata_url_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -137,8 +142,9 @@
             table_id="searchdata_url_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -151,8 +157,9 @@
             table_id="searchdata_site_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -165,8 +172,9 @@
             table_id="searchdata_site_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -179,8 +187,9 @@
             table_id="searchdata_site_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -193,8 +202,9 @@
             table_id="searchdata_site_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -207,8 +217,9 @@
             table_id="searchdata_site_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )
@@ -221,8 +232,9 @@
             table_id="searchdata_site_impression",
             partition_id="{{ data_interval_start.subtract(days=1) | ds_nodash }}",
             gcp_conn_id="google_cloud_shared_prod",
-            deferrable=True,
-            poke_interval=datetime.timedelta(minutes=5),
+            deferrable=False,
+            mode="reschedule",
+            poke_interval=datetime.timedelta(minutes=15),
             timeout=datetime.timedelta(hours=8),
         )
     )

Link to full diff

Please sign in to comment.