diff --git a/cosmos/config.py b/cosmos/config.py index c5e7a69a30..4798312124 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -57,6 +57,7 @@ class RenderConfig: dbt_executable_path: str | Path = get_system_dbt() env_vars: dict[str, str] | None = None dbt_project_path: InitVar[str | Path | None] = None + dbt_ls_path: Path | None = None project_path: Path | None = field(init=False) @@ -89,6 +90,15 @@ def validate_dbt_command(self, fallback_cmd: str | Path = "") -> None: f"<{self.dbt_executable_path}>" + (f" and <{fallback_cmd}>." if fallback_cmd else ".") ) + def is_dbt_ls_file_available(self) -> bool: + """ + Check if the `dbt ls` output is set and if the file exists. + """ + if not self.dbt_ls_path: + return False + + return self.dbt_ls_path.exists() + class ProjectConfig: """ @@ -285,7 +295,6 @@ class ExecutionConfig: dbt_executable_path: str | Path = field(default_factory=get_system_dbt) dbt_project_path: InitVar[str | Path | None] = None - project_path: Path | None = field(init=False) def __post_init__(self, dbt_project_path: str | Path | None) -> None: diff --git a/cosmos/constants.py b/cosmos/constants.py index 9aa38c34e6..96c5bdd070 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -27,6 +27,7 @@ class LoadMode(Enum): AUTOMATIC = "automatic" CUSTOM = "custom" DBT_LS = "dbt_ls" + DBT_LS_FILE = "dbt_ls_file" DBT_MANIFEST = "dbt_manifest" diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 92ef5e66fb..bae32f344b 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -163,6 +163,7 @@ def load( load_method = { LoadMode.CUSTOM: self.load_via_custom_parser, LoadMode.DBT_LS: self.load_via_dbt_ls, + LoadMode.DBT_LS_FILE: self.load_via_dbt_ls_file, LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest, } @@ -277,6 +278,30 @@ def load_via_dbt_ls(self) -> None: logger.info("Total nodes: %i", len(self.nodes)) logger.info("Total filtered nodes: %i", len(self.nodes)) + def load_via_dbt_ls_file(self) -> None: + """ + This is between dbt ls 
and full manifest. It allows to use the output (needs to be json output) of the dbt ls as a + file stored in the image you run Cosmos on. The advantage is that you can use the parser from LoadMode.DBT_LS without + actually running dbt ls every time. BUT you will need one dbt ls file for each separate group. + + This technically should increase performance and also removes the necessity to have your whole dbt project copied + to the airflow image. + """ + logger.info("Trying to parse the dbt project `%s` using a dbt ls output file...", self.project.project_name) + + if not self.render_config.is_dbt_ls_file_available(): + raise CosmosLoadDbtException(f"Unable to load dbt ls file using {self.render_config.dbt_ls_path}") + + project_path = self.render_config.project_path + if not project_path: + raise CosmosLoadDbtException("Unable to load dbt ls file without RenderConfig.project_path") + with open(self.render_config.dbt_ls_path) as fp: # type: ignore[arg-type] + dbt_ls_output = fp.read() + nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_output) + + self.nodes = nodes + self.filtered_nodes = nodes + def load_via_custom_parser(self) -> None: """ This is the least accurate way of loading `dbt` projects and filtering them out, since it uses custom Cosmos diff --git a/dev/dags/dbt/jaffle_shop/dbt_ls_models_staging.txt b/dev/dags/dbt/jaffle_shop/dbt_ls_models_staging.txt new file mode 100644 index 0000000000..b8cc902ec0 --- /dev/null +++ b/dev/dags/dbt/jaffle_shop/dbt_ls_models_staging.txt @@ -0,0 +1,9 @@ +14:26:04 Running with dbt=1.6.9 +14:26:04 Registered adapter: exasol=1.6.2 +14:26:04 Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 366 macros, 0 groups, 0 semantic models +{"name": "stg_customers", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_customers.sql", "unique_id": "model.jaffle_shop.stg_customers", "alias": "stg_customers", "config": {"enabled": true, "alias": 
null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_customers"]}} +{"name": "stg_orders", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_orders.sql", "unique_id": "model.jaffle_shop.stg_orders", "alias": "stg_orders", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_orders"]}} +{"name": "stg_payments", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_payments.sql", "unique_id": "model.jaffle_shop.stg_payments", "alias": "stg_payments", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": 
{"macros": [], "nodes": ["seed.jaffle_shop.raw_payments"]}} +{"name": "raw_customers", "resource_type": "seed", "package_name": "jaffle_shop", "original_file_path": "seeds/raw_customers.csv", "unique_id": "seed.jaffle_shop.raw_customers", "alias": "raw_customers", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "seed", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "quote_columns": null, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": []}} +{"name": "raw_orders", "resource_type": "seed", "package_name": "jaffle_shop", "original_file_path": "seeds/raw_orders.csv", "unique_id": "seed.jaffle_shop.raw_orders", "alias": "raw_orders", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "seed", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "quote_columns": null, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": []}} +{"name": "raw_payments", "resource_type": "seed", "package_name": "jaffle_shop", "original_file_path": "seeds/raw_payments.csv", "unique_id": "seed.jaffle_shop.raw_payments", "alias": "raw_payments", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "seed", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": 
null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "quote_columns": null, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": []}} diff --git a/dev/dags/user_defined_profile.py b/dev/dags/user_defined_profile.py index ab30cdb2fe..032915d0ab 100644 --- a/dev/dags/user_defined_profile.py +++ b/dev/dags/user_defined_profile.py @@ -8,11 +8,12 @@ from airflow.decorators import dag from airflow.operators.empty import EmptyOperator -from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig +from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, LoadMode DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) PROFILES_FILE_PATH = Path(DBT_ROOT_PATH, "jaffle_shop", "profiles.yml") +DBT_LS_PATH = Path(DBT_ROOT_PATH, "jaffle_shop", "dbt_ls_models_staging.txt") @dag( @@ -35,6 +36,10 @@ def user_defined_profile() -> None: target_name="dev", profiles_yml_filepath=PROFILES_FILE_PATH, ), + render_config=RenderConfig( + load_method=LoadMode.DBT_LS_FILE, + dbt_ls_path=DBT_LS_PATH, + ), operator_args={"append_env": True, "install_deps": True}, default_args={"retries": 2}, ) diff --git a/docs/configuration/parsing-methods.rst b/docs/configuration/parsing-methods.rst index fbf6e43bfd..ef50bdb4e6 100644 --- a/docs/configuration/parsing-methods.rst +++ b/docs/configuration/parsing-methods.rst @@ -8,12 +8,14 @@ Cosmos offers several options to parse your dbt project: - ``automatic``. Tries to find a user-supplied ``manifest.json`` file. If it can't find one, it will run ``dbt ls`` to generate one. If that fails, it will use Cosmos' dbt parser. - ``dbt_manifest``. Parses a user-supplied ``manifest.json`` file. This can be generated manually with dbt commands or via a CI/CD process. - ``dbt_ls``. 
Parses a dbt project directory using the ``dbt ls`` command. +- ``dbt_ls_file``. Parses a dbt project using the output of the ``dbt ls`` command stored in a file. - ``custom``. Uses Cosmos' custom dbt parser, which extracts dependencies from your dbt's model code. There are benefits and drawbacks to each method: - ``dbt_manifest``: You have to generate the manifest file on your own. When using the manifest, Cosmos gets a complete set of metadata about your models. However, Cosmos uses its own selecting & excluding logic to determine which models to run, which may not be as robust as dbt's. - ``dbt_ls``: Cosmos will generate the manifest file for you. This method uses dbt's metadata AND dbt's selecting/excluding logic. This is the most robust method. However, this requires the dbt executable to be installed on your machine (either on the host directly or in a virtual environment). +- ``dbt_ls_file`` (new in 1.3): Path to a file containing the ``dbt ls`` output. To use this method, run ``dbt ls`` using ``--output json`` and store the output in a file. ``RenderConfig.select`` and ``RenderConfig.exclude`` will not work using this method. - ``custom``: Cosmos will parse your project and model files for you. This means that Cosmos will not have access to dbt's metadata. However, this method does not require the dbt executable to be installed on your machine. If you're using the ``local`` mode, you should use the ``dbt_ls`` method. @@ -75,6 +77,26 @@ To use this: # ..., ) +``dbt_ls_file`` +---------------- + +.. note:: + New in Cosmos 1.3. + +If you provide the output of ``dbt ls --output json`` as a file, you can use it to parse the project similarly to ``dbt_ls``. +You can supply a ``dbt_ls_path`` parameter on the ``RenderConfig`` of the DbtDag / DbtTaskGroup with a path to a ``dbt_ls_output.txt`` file. +Check `this Dag `_ for an example. + +To use this: + +.. 
code-block:: python + + DbtDag( + render_config=RenderConfig( + load_method=LoadMode.DBT_LS_FILE, dbt_ls_path="/path/to/dbt_ls_file.txt" + ) + # ..., + ) ``custom`` ---------- diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 3b80424b61..a09cb054ca 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -25,6 +25,7 @@ SAMPLE_MANIFEST_PY = Path(__file__).parent.parent / "sample/manifest_python.json" SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" SAMPLE_MANIFEST_SOURCE = Path(__file__).parent.parent / "sample/manifest_source.json" +SAMPLE_DBT_LS_OUTPUT = Path(__file__).parent.parent / "sample/sample_dbt_ls.txt" @pytest.fixture @@ -112,6 +113,52 @@ def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest): assert mock_load_from_dbt_manifest.called +@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_file", return_value=None) +def test_load_automatic_dbt_ls_file_is_available(mock_load_via_dbt_ls_file): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, render_config=render_config) + dbt_graph.load(method=LoadMode.DBT_LS_FILE, execution_mode=ExecutionMode.LOCAL) + assert mock_load_via_dbt_ls_file.called + + +def test_load_dbt_ls_file_without_file(): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + render_config = RenderConfig(dbt_ls_path=None) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, 
render_config=render_config) + with pytest.raises(CosmosLoadDbtException) as err_info: + dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_LS_FILE) + assert err_info.value.args[0] == "Unable to load dbt ls file using None" + + +def test_load_dbt_ls_file_without_project_path(): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + render_config = RenderConfig(dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=None) + dbt_graph = DbtGraph( + project=project_config, + profile_config=profile_config, + render_config=render_config, + ) + with pytest.raises(CosmosLoadDbtException) as err_info: + dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_LS_FILE) + assert err_info.value.args[0] == "Unable to load dbt ls file without RenderConfig.project_path" + + @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=None) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None) def test_load_automatic_without_manifest_with_profile_yml(mock_load_via_dbt_ls, mock_load_via_custom_parser): @@ -202,8 +249,15 @@ def test_load_manifest_with_manifest(mock_load_from_dbt_manifest): @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", return_value=None) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None) @patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None) +@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_file", return_value=None) def test_load( - mock_load_from_dbt_manifest, mock_load_via_dbt_ls, mock_load_via_custom_parser, exec_mode, method, expected_function + mock_load_from_dbt_manifest, + mock_load_via_dbt_ls_file, + mock_load_via_dbt_ls, + mock_load_via_custom_parser, + exec_mode, + method, + expected_function, ): project_config = 
ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) profile_config = ProfileConfig( @@ -725,6 +779,38 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method): } == set(dbt_graph.nodes["model.jaffle_shop.orders"].depends_on) +@pytest.mark.integration +def test_load_via_dbt_ls_file(): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + render_config = RenderConfig( + dbt_ls_path=SAMPLE_DBT_LS_OUTPUT, dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME + ) + dbt_graph = DbtGraph( + project=project_config, + profile_config=profile_config, + render_config=render_config, + ) + dbt_graph.load(method=LoadMode.DBT_LS_FILE, execution_mode=ExecutionMode.LOCAL) + + expected_dbt_nodes = { + "model.jaffle_shop.stg_customers": "stg_customers", + "model.jaffle_shop.stg_orders": "stg_orders", + "model.jaffle_shop.stg_payments": "stg_payments", + } + for unique_id, name in expected_dbt_nodes.items(): + assert unique_id in dbt_graph.nodes + assert name == dbt_graph.nodes[unique_id].name + # Test dependencies + assert {"seed.jaffle_shop.raw_customers"} == set(dbt_graph.nodes["model.jaffle_shop.stg_customers"].depends_on) + assert {"seed.jaffle_shop.raw_orders"} == set(dbt_graph.nodes["model.jaffle_shop.stg_orders"].depends_on) + assert {"seed.jaffle_shop.raw_payments"} == set(dbt_graph.nodes["model.jaffle_shop.stg_payments"].depends_on) + + @pytest.mark.parametrize( "stdout,returncode", [ diff --git a/tests/sample/sample_dbt_ls.txt b/tests/sample/sample_dbt_ls.txt new file mode 100644 index 0000000000..b356a5208c --- /dev/null +++ b/tests/sample/sample_dbt_ls.txt @@ -0,0 +1,6 @@ +14:26:04 Running with dbt=1.6.9 +14:26:04 Registered adapter: exasol=1.6.2 +14:26:04 Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 366 
macros, 0 groups, 0 semantic models +{"name": "stg_customers", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_customers.sql", "unique_id": "model.jaffle_shop.stg_customers", "alias": "stg_customers", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_customers"]}} +{"name": "stg_orders", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_orders.sql", "unique_id": "model.jaffle_shop.stg_orders", "alias": "stg_orders", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_orders"]}} +{"name": "stg_payments", "resource_type": "model", "package_name": "jaffle_shop", "original_file_path": "models/staging/stg_payments.sql", "unique_id": "model.jaffle_shop.stg_payments", "alias": "stg_payments", "config": {"enabled": true, "alias": null, "schema": null, "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "quoting": {}, 
"column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false}, "post-hook": [], "pre-hook": []}, "tags": [], "depends_on": {"macros": [], "nodes": ["seed.jaffle_shop.raw_payments"]}} diff --git a/tests/test_config.py b/tests/test_config.py index 734303a3e5..6fa53b10ca 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -176,6 +176,16 @@ def test_render_config_uses_default_if_exists(mock_which): assert render_config.dbt_executable_path == "user-dbt" +def test_is_dbt_ls_file_available_is_true(): + render_config = RenderConfig(dbt_ls_path=DBT_PROJECTS_ROOT_DIR / "sample_dbt_ls.txt") + assert render_config.is_dbt_ls_file_available() + + +def test_is_dbt_ls_file_available_is_false(): + render_config = RenderConfig(dbt_ls_path=None) + assert not render_config.is_dbt_ls_file_available() + + def test_render_config_env_vars_deprecated(): """RenderConfig.env_vars is deprecated since Cosmos 1.3, should warn user.""" with pytest.deprecated_call():