from __future__ import annotations

from cosmos import settings

__version__ = "1.10.1a3"

# The eager top-level re-exports below are skipped entirely when
# ``settings.enable_memory_optimised_imports`` is true. In that mode callers
# must import Cosmos classes from their full module paths
# (e.g. ``from cosmos.airflow.dag import DbtDag``).
if not settings.enable_memory_optimised_imports:
    from cosmos.airflow.dag import DbtDag
    from cosmos.airflow.task_group import DbtTaskGroup
    from cosmos.config import (
        ExecutionConfig,
        ProfileConfig,
        ProjectConfig,
        RenderConfig,
    )
    from cosmos.constants import (
        DbtResourceType,
        ExecutionMode,
        InvocationMode,
        LoadMode,
        SourceRenderingBehavior,
        TestBehavior,
        TestIndirectSelection,
    )
    from cosmos.log import get_logger
    from cosmos.operators.lazy_load import MissingPackage
    from cosmos.operators.local import (
        DbtBuildLocalOperator,
        DbtCloneLocalOperator,
        DbtDepsLocalOperator,
        DbtLSLocalOperator,
        DbtRunLocalOperator,
        DbtRunOperationLocalOperator,
        DbtSeedLocalOperator,
        DbtSnapshotLocalOperator,
        DbtTestLocalOperator,
    )

    logger = get_logger(__name__)

    # Each optional execution mode is imported inside its own try/except.
    # When the import fails, every operator name exported through ``__all__``
    # is bound to a ``MissingPackage`` placeholder built from the operator's
    # dotted path and the name of the extra that provides it. All exported
    # names must be bound in the fallback, otherwise ``from cosmos import *``
    # raises AttributeError for the missing ones.
    try:
        from cosmos.operators.docker import (
            DbtBuildDockerOperator,
            DbtCloneDockerOperator,
            DbtLSDockerOperator,
            DbtRunDockerOperator,
            DbtRunOperationDockerOperator,
            DbtSeedDockerOperator,
            DbtSnapshotDockerOperator,
            DbtTestDockerOperator,
        )
    except ImportError:  # pragma: no cover
        DbtBuildDockerOperator = MissingPackage("cosmos.operators.docker.DbtBuildDockerOperator", "docker")
        DbtCloneDockerOperator = MissingPackage("cosmos.operators.docker.DbtCloneDockerOperator", "docker")
        DbtLSDockerOperator = MissingPackage("cosmos.operators.docker.DbtLSDockerOperator", "docker")
        DbtRunDockerOperator = MissingPackage("cosmos.operators.docker.DbtRunDockerOperator", "docker")
        DbtRunOperationDockerOperator = MissingPackage(
            "cosmos.operators.docker.DbtRunOperationDockerOperator",
            "docker",
        )
        DbtSeedDockerOperator = MissingPackage("cosmos.operators.docker.DbtSeedDockerOperator", "docker")
        DbtSnapshotDockerOperator = MissingPackage("cosmos.operators.docker.DbtSnapshotDockerOperator", "docker")
        DbtTestDockerOperator = MissingPackage("cosmos.operators.docker.DbtTestDockerOperator", "docker")

    try:
        from cosmos.operators.kubernetes import (
            DbtBuildKubernetesOperator,
            DbtCloneKubernetesOperator,
            DbtLSKubernetesOperator,
            DbtRunKubernetesOperator,
            DbtRunOperationKubernetesOperator,
            DbtSeedKubernetesOperator,
            DbtSnapshotKubernetesOperator,
            DbtTestKubernetesOperator,
        )
    except ImportError:  # pragma: no cover
        logger.debug("To import Kubernetes modules, install astronomer-cosmos[kubernetes].", stack_info=True)
        DbtBuildKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtBuildKubernetesOperator",
            "kubernetes",
        )
        DbtCloneKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtCloneKubernetesOperator",
            "kubernetes",
        )
        DbtLSKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtLSKubernetesOperator",
            "kubernetes",
        )
        DbtRunKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtRunKubernetesOperator",
            "kubernetes",
        )
        DbtRunOperationKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtRunOperationKubernetesOperator",
            "kubernetes",
        )
        DbtSeedKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtSeedKubernetesOperator",
            "kubernetes",
        )
        DbtSnapshotKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtSnapshotKubernetesOperator",
            "kubernetes",
        )
        DbtTestKubernetesOperator = MissingPackage(
            "cosmos.operators.kubernetes.DbtTestKubernetesOperator",
            "kubernetes",
        )

    try:
        from cosmos.operators.azure_container_instance import (
            DbtBuildAzureContainerInstanceOperator,
            DbtCloneAzureContainerInstanceOperator,
            DbtLSAzureContainerInstanceOperator,
            DbtRunAzureContainerInstanceOperator,
            DbtRunOperationAzureContainerInstanceOperator,
            DbtSeedAzureContainerInstanceOperator,
            DbtSnapshotAzureContainerInstanceOperator,
            DbtTestAzureContainerInstanceOperator,
        )
    except ImportError:  # pragma: no cover
        DbtBuildAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtBuildAzureContainerInstanceOperator",
            "azure-container-instance",
        )
        DbtCloneAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtCloneAzureContainerInstanceOperator",
            "azure-container-instance",
        )
        DbtLSAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtLSAzureContainerInstanceOperator",
            "azure-container-instance",
        )
        DbtRunAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtRunAzureContainerInstanceOperator",
            "azure-container-instance",
        )
        DbtRunOperationAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtRunOperationAzureContainerInstanceOperator",
            "azure-container-instance",
        )
        DbtSeedAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtSeedAzureContainerInstanceOperator",
            "azure-container-instance",
        )
        DbtSnapshotAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtSnapshotAzureContainerInstanceOperator",
            "azure-container-instance",
        )
        DbtTestAzureContainerInstanceOperator = MissingPackage(
            "cosmos.operators.azure_container_instance.DbtTestAzureContainerInstanceOperator",
            "azure-container-instance",
        )

    try:
        from cosmos.operators.aws_eks import (
            DbtBuildAwsEksOperator,
            DbtCloneAwsEksOperator,
            DbtLSAwsEksOperator,
            DbtRunAwsEksOperator,
            DbtRunOperationAwsEksOperator,
            DbtSeedAwsEksOperator,
            DbtSnapshotAwsEksOperator,
            DbtTestAwsEksOperator,
        )
    except ImportError:  # pragma: no cover
        # The dotted paths point at ``cosmos.operators.aws_eks``, the module
        # the import above actually targets.
        DbtBuildAwsEksOperator = MissingPackage("cosmos.operators.aws_eks.DbtBuildAwsEksOperator", "aws_eks")
        DbtCloneAwsEksOperator = MissingPackage("cosmos.operators.aws_eks.DbtCloneAwsEksOperator", "aws_eks")
        DbtLSAwsEksOperator = MissingPackage("cosmos.operators.aws_eks.DbtLSAwsEksOperator", "aws_eks")
        DbtRunAwsEksOperator = MissingPackage("cosmos.operators.aws_eks.DbtRunAwsEksOperator", "aws_eks")
        DbtRunOperationAwsEksOperator = MissingPackage(
            "cosmos.operators.aws_eks.DbtRunOperationAwsEksOperator",
            "aws_eks",
        )
        DbtSeedAwsEksOperator = MissingPackage("cosmos.operators.aws_eks.DbtSeedAwsEksOperator", "aws_eks")
        DbtSnapshotAwsEksOperator = MissingPackage(
            "cosmos.operators.aws_eks.DbtSnapshotAwsEksOperator",
            "aws_eks",
        )
        DbtTestAwsEksOperator = MissingPackage("cosmos.operators.aws_eks.DbtTestAwsEksOperator", "aws_eks")

    try:
        from cosmos.operators.aws_ecs import (
            DbtBuildAwsEcsOperator,
            DbtLSAwsEcsOperator,
            DbtRunAwsEcsOperator,
            DbtRunOperationAwsEcsOperator,
            DbtSeedAwsEcsOperator,
            DbtSnapshotAwsEcsOperator,
            DbtSourceAwsEcsOperator,
            DbtTestAwsEcsOperator,
        )
    except ImportError:  # pragma: no cover
        DbtBuildAwsEcsOperator = MissingPackage("cosmos.operators.aws_ecs.DbtBuildAwsEcsOperator", "aws-ecs")
        DbtLSAwsEcsOperator = MissingPackage("cosmos.operators.aws_ecs.DbtLSAwsEcsOperator", "aws-ecs")
        DbtRunAwsEcsOperator = MissingPackage("cosmos.operators.aws_ecs.DbtRunAwsEcsOperator", "aws-ecs")
        DbtRunOperationAwsEcsOperator = MissingPackage(
            "cosmos.operators.aws_ecs.DbtRunOperationAwsEcsOperator",
            "aws-ecs",
        )
        DbtSeedAwsEcsOperator = MissingPackage("cosmos.operators.aws_ecs.DbtSeedAwsEcsOperator", "aws-ecs")
        DbtSnapshotAwsEcsOperator = MissingPackage(
            "cosmos.operators.aws_ecs.DbtSnapshotAwsEcsOperator",
            "aws-ecs",
        )
        DbtTestAwsEcsOperator = MissingPackage("cosmos.operators.aws_ecs.DbtTestAwsEcsOperator", "aws-ecs")
        DbtSourceAwsEcsOperator = MissingPackage("cosmos.operators.aws_ecs.DbtSourceAwsEcsOperator", "aws-ecs")

    try:
        from cosmos.operators.gcp_cloud_run_job import (
            DbtBuildGcpCloudRunJobOperator,
            DbtCloneGcpCloudRunJobOperator,
            DbtLSGcpCloudRunJobOperator,
            DbtRunGcpCloudRunJobOperator,
            DbtRunOperationGcpCloudRunJobOperator,
            DbtSeedGcpCloudRunJobOperator,
            DbtSnapshotGcpCloudRunJobOperator,
            DbtTestGcpCloudRunJobOperator,
        )
    except (ImportError, AttributeError):
        DbtBuildGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtBuildGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )
        DbtCloneGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtCloneGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )
        DbtLSGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtLSGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )
        DbtRunGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtRunGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )
        DbtRunOperationGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtRunOperationGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )
        DbtSeedGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtSeedGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )
        DbtSnapshotGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtSnapshotGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )
        DbtTestGcpCloudRunJobOperator = MissingPackage(
            "cosmos.operators.gcp_cloud_run_job.DbtTestGcpCloudRunJobOperator", "gcp-cloud-run-job"
        )

    # Public API exposed at the package top level. It is only defined when
    # eager imports are enabled.
    __all__ = [
        "ProjectConfig",
        "ProfileConfig",
        "ExecutionConfig",
        "RenderConfig",
        "DbtDag",
        "DbtTaskGroup",
        "ExecutionMode",
        "LoadMode",
        "TestBehavior",
        "InvocationMode",
        "TestIndirectSelection",
        "SourceRenderingBehavior",
        "DbtResourceType",
        # Local Execution Mode
        "DbtBuildLocalOperator",
        "DbtCloneLocalOperator",
        "DbtDepsLocalOperator",  # deprecated, to be deleted in Cosmos 2.x
        "DbtLSLocalOperator",
        "DbtRunLocalOperator",
        "DbtRunOperationLocalOperator",
        "DbtSeedLocalOperator",
        "DbtSnapshotLocalOperator",
        "DbtTestLocalOperator",
        # Docker Execution Mode
        "DbtBuildDockerOperator",
        "DbtCloneDockerOperator",
        "DbtLSDockerOperator",
        "DbtRunDockerOperator",
        "DbtRunOperationDockerOperator",
        "DbtSeedDockerOperator",
        "DbtSnapshotDockerOperator",
        "DbtTestDockerOperator",
        # Kubernetes Execution Mode
        "DbtBuildKubernetesOperator",
        "DbtCloneKubernetesOperator",
        "DbtLSKubernetesOperator",
        "DbtRunKubernetesOperator",
        "DbtRunOperationKubernetesOperator",
        "DbtSeedKubernetesOperator",
        "DbtSnapshotKubernetesOperator",
        "DbtTestKubernetesOperator",
        # Azure Container Instance Execution Mode
        "DbtBuildAzureContainerInstanceOperator",
        "DbtCloneAzureContainerInstanceOperator",
        "DbtLSAzureContainerInstanceOperator",
        "DbtRunAzureContainerInstanceOperator",
        "DbtRunOperationAzureContainerInstanceOperator",
        "DbtSeedAzureContainerInstanceOperator",
        "DbtSnapshotAzureContainerInstanceOperator",
        "DbtTestAzureContainerInstanceOperator",
        # AWS EKS Execution Mode
        "DbtBuildAwsEksOperator",
        "DbtCloneAwsEksOperator",
        "DbtLSAwsEksOperator",
        "DbtRunAwsEksOperator",
        "DbtRunOperationAwsEksOperator",
        "DbtSeedAwsEksOperator",
        "DbtSnapshotAwsEksOperator",
        "DbtTestAwsEksOperator",
        # AWS ECS Task Run Execution Mode
        "DbtBuildAwsEcsOperator",
        "DbtLSAwsEcsOperator",
        "DbtRunAwsEcsOperator",
        "DbtRunOperationAwsEcsOperator",
        "DbtSeedAwsEcsOperator",
        "DbtSnapshotAwsEcsOperator",
        "DbtTestAwsEcsOperator",
        "DbtSourceAwsEcsOperator",
        # GCP Cloud Run Job Execution Mode
        "DbtBuildGcpCloudRunJobOperator",
        "DbtCloneGcpCloudRunJobOperator",
        "DbtLSGcpCloudRunJobOperator",
        "DbtRunGcpCloudRunJobOperator",
        "DbtRunOperationGcpCloudRunJobOperator",
        "DbtSeedGcpCloudRunJobOperator",
        "DbtSnapshotGcpCloudRunJobOperator",
        "DbtTestGcpCloudRunJobOperator",
    ]
