Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

from airflow.models import Variable

try:
import orjson
except ImportError: # pragma: no cover
orjson = None # type: ignore[assignment]

if TYPE_CHECKING:
try:
# Airflow 3 onwards
Expand Down Expand Up @@ -1213,6 +1218,31 @@ def _apply_manifest_node_selection(self, nodes: dict[str, DbtNode], manifest: di
exclude=self.render_config.exclude,
)

def _load_manifest_from_file(self, manifest_path: Path | ObjectStoragePath) -> dict[str, Any]:
    """
    Load and parse a dbt manifest JSON file.

    Uses orjson when ``settings.enable_orjson_parser`` is true and orjson could be imported;
    otherwise uses the standard library ``json`` module.

    A manifest that parses to a falsy value (for example a file containing ``null``) is
    returned as ``{}``, so callers always receive a dictionary.

    Args:
        manifest_path: Path to the manifest.json file

    Returns:
        Parsed manifest dictionary (``{}`` when the document is empty)

    Raises:
        CosmosLoadDbtException: If orjson is enabled but not installed
    """
    # Fail fast with an actionable message when the setting asks for a parser that is missing.
    if settings.enable_orjson_parser and not orjson:
        raise CosmosLoadDbtException(
            "orjson is not installed. Install it with: pip install 'astronomer-cosmos[orjson]'"
        )

    if settings.enable_orjson_parser:
        # orjson parses bytes directly, so the file is read in binary form.
        manifest: dict[str, Any] = orjson.loads(manifest_path.read_bytes()) or {}
    else:
        with manifest_path.open() as fp:
            manifest = json.load(fp) or {}

    return manifest

def load_from_dbt_manifest(self) -> None:
"""
This approach accurately loads `dbt` projects using the `manifest.json` dbt manifest artifact.
Expand All @@ -1236,8 +1266,7 @@ def load_from_dbt_manifest(self) -> None:
if TYPE_CHECKING:
assert self.project.manifest_path is not None # pragma: no cover

with self.project.manifest_path.open() as fp:
manifest = json.load(fp) or {}
manifest = self._load_manifest_from_file(self.project.manifest_path)

project_path = self.execution_config.project_path
nodes = self._load_nodes_from_manifest_data(manifest, project_path)
Expand Down
3 changes: 3 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,6 @@ def convert_to_boolean(value: str | None) -> bool:
# Debug mode - when enabled, Cosmos will track and push memory utilization to XCom
enable_debug_mode = conf.getboolean("cosmos", "enable_debug_mode", fallback=False)
debug_memory_poll_interval_seconds = conf.getfloat("cosmos", "debug_memory_poll_interval_seconds", fallback=0.5)

# Experimental: use orjson instead of the standard library json module for faster dbt
# manifest.json parsing (disabled by default).
# Requires the optional dependency: pip install 'astronomer-cosmos[orjson]'
enable_orjson_parser = conf.getboolean("cosmos", "enable_orjson_parser", fallback=False)
17 changes: 17 additions & 0 deletions docs/reference/configs/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,23 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``0.5``
- Environment Variable: ``AIRFLOW__COSMOS__DEBUG_MEMORY_POLL_INTERVAL_SECONDS``

.. _enable_orjson_parser:

`enable_orjson_parser`_:
(Experimental, introduced in Cosmos 1.15.0) When enabled, Cosmos uses `orjson <https://github.com/ijl/orjson>`_ to parse
``manifest.json`` files instead of the standard library ``json`` module. orjson is a fast, Rust-based JSON
library that can significantly reduce DAG parsing time for large dbt projects with big manifests.

Benchmarks show up to 40% faster parsing compared to the standard ``json`` module, with the improvement
scaling with manifest file size.

Requires the optional ``orjson`` dependency: ``pip install 'astronomer-cosmos[orjson]'``.
If this setting is ``True`` but ``orjson`` is not installed, Cosmos raises a
``CosmosLoadDbtException`` at parse time with an actionable error message.

- Default: ``False``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_ORJSON_PARSER``

.. _watcher_dbt_execution_queue:

`watcher_dbt_execution_queue`_:
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ dbt-sqlserver = ["dbt-sqlserver"]
dbt-teradata = ["dbt-teradata"]
dbt-vertica = ["dbt-vertica<=1.5.4"]
openlineage = ["openlineage-integration-common!=1.15.0", "openlineage-airflow"]
orjson = ["orjson"]
amazon = [
"apache-airflow-providers-amazon[s3fs]>=3.0.0",
]
Expand Down Expand Up @@ -167,6 +168,7 @@ dependencies = [
"types-python-dateutil",
"Werkzeug<3.0.0",
"pytest-asyncio",
"orjson",
]
pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"]

Expand Down
119 changes: 119 additions & 0 deletions tests/dbt/test_orjson_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Unit tests for the experimental orjson parser feature.

Covers:
- Default setting value
- Error when orjson is enabled but not installed
- Standard json is used when setting is disabled
- orjson produces identical DbtGraph output to standard json
"""

