Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

from airflow.models import Variable

try:
import orjson
except ImportError: # pragma: no cover
orjson = None # type: ignore[assignment]

if TYPE_CHECKING:
try:
# Airflow 3 onwards
Expand Down Expand Up @@ -1250,6 +1255,40 @@ def _apply_manifest_node_selection(self, nodes: dict[str, DbtNode], manifest: di
exclude=self.render_config.exclude,
)

def _load_manifest_from_file(self, manifest_path: Path | ObjectStoragePath) -> dict[str, Any]:
"""
Load and parse a dbt manifest JSON file.

Uses orjson for faster parsing if enabled and available, otherwise falls back to standard json.

Args:
manifest_path: Path to the manifest.json file

Returns:
Parsed manifest dictionary

Raises:
CosmosLoadDbtException: If orjson is enabled but not installed, or if the parsed manifest root is not a dictionary
"""
if settings.enable_orjson_parser and orjson:
with manifest_path.open("rb") as fp:
manifest = orjson.loads(fp.read())
elif settings.enable_orjson_parser:
raise CosmosLoadDbtException("orjson is not installed. Install it with: pip install orjson")
Comment thread
corsettigyg marked this conversation as resolved.
else:
with manifest_path.open("r") as fp:
manifest = json.load(fp)

if manifest is None:
return {}

if not isinstance(manifest, dict):
raise CosmosLoadDbtException(
f"Invalid dbt manifest file `{manifest_path}`: expected top-level JSON object, got {type(manifest).__name__}"
)
Comment thread
corsettigyg marked this conversation as resolved.
Comment thread
corsettigyg marked this conversation as resolved.

return manifest

def load_from_dbt_manifest(self) -> None:
"""
This approach accurately loads `dbt` projects using the `manifest.json` dbt manifest artifact.
Expand All @@ -1273,8 +1312,7 @@ def load_from_dbt_manifest(self) -> None:
if TYPE_CHECKING:
assert self.project.manifest_path is not None # pragma: no cover

with self.project.manifest_path.open() as fp:
manifest = json.load(fp) or {}
manifest = self._load_manifest_from_file(self.project.manifest_path)

project_path = self.execution_config.project_path
nodes = self._load_nodes_from_manifest_data(manifest, project_path)
Comment thread
corsettigyg marked this conversation as resolved.
Expand Down
3 changes: 3 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,6 @@ def convert_to_boolean(value: str | None) -> bool:
# Debug mode - when enabled, Cosmos will track and push memory utilization to XCom
enable_debug_mode = conf.getboolean("cosmos", "enable_debug_mode", fallback=False)
debug_memory_poll_interval_seconds = conf.getfloat("cosmos", "debug_memory_poll_interval_seconds", fallback=0.5)

# Experimental: use orjson for faster dbt manifest.json parsing (disabled by default)
enable_orjson_parser = conf.getboolean("cosmos", "enable_orjson_parser", fallback=False)
17 changes: 17 additions & 0 deletions docs/reference/configs/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,23 @@ This page lists all available `Apache Airflow® <https://airflow.apache.org/>`_
- Default: ``0.5``
- Environment Variable: ``AIRFLOW__COSMOS__DEBUG_MEMORY_POLL_INTERVAL_SECONDS``

.. _enable_orjson_parser:

`enable_orjson_parser`_:
(Experimental, introduced in Cosmos 1.15.0) When enabled, Cosmos uses `orjson <https://github.com/ijl/orjson>`_ to parse
Comment thread
corsettigyg marked this conversation as resolved.
Comment thread
tatiana marked this conversation as resolved.
``manifest.json`` files instead of the standard library ``json`` module. orjson is a fast, Rust-based JSON
library that can significantly reduce DAG parsing time for large dbt projects with big manifests.

Benchmarks show up to 40% faster parsing compared to the standard ``json`` module, with the improvement
scaling with manifest file size.
Comment thread
corsettigyg marked this conversation as resolved.
Comment thread
corsettigyg marked this conversation as resolved.

Requires the optional ``orjson`` dependency: ``pip install orjson``.
If this setting is ``True`` but ``orjson`` is not installed, Cosmos raises a
``CosmosLoadDbtException`` at parse time with an actionable error message.
Comment thread
corsettigyg marked this conversation as resolved.

- Default: ``False``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_ORJSON_PARSER``

.. _watcher_dbt_execution_queue:

`watcher_dbt_execution_queue`_:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ dependencies = [
"types-python-dateutil",
"Werkzeug<3.0.0",
"pytest-asyncio",
"orjson",
# Needed by the regression test for issue #2620: the test exercises
# airflow.sentry's module-level ConfiguredSentry() path, which only
# runs when sentry_sdk is importable.
Expand Down
140 changes: 140 additions & 0 deletions tests/dbt/test_orjson_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
Unit tests for the experimental orjson parser feature.

Covers:
- Default setting value
- Error when orjson is enabled but not installed
- Standard json is used when setting is disabled
- orjson produces identical DbtGraph output to standard json
"""

from pathlib import Path
from unittest.mock import patch

import pytest