def get_provider_info() -> dict[str, Any]:
    """Return the provider metadata dictionary for Astronomer Cosmos.

    The mapping carries the package name, display name, description, the
    current ``__version__`` and the ``cosmos`` configuration section, whose
    only option declared here is the deprecated ``propagate_logs`` flag.
    """
    # Metadata of the single (deprecated) configuration option.
    propagate_logs_option: dict[str, Any] = {
        "description": "Enable log propagation from Cosmos custom logger\n",
        "version_added": "1.3.0a1",
        "version_deprecated": "1.6.0a1",
        "deprecation_reason": (
            "`propagate_logs` is no longer necessary as of Cosmos 1.6.0"
            " because the issue this option was meant to address is no longer an"
            " issue with Cosmos's new logging approach."
        ),
        "type": "boolean",
        "example": None,
        "default": "True",
    }
    cosmos_section: dict[str, Any] = {
        "description": None,
        "options": {"propagate_logs": propagate_logs_option},
    }
    provider_description = (
        "Astronomer Cosmos is a library for rendering dbt workflows in Airflow."
        " Contains dags, task groups, and operators."
    )
    return {
        "package-name": "astronomer-cosmos",  # Required
        "name": "Astronomer Cosmos",  # Required
        "description": provider_description,  # Required
        "versions": [__version__],  # Required
        "config": {"cosmos": cosmos_section},
    }