from pathlib import Path
from unittest.mock import patch

import pytest

from cosmos import settings
from cosmos.config import ExecutionConfig, ProjectConfig, RenderConfig
from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph

# Manifest fixture, resolved from the directory that contains this file's directory.
SAMPLE_MANIFEST = Path(__file__).parent.parent / "sample/manifest.json"
# Root of the dbt projects under dev/dags/dbt, three directory levels above this file.
DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"


def _make_dbt_graph(manifest_path: Path = SAMPLE_MANIFEST) -> DbtGraph:
    """Build a ``DbtGraph`` for the ``jaffle_shop`` project that reads the given manifest."""
    project_config = ProjectConfig(manifest_path=manifest_path, project_name="jaffle_shop")
    execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "jaffle_shop")
    render_config = RenderConfig()
    return DbtGraph(
        project=project_config,
        execution_config=execution_config,
        render_config=render_config,
    )


class TestOrjsonParserSettings:
    """Checks on the ``enable_orjson_parser`` setting value."""

    def test_orjson_disabled_by_default(self):
        """The flag is expected to be ``False`` when nothing has overridden it."""
        current_value = settings.enable_orjson_parser
        assert current_value is False

    def test_orjson_setting_can_be_overridden(self):
        """Patching the settings module turns the flag on for the duration of the patch."""
        with patch.object(settings, "enable_orjson_parser", True):
            assert settings.enable_orjson_parser is True


class TestOrjsonParserMissingDependency:
    """Behaviour when the orjson parser is enabled but ``cosmos.dbt.graph.orjson`` is ``None``."""

    def test_raises_when_orjson_not_installed(self):
        """Loading the manifest through the public entry point raises an actionable error."""
        with patch.object(settings, "enable_orjson_parser", True), patch("cosmos.dbt.graph.orjson", None):
            graph = _make_dbt_graph()
            with pytest.raises(CosmosLoadDbtException) as raised:
                graph.load_from_dbt_manifest()

        message = str(raised.value)
        lowered = message.lower()
        assert "orjson" in lowered
        assert "not installed" in lowered
        assert "astronomer-cosmos[orjson]" in message

    def test_load_manifest_from_file_raises_without_orjson(self, tmp_path):
        """Calling the file loader directly raises the same error, naming the extra to install."""
        with patch.object(settings, "enable_orjson_parser", True), patch("cosmos.dbt.graph.orjson", None):
            target = tmp_path / "manifest.json"
            target.write_text('{"nodes": {}, "sources": {}, "exposures": {}}')
            graph = _make_dbt_graph()

            with pytest.raises(CosmosLoadDbtException, match="astronomer-cosmos\\[orjson\\]"):
                graph._load_manifest_from_file(target)


class TestOrjsonParserEquivalence:
    """Verify orjson and standard json produce identical DbtGraph output."""

    @patch.object(settings, "enable_orjson_parser", False)
    def test_standard_json_loads_manifest(self):
        """With the orjson parser disabled, the sample manifest yields at least one node."""
        dbt_graph = _make_dbt_graph()
        dbt_graph.load_from_dbt_manifest()

        assert len(dbt_graph.nodes) > 0

    def test_orjson_produces_same_nodes_as_standard_json(self):
        """Both parsers yield the same node ids and the same compared node attributes."""
        # Skip at run time instead of via `__import__("importlib").util`, which only works
        # when some other module has already imported the `importlib.util` submodule.
        pytest.importorskip("orjson", reason="orjson not installed")

        graph_std = _make_dbt_graph()
        with patch.object(settings, "enable_orjson_parser", False):
            graph_std.load_from_dbt_manifest()

        graph_orjson = _make_dbt_graph()
        with patch.object(settings, "enable_orjson_parser", True):
            graph_orjson.load_from_dbt_manifest()

        assert graph_std.nodes.keys() == graph_orjson.nodes.keys()

        for node_id, std_node in graph_std.nodes.items():
            fast_node = graph_orjson.nodes[node_id]
            assert std_node.unique_id == fast_node.unique_id
            assert std_node.resource_type == fast_node.resource_type
            assert std_node.depends_on == fast_node.depends_on
            assert std_node.tags == fast_node.tags

    def test_load_manifest_from_file_returns_same_dict(self, tmp_path):
        """_load_manifest_from_file returns the same structure regardless of parser."""
        pytest.importorskip("orjson", reason="orjson not installed")

        import json

        data = {"nodes": {"model.test.foo": {"resource_type": "model"}}, "sources": {}, "exposures": {}}
        manifest_file = tmp_path / "manifest.json"
        manifest_file.write_text(json.dumps(data))

        dbt_graph = _make_dbt_graph()

        with patch.object(settings, "enable_orjson_parser", False):
            result_std = dbt_graph._load_manifest_from_file(manifest_file)

        with patch.object(settings, "enable_orjson_parser", True):
            result_orjson = dbt_graph._load_manifest_from_file(manifest_file)

        assert result_std == result_orjson
Loading