from cosmos import settings
from cosmos.config import ExecutionConfig, ProjectConfig, RenderConfig
from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph

SAMPLE_MANIFEST = Path(__file__).parent.parent / "sample/manifest.json"
DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"


def _make_dbt_graph(manifest_path: Path = SAMPLE_MANIFEST) -> DbtGraph:
return DbtGraph(
project=ProjectConfig(manifest_path=manifest_path, project_name="jaffle_shop"),
execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / "jaffle_shop"),
render_config=RenderConfig(),
)


class TestOrjsonParserSettings:
def test_orjson_disabled_by_default(self):
assert settings.enable_orjson_parser is False

@patch.object(settings, "enable_orjson_parser", True)
def test_orjson_setting_can_be_overridden(self):
assert settings.enable_orjson_parser is True


class TestOrjsonParserMissingDependency:
@patch.object(settings, "enable_orjson_parser", True)
@patch("cosmos.dbt.graph.orjson", None)
def test_raises_when_orjson_not_installed(self):
dbt_graph = _make_dbt_graph()

with pytest.raises(CosmosLoadDbtException) as exc_info:
dbt_graph.load_from_dbt_manifest()

error_msg = str(exc_info.value)
assert "orjson" in error_msg.lower()
assert "not installed" in error_msg.lower()
assert "pip install orjson" in error_msg

Comment thread
corsettigyg marked this conversation as resolved.
@patch.object(settings, "enable_orjson_parser", True)
@patch("cosmos.dbt.graph.orjson", None)
def test_load_manifest_from_file_raises_without_orjson(self, tmp_path):
manifest_file = tmp_path / "manifest.json"
manifest_file.write_text('{"nodes": {}, "sources": {}, "exposures": {}}')
dbt_graph = _make_dbt_graph()

with pytest.raises(CosmosLoadDbtException, match="pip install orjson"):
dbt_graph._load_manifest_from_file(manifest_file)


class TestOrjsonParserEquivalence:
"""Verify orjson and standard json produce identical DbtGraph output."""

@patch.object(settings, "enable_orjson_parser", False)
def test_standard_json_loads_manifest(self):
dbt_graph = _make_dbt_graph()
dbt_graph.load_from_dbt_manifest()

assert len(dbt_graph.nodes) > 0

@pytest.mark.skipif(
not __import__("importlib").util.find_spec("orjson"),
reason="orjson not installed",
)
def test_orjson_produces_same_nodes_as_standard_json(self):
graph_std = _make_dbt_graph()
with patch.object(settings, "enable_orjson_parser", False):
graph_std.load_from_dbt_manifest()

graph_orjson = _make_dbt_graph()
with patch.object(settings, "enable_orjson_parser", True):
graph_orjson.load_from_dbt_manifest()

assert graph_std.nodes.keys() == graph_orjson.nodes.keys()

for node_id in graph_std.nodes:
std_node = graph_std.nodes[node_id]
fast_node = graph_orjson.nodes[node_id]
assert std_node.unique_id == fast_node.unique_id
assert std_node.resource_type == fast_node.resource_type
assert std_node.depends_on == fast_node.depends_on
assert std_node.tags == fast_node.tags

@pytest.mark.skipif(
not __import__("importlib").util.find_spec("orjson"),
reason="orjson not installed",
)
def test_load_manifest_from_file_returns_same_dict(self, tmp_path):
"""_load_manifest_from_file returns the same structure regardless of parser."""
import json

data = {"nodes": {"model.test.foo": {"resource_type": "model"}}, "sources": {}, "exposures": {}}
manifest_file = tmp_path / "manifest.json"
manifest_file.write_text(json.dumps(data))

dbt_graph = _make_dbt_graph()

with patch.object(settings, "enable_orjson_parser", False):
result_std = dbt_graph._load_manifest_from_file(manifest_file)

with patch.object(settings, "enable_orjson_parser", True):
result_orjson = dbt_graph._load_manifest_from_file(manifest_file)

assert result_std == result_orjson
Comment thread
corsettigyg marked this conversation as resolved.

def test_load_from_dbt_manifest_handles_null_manifest_root_per_loader_contract(self, tmp_path):
"""A manifest containing JSON ``null`` is treated as an empty dict (backward-compatible)."""
manifest_file = tmp_path / "manifest.json"
manifest_file.write_text("null")

dbt_graph = _make_dbt_graph(manifest_file)

with patch.object(settings, "enable_orjson_parser", False):
assert dbt_graph._load_manifest_from_file(manifest_file) == {}

def test_load_manifest_from_file_raises_on_invalid_root_type(self, tmp_path):
"""Non-dict, non-null roots (e.g. JSON arrays) raise CosmosLoadDbtException."""
manifest_file = tmp_path / "manifest.json"
manifest_file.write_text("[1, 2, 3]")

dbt_graph = _make_dbt_graph(manifest_file)

with patch.object(settings, "enable_orjson_parser", False):
with pytest.raises(CosmosLoadDbtException, match="expected top-level JSON object"):
dbt_graph._load_manifest_from_file(manifest_file)
Loading