Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
639 changes: 303 additions & 336 deletions cosmos/__init__.py

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions cosmos/provider_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
Required provider info for using Airflow config for configuration
"""

from __future__ import annotations

from typing import Any

from cosmos import __version__ # type: ignore[attr-defined]


def get_provider_info() -> dict[str, Any]:
    """Return the provider metadata dictionary for Astronomer Cosmos.

    The dictionary carries the package name, display name, description and
    version, plus a ``config`` section describing the ``cosmos`` configuration
    options (currently only the deprecated ``propagate_logs`` option).

    Returns:
        A dictionary with the keys ``package-name``, ``name``, ``description``,
        ``versions`` and ``config``.
    """
    return {
        "package-name": "astronomer-cosmos",  # Required
        "name": "Astronomer Cosmos",  # Required
        "description": "Astronomer Cosmos is a library for rendering dbt workflows in Airflow. Contains dags, task groups, and operators.",  # Required
        "versions": [__version__],  # Required
        "config": {
            "cosmos": {
                "description": None,
                "options": {
                    "propagate_logs": {
                        "description": "Enable log propagation from Cosmos custom logger\n",
                        "version_added": "1.3.0a1",
                        "version_deprecated": "1.6.0a1",
                        "deprecation_reason": "`propagate_logs` is no longer necessary as of Cosmos 1.6.0"
                        " because the issue this option was meant to address is no longer an"
                        " issue with Cosmos's new logging approach.",
                        "type": "boolean",
                        "example": None,
                        # The default is given as a string, not a bool.
                        "default": "True",
                    },
                },
            },
        },
    }
7 changes: 7 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@
remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None)
remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None)

# Eager imports in cosmos/__init__.py expose all Cosmos classes at the top level,
# which can significantly increase memory usage, even when Cosmos is installed but not actively used.
# Setting this option to True disables those eager imports to reduce the memory footprint.
# When enabled, users must access Cosmos classes via their full module paths
# (for example ``from cosmos.airflow.dag import DbtDag``), which avoids the overhead
# of importing unused modules and classes.
# Falls back to False (eager imports kept) when the option is not set in the "cosmos" section.
enable_memory_optimised_imports = conf.getboolean("cosmos", "enable_memory_optimised_imports", fallback=False)

# Related to async operators
enable_setup_async_task = conf.getboolean("cosmos", "enable_setup_async_task", fallback=True)
enable_teardown_async_task = conf.getboolean("cosmos", "enable_teardown_async_task", fallback=True)
Expand Down
3 changes: 3 additions & 0 deletions dev/dags/basic_cosmos_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from datetime import datetime
from pathlib import Path

# [START cosmos_init_imports]
from cosmos import DbtDag, ProfileConfig, ProjectConfig

# [END cosmos_init_imports]
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand Down
47 changes: 47 additions & 0 deletions dev/dags/basic_cosmos_dag_full_module_path_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
An example DAG that uses Cosmos by importing Cosmos classes with their full module path.
"""

import os
from datetime import datetime
from pathlib import Path

# [START cosmos_explicit_imports]
from cosmos.airflow.dag import DbtDag
from cosmos.config import ProfileConfig, ProjectConfig

# [END cosmos_explicit_imports]
from cosmos.profiles import PostgresUserPasswordProfileMapping

# dbt projects are looked up in the ``dbt`` directory next to this file,
# unless the DBT_ROOT_PATH environment variable points somewhere else.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.joinpath("dbt")
DBT_ROOT_PATH = Path(os.environ.get("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

# dbt profile built from the ``example_conn`` connection, targeting the ``public`` schema.
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
        disable_event_tracking=True,
    ),
)

# [START local_example]
basic_cosmos_dag_full_module_path_imports = DbtDag(
    # Cosmos / dbt settings
    project_config=ProjectConfig(
        DBT_ROOT_PATH.joinpath("jaffle_shop"),
    ),
    profile_config=profile_config,
    operator_args={
        # install any necessary dependencies before running any dbt command
        "install_deps": True,
        # used only in dbt commands that support this flag
        "full_refresh": True,
    },
    # regular DAG settings
    schedule="@daily",
    start_date=datetime(year=2023, month=1, day=1),
    catchup=False,
    dag_id="basic_cosmos_dag_full_module_path_imports",
    default_args={"retries": 2},
)
# [END local_example]
28 changes: 28 additions & 0 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,34 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``False``
- Environment Variable: ``AIRFLOW__COSMOS__USE_DATASET_AIRFLOW3_URI_STANDARD``

