Skip to content

Commit

Permalink
Append concurrency tag to prevent issues between DAGs in staging (#5155)
Browse files Browse the repository at this point in the history
  • Loading branch information
krysal authored Nov 12, 2024
1 parent 1b64d10 commit 0fe2eb4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion catalog/dags/common/sensors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def get_dags_with_concurrency_tag(
return [id for id in dag_ids if id not in {*excluded_dag_ids, running_dag_id}]


@task
@task(map_index_template="{{ task.op_kwargs['external_dag_id'] }}")
def wait_for_external_dag(
external_dag_id: str,
task_id: str | None = None,
Expand Down
28 changes: 17 additions & 11 deletions catalog/dags/data_refresh/dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@
DATA_REFRESH_POOL,
ENVIRONMENTS,
PRODUCTION,
STAGING,
Environment,
)
from common.sensors.constants import ES_CONCURRENCY_TAGS
from common.sensors.constants import ES_CONCURRENCY_TAGS, STAGING_DB_CONCURRENCY_TAG
from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
from common.sensors.utils import wait_for_external_dags_with_tag
from data_refresh.alter_data import alter_table_data
Expand All @@ -69,7 +70,7 @@
def wait_for_conflicting_dags(
data_refresh_config: DataRefreshConfig,
external_dag_ids: list[str],
concurrency_tag: str,
concurrency_tags: list[str],
allow_concurrent_data_refreshes: bool,
):
# Wait to ensure that no other Data Refresh DAGs are running.
Expand All @@ -89,12 +90,15 @@ def wait_for_conflicting_dags(
# Realistically the data refresh is too slow to beat the index creation process,
# even if it was triggered immediately after one of these DAGs; however, it is
# always safer to avoid the possibility of the race condition altogether.
wait_for_external_dags_with_tag.override(group_id="wait_for_es_dags")(
tag=concurrency_tag,
# Exclude the other data refresh DAG ids for this environment, as waiting on these
# was handled in the previous task.
excluded_dag_ids=external_dag_ids,
)
for tag, group_id in zip(
concurrency_tags, ["wait_for_es_dags", "wait_for_staging_db_dags"]
):
wait_for_external_dags_with_tag.override(group_id=group_id)(
tag=tag,
# Exclude the other data refresh DAG ids for this environment, as waiting on these
# was handled in the previous task.
excluded_dag_ids=external_dag_ids,
)


@task
Expand Down Expand Up @@ -127,7 +131,9 @@ def create_data_refresh_dag(
**data_refresh_config.default_args,
}

concurrency_tag = ES_CONCURRENCY_TAGS.get(target_environment)
concurrency_tags = [ES_CONCURRENCY_TAGS.get(target_environment)]
if target_environment == STAGING:
concurrency_tags.append(STAGING_DB_CONCURRENCY_TAG)

dag = DAG(
dag_id=f"{target_environment}_{data_refresh_config.dag_id}",
Expand All @@ -143,7 +149,7 @@ def create_data_refresh_dag(
tags=[
"data_refresh",
f"{target_environment}_data_refresh",
concurrency_tag,
*concurrency_tags,
],
render_template_as_native_obj=True,
params={
Expand Down Expand Up @@ -183,7 +189,7 @@ def create_data_refresh_dag(
wait_for_dags = wait_for_conflicting_dags(
data_refresh_config,
external_dag_ids,
concurrency_tag,
concurrency_tags,
"{{ params.allow_concurrent_data_refreshes }}",
)

Expand Down

0 comments on commit 0fe2eb4

Please sign in to comment.