"""
An example DAG that uses Cosmos by importing Cosmos classes with their full module path.
"""

import os
from datetime import datetime
from pathlib import Path

# [START cosmos_explicit_imports]
from cosmos.airflow.dag import DbtDag
from cosmos.config import ProfileConfig, ProjectConfig

# [END cosmos_explicit_imports]
from cosmos.profiles import PostgresUserPasswordProfileMapping

# The dbt projects live in the "dbt" directory next to this file unless the
# DBT_ROOT_PATH environment variable points somewhere else.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.joinpath("dbt")
DBT_ROOT_PATH = Path(os.environ.get("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

# Profile built from the "example_conn" connection, targeting the "public" schema.
profile_config = ProfileConfig(
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
        disable_event_tracking=True,
    ),
    profile_name="default",
    target_name="dev",
)

# [START local_example]
basic_cosmos_dag_full_module_path_imports = DbtDag(
    # normal dag parameters
    dag_id="basic_cosmos_dag_full_module_path_imports",
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={"retries": 2},
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(DBT_ROOT_PATH.joinpath("jaffle_shop")),
    profile_config=profile_config,
    operator_args=dict(
        install_deps=True,  # install any necessary dependencies before running any dbt command
        full_refresh=True,  # used only in dbt commands that support this flag
    ),
)
# [END local_example]
This option allows + disabling those eager imports to reduce memory footprint. When enabled, users must access Cosmos classes via their full + module paths, avoiding the overhead of importing unused modules and classes. + + - Default: ``False`` + - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS`` + + .. note:: + This option will become the default behavior in Cosmos 2.0.0, where all eager imports will be removed from ``cosmos/__init__.py``. + + For example, when this option is enabled, specify the imports with full module paths: + + .. literalinclude:: ../../dev/dags/basic_cosmos_dag_full_module_path_imports.py + :language: python + :start-after: [START cosmos_explicit_imports] + :end-before: [END cosmos_explicit_imports] + + as opposed to the following approach you might use when this option is disabled (default): + + .. literalinclude:: ../../dev/dags/basic_cosmos_dag.py + :language: python + :start-after: [START cosmos_init_imports] + :end-before: [END cosmos_init_imports] + [openlineage] ~~~~~~~~~~~~~ diff --git a/pyproject.toml b/pyproject.toml index 23a7809341..f80a311dc5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -118,7 +118,7 @@ gcp-cloud-run-job = [ ] [project.entry-points.apache_airflow_provider] -provider_info = "cosmos:get_provider_info" +provider_info = "cosmos.provider_info:get_provider_info" [project.entry-points."airflow.plugins"] cosmos = "cosmos.plugin:CosmosPlugin" diff --git a/tests/test_log.py b/tests/test_log.py index 9f75634ad8..194799e2b6 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -1,8 +1,8 @@ import pytest import cosmos.log -from cosmos import get_provider_info from cosmos.log import CosmosRichLogger, get_logger +from cosmos.provider_info import get_provider_info def test_get_logger(monkeypatch): diff --git a/tests/test_settings.py b/tests/test_settings.py index d9f5e0f6e7..5bb0ad37d5 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ 
def _run_in_fresh_interpreter(script: str) -> "subprocess.CompletedProcess[str]":
    """Run *script* via ``-c`` in a new process of the current Python interpreter.

    ``cosmos/__init__.py`` decides at import time whether to perform its eager
    imports, so each scenario needs a process in which ``cosmos`` has not been
    imported yet.

    :param script: Python source passed to the interpreter's ``-c`` option.
    :returns: The completed process with stdout/stderr captured as text.
    """
    # Function-scope import keeps this helper self-contained.
    import sys

    # ``sys.executable`` is the interpreter running pytest, so the child uses
    # the same environment; a bare "python" may be absent from PATH or resolve
    # to a different installation.
    return subprocess.run([sys.executable, "-c", script], capture_output=True, text=True)


def test_enable_memory_optimised_imports_true(monkeypatch):
    """With the option enabled, ``cosmos`` must not expose ``DbtDag`` at top level."""
    script = textwrap.dedent(
        """
        import os
        os.environ["AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS"] = "True"
        import cosmos
        assert cosmos.settings.enable_memory_optimised_imports is True
        assert not hasattr(cosmos, "DbtDag")
        """
    )

    result = _run_in_fresh_interpreter(script)
    # Surface the child's stderr so a failing inner assertion is visible.
    assert result.returncode == 0, result.stderr


def test_enable_memory_optimised_imports_false(monkeypatch):
    """With the option disabled, ``cosmos`` keeps exposing ``DbtDag`` at top level."""
    script = textwrap.dedent(
        """
        import os
        os.environ["AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS"] = "False"
        import cosmos
        assert cosmos.settings.enable_memory_optimised_imports is False
        assert hasattr(cosmos, "DbtDag")
        """
    )

    result = _run_in_fresh_interpreter(script)
    # Surface the child's stderr so a failing inner assertion is visible.
    assert result.returncode == 0, result.stderr