diff --git a/docs/apache-airflow-providers-dbt-cloud/index.rst b/docs/apache-airflow-providers-dbt-cloud/index.rst index ecfa51fc0ac59..03888f00669bf 100644 --- a/docs/apache-airflow-providers-dbt-cloud/index.rst +++ b/docs/apache-airflow-providers-dbt-cloud/index.rst @@ -43,7 +43,7 @@ Content :maxdepth: 1 :caption: Resources - Example DAGs + Example DAGs PyPI Repository Installing from sources diff --git a/docs/apache-airflow-providers-dbt-cloud/operators.rst b/docs/apache-airflow-providers-dbt-cloud/operators.rst index 3e625927a6ff2..8ae9cd3d2d42a 100644 --- a/docs/apache-airflow-providers-dbt-cloud/operators.rst +++ b/docs/apache-airflow-providers-dbt-cloud/operators.rst @@ -51,7 +51,7 @@ The below examples demonstrate how to instantiate DbtCloudRunJobOperator tasks w asynchronous waiting for run termination, respectively. To note, the ``account_id`` for the operators is referenced within the ``default_args`` of the example DAG. -.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py +.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py :language: python :dedent: 4 :start-after: [START howto_operator_dbt_cloud_run_job] @@ -60,7 +60,7 @@ referenced within the ``default_args`` of the example DAG. This next example also shows how to pass in custom runtime configuration (in this case for ``threads_override``) via the ``additional_run_config`` dictionary. -.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py +.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py :language: python :dedent: 4 :start-after: [START howto_operator_dbt_cloud_run_job_async] @@ -80,7 +80,7 @@ In the example below, the ``run_id`` value in the example below comes from the o DbtCloudRunJobOperator task by utilizing the ``.output`` property exposed for all operators. Also, to note, the ``account_id`` for the task is referenced within the ``default_args`` of the example DAG. -.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py +.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py :language: python :dedent: 4 :start-after: [START howto_operator_dbt_cloud_run_job_sensor] @@ -101,7 +101,7 @@ downloaded. For more information on dbt Cloud artifacts, reference `this documentation `__. -.. exampleinclude:: /../../airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py +.. exampleinclude:: /../../tests/system/providers/dbt/example_dbt_cloud.py :language: python :dedent: 4 :start-after: [START howto_operator_dbt_cloud_get_artifact] diff --git a/airflow/providers/dbt/cloud/example_dags/__init__.py b/tests/system/providers/dbt/__init__.py similarity index 99% rename from airflow/providers/dbt/cloud/example_dags/__init__.py rename to tests/system/providers/dbt/__init__.py index 13a83393a9124..217e5db960782 100644 --- a/airflow/providers/dbt/cloud/example_dags/__init__.py +++ b/tests/system/providers/dbt/__init__.py @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information diff --git a/airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py b/tests/system/providers/dbt/example_dbt_cloud.py similarity index 80% rename from airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py rename to tests/system/providers/dbt/example_dbt_cloud.py index aa7f220bd2715..e03c856519374 100644 --- a/airflow/providers/dbt/cloud/example_dags/example_dbt_cloud.py +++ b/tests/system/providers/dbt/example_dbt_cloud.py @@ -17,21 +17,26 @@ from datetime import datetime -from airflow.models import DAG, BaseOperator +from airflow.models import DAG try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore + from airflow.providers.dbt.cloud.operators.dbt import ( DbtCloudGetJobRunArtifactOperator, DbtCloudRunJobOperator, ) from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor from airflow.utils.edgemodifier import Label +from tests.system.utils import get_test_env_id + +ENV_ID = get_test_env_id() +DAG_ID = "example_dbt_cloud" with DAG( - dag_id="example_dbt_cloud", + dag_id=DAG_ID, default_args={"dbt_cloud_conn_id": "dbt", "account_id": 39151}, start_date=datetime(2021, 1, 1), schedule_interval=None, @@ -50,7 +55,7 @@ # [END howto_operator_dbt_cloud_run_job] # [START howto_operator_dbt_cloud_get_artifact] - get_run_results_artifact: BaseOperator = DbtCloudGetJobRunArtifactOperator( + get_run_results_artifact = DbtCloudGetJobRunArtifactOperator( task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json" ) # [END howto_operator_dbt_cloud_get_artifact] @@ -65,7 +70,7 @@ # [END howto_operator_dbt_cloud_run_job_async] # [START howto_operator_dbt_cloud_run_job_sensor] - job_run_sensor: BaseOperator = DbtCloudJobRunSensor( + job_run_sensor = DbtCloudJobRunSensor( task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20 ) # [END howto_operator_dbt_cloud_run_job_sensor] @@ -77,3 +82,15 @@ # Task dependency created via `XComArgs`: # trigger_job_run1 >> get_run_results_artifact # trigger_job_run2 >> job_run_sensor + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/utils/__init__.py b/tests/system/utils/__init__.py index 8346f83469298..e71b0e1628411 100644 --- a/tests/system/utils/__init__.py +++ b/tests/system/utils/__init__.py @@ -14,6 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import os + from airflow.utils.state import State @@ -23,3 +25,7 @@ def test_run(): dag.run() return test_run + + +def get_test_env_id(env_var_name: str = "SYSTEM_TESTS_ENV_ID"): + return os.environ.get(env_var_name)