.. _enable_memory_optimised_imports:

`enable_memory_optimised_imports`_:
(Introduced in Cosmos 1.10.1): Eager imports in cosmos/__init__.py expose all Cosmos classes at the top level,
which can significantly increase memory usage—even when Cosmos is just installed but not actively used. This option allows
disabling those eager imports to reduce memory footprint. When enabled, users must access Cosmos classes via their full
module paths, avoiding the overhead of importing unused modules and classes.

- Default: ``False``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS``

.. note::
This option will become the default behavior in Cosmos 2.0.0, where all eager imports will be removed from ``cosmos/__init__.py``.

For example, when this option is enabled, imports must be specified with full module paths:

.. literalinclude:: ../../dev/dags/basic_cosmos_dag_full_module_path_imports.py
:language: python
:start-after: [START cosmos_explicit_imports]
:end-before: [END cosmos_explicit_imports]

as opposed to the following approach, which you might use when this option is disabled (the default):

.. literalinclude:: ../../dev/dags/basic_cosmos_dag.py
:language: python
:start-after: [START cosmos_init_imports]
:end-before: [END cosmos_init_imports]


[openlineage]
~~~~~~~~~~~~~
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ gcp-cloud-run-job = [
]

[project.entry-points.apache_airflow_provider]
provider_info = "cosmos:get_provider_info"
provider_info = "cosmos.provider_info:get_provider_info"

[project.entry-points."airflow.plugins"]
cosmos = "cosmos.plugin:CosmosPlugin"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_log.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import pytest

import cosmos.log
from cosmos import get_provider_info
from cosmos.log import CosmosRichLogger, get_logger
from cosmos.provider_info import get_provider_info


def test_get_logger(monkeypatch):
Expand Down
32 changes: 32 additions & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import subprocess
import textwrap
from importlib import reload
from unittest.mock import patch

Expand All @@ -9,3 +11,33 @@
def test_enable_cache_env_var():
reload(settings)
assert settings.enable_cache is False


def test_enable_memory_optimised_imports_true(monkeypatch):
    """Check that enabling the option keeps ``DbtDag`` off the top-level ``cosmos`` package.

    The assertions run in a separate interpreter process, where the environment
    variable is set before ``cosmos`` is imported.

    Args:
        monkeypatch: pytest fixture; not used by the body, kept so the test
            signature stays unchanged.
    """
    import sys  # function-scope import keeps this fix self-contained

    script = textwrap.dedent(
        """
        import os
        os.environ["AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS"] = "True"
        import cosmos
        assert cosmos.settings.enable_memory_optimised_imports is True
        assert not hasattr(cosmos, "DbtDag")
        """
    )

    # Use the interpreter running this test rather than whatever "python" resolves
    # to on PATH, which may be a different environment or missing altogether.
    result = subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, check=False)
    # Surface the child's stderr so a failing assertion in the script is visible.
    assert result.returncode == 0, result.stderr


def test_enable_memory_optimised_imports_false(monkeypatch):
    """Check that disabling the option leaves ``DbtDag`` on the top-level ``cosmos`` package.

    The assertions run in a separate interpreter process, where the environment
    variable is set before ``cosmos`` is imported.

    Args:
        monkeypatch: pytest fixture; not used by the body, kept so the test
            signature stays unchanged.
    """
    import sys  # function-scope import keeps this fix self-contained

    script = textwrap.dedent(
        """
        import os
        os.environ["AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS"] = "False"
        import cosmos
        assert cosmos.settings.enable_memory_optimised_imports is False
        assert hasattr(cosmos, "DbtDag")
        """
    )

    # Use the interpreter running this test rather than whatever "python" resolves
    # to on PATH, which may be a different environment or missing altogether.
    result = subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, check=False)
    # Surface the child's stderr so a failing assertion in the script is visible.
    assert result.returncode == 0, result.stderr
Loading