# Patch summary (leading commentary; patch tools skip text before the first
# "diff --git" header).
#
# Adds two new files:
#
# 1. benchmark/experiment/airflow-test-cosmos-sync.yaml
#    A Kubernetes batch/v1 Job named "airflow-test-cosmos-sync". Its single
#    container ("cosmos-sync") runs `airflow dags test cosmos_bq_sync` from
#    the image benchmark:0.0.3 with imagePullPolicy: Never. The
#    AIRFLOW_CONN_GCP_GS_CONN environment variable is read from the
#    "gcp-credentials" secret (key "airflow-conn"). CPU and memory requests
#    and limits are both 2 CPU / 4Gi. restartPolicy is Never and
#    backoffLimit is 0.
#
# 2. dags/cosmos_sync_dag.py
#    Builds the `cosmos_bq_sync` DbtDag for the dbt project at
#    /usr/local/airflow/dbt/altered_jaffle_shop, using a
#    GoogleCloudServiceAccountDictProfileMapping on connection "gcp_gs_conn"
#    and ExecutionMode.AIRFLOW_ASYNC. After the DAG is constructed, a loop
#    sets `deferrable = False` on every task that is a
#    DbtRunAirflowAsyncBigqueryOperator.
#
# NOTE(review): presumably deferral is switched off so the async BigQuery
#   operators run to completion inside `airflow dags test` (the DAG id ends
#   in "_sync") -- confirm with the author.
# NOTE(review): line breaks and indentation below were restored from a
#   whitespace-collapsed copy of this patch; the YAML nesting and Python
#   indentation are inferred from syntax -- confirm against the original
#   commit before applying.
diff --git a/benchmark/experiment/airflow-test-cosmos-sync.yaml b/benchmark/experiment/airflow-test-cosmos-sync.yaml
new file mode 100644
index 0000000..119807c
--- /dev/null
+++ b/benchmark/experiment/airflow-test-cosmos-sync.yaml
@@ -0,0 +1,28 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: airflow-test-cosmos-sync
+spec:
+  template:
+    spec:
+      containers:
+        - name: cosmos-sync
+          image: benchmark:0.0.3
+          imagePullPolicy: Never
+          command: ["airflow", "dags", "test", "cosmos_bq_sync"]
+          env:
+            - name: AIRFLOW_CONN_GCP_GS_CONN
+              valueFrom:
+                secretKeyRef:
+                  name: gcp-credentials
+                  key: airflow-conn
+          resources:
+            # Equivalent to Astro's A10 instance
+            requests:
+              cpu: "2"
+              memory: "4Gi"
+            limits:
+              cpu: "2"
+              memory: "4Gi"
+      restartPolicy: Never
+  backoffLimit: 0
diff --git a/dags/cosmos_sync_dag.py b/dags/cosmos_sync_dag.py
new file mode 100644
index 0000000..c44d5aa
--- /dev/null
+++ b/dags/cosmos_sync_dag.py
@@ -0,0 +1,45 @@
+from datetime import datetime
+from pathlib import Path
+
+from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig
+from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator
+from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping
+from include.constants import BIGQUERY_DATASET, DBT_ADAPTER_VERSION, GCP_PROJECT_ID
+
+DBT_PROJECT_PATH = Path("/usr/local/airflow/dbt/altered_jaffle_shop")
+
+
+
+profile_config = ProfileConfig(
+    profile_name="altered_jaffle_shop",
+    target_name="dev",
+    profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
+        conn_id="gcp_gs_conn", profile_args={"dataset": BIGQUERY_DATASET, "project": GCP_PROJECT_ID}
+    ),
+)
+
+
+cosmos_bq_sync = DbtDag(
+    # dbt/cosmos-specific parameters
+    project_config=ProjectConfig(DBT_PROJECT_PATH),
+    profile_config=profile_config,
+    execution_config=ExecutionConfig(
+        execution_mode=ExecutionMode.AIRFLOW_ASYNC,
+        async_py_requirements=[f"dbt-bigquery=={DBT_ADAPTER_VERSION}"],
+    ),
+    # normal dag parameters
+    schedule=None,
+    start_date=datetime(2026, 1, 1),
+    catchup=False,
+    dag_id="cosmos_bq_sync",
+    tags=["simple"],
+    operator_args={
+        "location": "US",
+        "install_deps": True,
+        "full_refresh": True,
+    },
+)
+
+for task in cosmos_bq_sync.tasks:
+    if isinstance(task, DbtRunAirflowAsyncBigqueryOperator):
+        task.deferrable = False