Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ jobs:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
DATABRICKS_HOST: mock
DATABRICKS_WAREHOUSE_ID: mock
DATABRICKS_TOKEN: mock
Expand Down Expand Up @@ -213,6 +216,9 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
Expand Down Expand Up @@ -275,6 +281,9 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
Expand Down Expand Up @@ -348,6 +357,9 @@ jobs:
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }}
AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
Expand Down
43 changes: 34 additions & 9 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
from pathlib import Path
from typing import Any, Callable, Iterator

from airflow.version import version as airflow_version

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
DEFAULT_PROFILES_FILE_NAME,
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
DbtResourceType,
ExecutionMode,
InvocationMode,
Expand All @@ -24,6 +27,7 @@
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.profiles import BaseProfileMapping
from cosmos.settings import AIRFLOW_IO_AVAILABLE

logger = get_logger(__name__)

Expand Down Expand Up @@ -150,6 +154,7 @@ def __init__(
seeds_relative_path: str | Path = "seeds",
snapshots_relative_path: str | Path = "snapshots",
manifest_path: str | Path | None = None,
manifest_conn_id: str | None = None,
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
Expand All @@ -175,7 +180,25 @@ def __init__(
self.project_name = self.dbt_project_path.stem

if manifest_path:
self.manifest_path = Path(manifest_path)
manifest_path_str = str(manifest_path)
if not manifest_conn_id:
manifest_scheme = manifest_path_str.split("://")[0]
# Use the default Airflow connection ID for the scheme if it is not provided.
manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, None)

if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object "
f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Airflow 2.8 or later."
)

if AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath

self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)
else:
self.manifest_path = Path(manifest_path_str)

self.env_vars = env_vars
self.dbt_vars = dbt_vars
Expand All @@ -192,28 +215,30 @@ def validate_project(self) -> None:
"""

mandatory_paths = {}

# We validate the existence of paths added to the `mandatory_paths` map by calling the `exists()` method on each
# one. Starting with Cosmos 1.6.0, if the Airflow version is `>= 2.8.0` and a `manifest_path` is provided, we
# cast it to an `airflow.io.path.ObjectStoragePath` instance during `ProjectConfig` initialisation, and it
# includes the `exists()` method. For the remaining paths in the `mandatory_paths` map, we cast them to
# `pathlib.Path` objects to ensure that the subsequent `exists()` call while iterating on the `mandatory_paths`
# map works correctly for all paths, thereby validating the project.
if self.dbt_project_path:
project_yml_path = self.dbt_project_path / "dbt_project.yml"
mandatory_paths = {
"dbt_project.yml": project_yml_path,
"models directory ": self.models_path,
"dbt_project.yml": Path(project_yml_path) if project_yml_path else None,
Comment thread
pankajkoti marked this conversation as resolved.
"models directory ": Path(self.models_path) if self.models_path else None,
}
if self.manifest_path:
mandatory_paths["manifest"] = self.manifest_path

for name, path in mandatory_paths.items():
if path is None or not Path(path).exists():
if path is None or not path.exists():
raise CosmosValueError(f"Could not find {name} at {path}")

def is_manifest_available(self) -> bool:
"""
Check if the `dbt` project manifest is set and if the file exists.
"""
if not self.manifest_path:
return False

return self.manifest_path.exists()
return self.manifest_path.exists() if self.manifest_path else False


@dataclass
Expand Down
14 changes: 14 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from pathlib import Path

import aenum
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from packaging.version import Version

DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
Expand All @@ -28,6 +31,17 @@
PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]


S3_FILE_SCHEME = "s3"
GS_FILE_SCHEME = "gs"
ABFS_FILE_SCHEME = "abfs"

FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = {
S3_FILE_SCHEME: S3Hook.default_conn_name,
GS_FILE_SCHEME: GCSHook.default_conn_name,
ABFS_FILE_SCHEME: WasbHook.default_conn_name,
}


class LoadMode(Enum):
"""
Supported ways to load a `dbt` project into a `DbtGraph` instance.
Expand Down
8 changes: 6 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from functools import cached_property
from pathlib import Path
from subprocess import PIPE, Popen
from typing import Any
from typing import TYPE_CHECKING, Any

from airflow.models import Variable

Expand Down Expand Up @@ -611,7 +611,11 @@ def load_from_dbt_manifest(self) -> None:
raise CosmosLoadDbtException("Unable to load manifest without ExecutionConfig.dbt_project_path")

nodes = {}
with open(self.project.manifest_path) as fp: # type: ignore[arg-type]

if TYPE_CHECKING:
assert self.project.manifest_path is not None # pragma: no cover

with self.project.manifest_path.open() as fp:
manifest = json.load(fp)

resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
Expand Down
4 changes: 4 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import airflow
from airflow.configuration import conf
from airflow.version import version as airflow_version
from packaging.version import Version

from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE

Expand All @@ -24,3 +26,5 @@
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")
97 changes: 82 additions & 15 deletions dev/dags/cosmos_manifest_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import DbtProfileConfigVars, PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop")

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
Expand All @@ -22,22 +27,84 @@
),
)

# [START local_example]
cosmos_manifest_example = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]),
execution_config=ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop"),
operator_args={"install_deps": True},
# normal dag parameters
render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"])


@dag(
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="cosmos_manifest_example",
default_args={"retries": 2},
)
# [END local_example]
def cosmos_manifest_example() -> None:

pre_dbt = EmptyOperator(task_id="pre_dbt")

# [START local_example]
local_example = DbtTaskGroup(
group_id="local_example",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END local_example]

# [START aws_s3_example]
aws_s3_example = DbtTaskGroup(
group_id="aws_s3_example",
project_config=ProjectConfig(
manifest_path="s3://cosmos-manifest-test/manifest.json",
manifest_conn_id="aws_s3_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used.
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END aws_s3_example]

# [START gcp_gs_example]
gcp_gs_example = DbtTaskGroup(
group_id="gcp_gs_example",
project_config=ProjectConfig(
manifest_path="gs://cosmos-manifest-test/manifest.json",
manifest_conn_id="gcp_gs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END gcp_gs_example]

# [START azure_abfs_example]
azure_abfs_example = DbtTaskGroup(
group_id="azure_abfs_example",
project_config=ProjectConfig(
manifest_path="abfs://cosmos-manifest-test/manifest.json",
manifest_conn_id="azure_abfs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=render_config,
execution_config=execution_config,
operator_args={"install_deps": True},
)
# [END azure_abfs_example]

post_dbt = EmptyOperator(task_id="post_dbt")

(pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)


cosmos_manifest_example()
59 changes: 48 additions & 11 deletions docs/configuration/parsing-methods.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,63 @@ When you don't supply an argument to the ``load_mode`` parameter (or you supply

To use this method, you don't need to supply any additional config. This is the default.


``dbt_manifest``
----------------

If you already have a ``manifest.json`` file created by dbt, Cosmos will parse the manifest to generate your DAG.

You can supply a ``manifest_path`` parameter on the DbtDag / DbtTaskGroup with a path to a ``manifest.json`` file.

To use this:
Before Cosmos 1.6.0, the path to ``manifest.json`` supplied via the ``DbtDag`` / ``DbtTaskGroup`` ``manifest_path``
argument accepted only local paths. However, starting with Cosmos 1.6.0, if you have Airflow >= 2.8.0, you can also
supply a remote path (e.g., an S3 URL). To support remote paths, Cosmos leverages the
`Airflow Object Storage <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
feature released in Airflow 2.8.0.
For remote paths, you can specify a ``manifest_conn_id``, which is an
Airflow connection ID containing the credentials to access the remote path. If you do not specify a
``manifest_conn_id``, Cosmos will use the default connection ID specific to the scheme, identified using the
``default_conn_name`` attribute of the Airflow hook corresponding to the URL's scheme.

.. code-block:: python
Examples of how to supply ``manifest.json`` using the ``manifest_path`` argument:

- Local path:

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START local_example]
:end-before: [END local_example]

- AWS S3 URL (available since Cosmos 1.6):

Ensure that you have the required dependencies installed to use the S3 URL. You can install the required dependencies
using the following command: ``pip install "astronomer-cosmos[amazon]"``

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START aws_s3_example]
:end-before: [END aws_s3_example]

- GCP GCS URL (available since Cosmos 1.6):

Ensure that you have the required dependencies installed to use the GCS URL. You can install the required dependencies
using the following command: ``pip install "astronomer-cosmos[google]"``

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START gcp_gs_example]
:end-before: [END gcp_gs_example]

- Azure Blob Storage URL (available since Cosmos 1.6):

Ensure that you have the required dependencies installed to use the Azure blob URL. You can install the required
dependencies using the following command: ``pip install "astronomer-cosmos[microsoft]"``

.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py
:language: python
:start-after: [START azure_abfs_example]
:end-before: [END azure_abfs_example]

DbtDag(
project_config=ProjectConfig(
manifest_path="/path/to/manifest.json",
),
render_config=RenderConfig(
load_method=LoadMode.DBT_MANIFEST,
),
# ...,
)

``dbt_ls``
----------
Expand Down
Loading