diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0963dba8e4..97ea9cd39c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,6 +21,22 @@ jobs: steps: - run: true + Type-Check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + with: + python-version: '3.10' + architecture: 'x64' + - uses: actions/cache@v3 + with: + path: | + ~/.cache/pip + key: ${{ runner.os }}-${{ hashFiles('pyproject.toml') }} + - run: pip3 install hatch mypy + - run: hatch run tests.py3.9-2.7:type-check + Run-Unit-Tests: runs-on: ubuntu-latest strategy: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a7a33789be..635c394ec0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -70,7 +70,7 @@ repos: rev: 'v1.5.1' hooks: - id: mypy - name: mypy-python-sdk + name: mypy-python additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow] files: ^cosmos diff --git a/cosmos/constants.py b/cosmos/constants.py index ec67f8cd2f..90037f14d2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -12,6 +12,8 @@ DBT_TARGET_DIR_NAME = "target" DBT_LOG_FILENAME = "dbt.log" DBT_BINARY_NAME = "dbt" + +DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos" OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/" diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index 0dd724153c..fb2e1c90c7 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -20,13 +20,13 @@ ) -class DbtDockerBaseOperator(DockerOperator, DbtBaseOperator): # type: ignore[misc] # ignores subclass MyPy error +class DbtDockerBaseOperator(DockerOperator, DbtBaseOperator): # type: ignore """ Executes a dbt core cli command in a Docker container. 
""" - template_fields: Sequence[str] = DbtBaseOperator.template_fields + DockerOperator.template_fields + template_fields: Sequence[str] = tuple(list(DbtBaseOperator.template_fields) + list(DockerOperator.template_fields)) intercept_flag = False @@ -39,7 +39,7 @@ def __init__( def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any: self.build_command(context, cmd_flags) - self.log.info(f"Running command: {self.command}") # type: ignore[has-type] + self.log.info(f"Running command: {self.command}") result = super().execute(context) logger.info(result) @@ -50,8 +50,8 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) # set env vars - self.environment = {**env_vars, **self.environment} # type: ignore[has-type] - self.command = dbt_cmd + self.environment: dict[str, Any] = {**env_vars, **self.environment} + self.command: list[str] = dbt_cmd def execute(self, context: Context) -> None: self.build_and_run_cmd(context=context) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 94a3b8f53c..38ca474525 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -26,13 +26,15 @@ ) -class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore[misc] +class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore """ Executes a dbt core cli command in a Kubernetes Pod. 
""" - template_fields: Sequence[str] = DbtBaseOperator.template_fields + KubernetesPodOperator.template_fields + template_fields: Sequence[str] = tuple( + list(DbtBaseOperator.template_fields) + list(KubernetesPodOperator.template_fields) + ) intercept_flag = False @@ -41,12 +43,12 @@ def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) - super().__init__(**kwargs) def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None: - env_vars_dict = dict() + env_vars_dict: dict[str, str] = dict() - for env_var in self.env_vars: # type: ignore[has-type] + for env_var in self.env_vars: env_vars_dict[env_var.name] = env_var.value - self.env_vars = convert_env_vars({**env, **env_vars_dict}) + self.env_vars: list[Any] = convert_env_vars({**env, **env_vars_dict}) def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any: self.build_kube_args(context, cmd_flags) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 8c980d6928..69bc1a78c0 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -7,6 +7,7 @@ from pathlib import Path from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING +import airflow import yaml from airflow import DAG from airflow.compat.functools import cached_property @@ -30,7 +31,7 @@ from sqlalchemy.orm import Session -from cosmos.constants import OPENLINEAGE_PRODUCER +from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER from cosmos.config import ProfileConfig from cosmos.log import get_logger from cosmos.operators.base import DbtBaseOperator @@ -43,18 +44,20 @@ ) logger = get_logger(__name__) -lineage_namespace = conf.get("openlineage", "namespace", fallback=os.getenv("OPENLINEAGE_NAMESPACE", "default")) try: from airflow.providers.openlineage.extractors.base import OperatorLineage except (ImportError, ModuleNotFoundError): - from openlineage.airflow.extractors.base import OperatorLineage -except 
(ImportError, ModuleNotFoundError): - logger.warning( - "To enable emitting Openlineage events, please, upgrade to Airflow 2.7 or install `openlineage-airflow`." - ) - is_openlineage_available = False + try: + from openlineage.airflow.extractors.base import OperatorLineage + except (ImportError, ModuleNotFoundError) as error: + logger.warning( + "To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or " + "install astronomer-cosmos[openlineage]." + ) + logger.exception(error) + is_openlineage_available = False class DbtLocalBaseOperator(DbtBaseOperator): @@ -248,6 +251,12 @@ def calculate_openlineage_events_completes( for key, value in env.items(): os.environ[key] = str(value) + lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) + try: + lineage_namespace = conf.get("openlineage", "namespace") + except airflow.exceptions.AirflowConfigException: + pass + openlineage_processor = DbtLocalArtifactProcessor( producer=OPENLINEAGE_PRODUCER, job_namespace=lineage_namespace, @@ -261,7 +270,7 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except FileNotFoundError as error: + except (FileNotFoundError, NotImplementedError) as error: logger.exception(error) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: diff --git a/docs/configuration/lineage.rst b/docs/configuration/lineage.rst index 767deaaa32..54f9ad46db 100644 --- a/docs/configuration/lineage.rst +++ b/docs/configuration/lineage.rst @@ -10,7 +10,7 @@ and virtualenv execution methods (read `execution modes <../getting_started/exec To emit lineage events, Cosmos can use one of the following: 1. Airflow 2.7 `built-in support to OpenLineage `_, or -2. The `openlineage-airflow `_ package +2. `Additional libraries `_. No change to the user DAG files is required to use OpenLineage. 
@@ -20,14 +20,22 @@ Installation If using Airflow 2.7, no other dependency is required. -Otherwise, install the Python package ``openlineage-airflow``. +Otherwise, install Cosmos using ``astronomer-cosmos[openlineage]``. -Namespace configuration ------------------------ +Configuration +------------- + +If using Airflow 2.7, follow `these instructions `_ on how to configure OpenLineage. + +Otherwise, follow `these instructions `_. + + +Namespace +......... Cosmos will use the Airflow ``[openlineage]`` ``namespace`` property as a namespace, `if available `_. Otherwise, it attempts to use the environment variable ``OPENLINEAGE_NAMESPACE`` as the namespace. -If not defined, it uses ``"default"`` as the namespace. +Finally, if neither is defined, it uses ``"cosmos"`` as the namespace. diff --git a/pyproject.toml b/pyproject.toml index c9bace3109..66e8d2852d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,6 @@ dependencies = [ "typing-extensions; python_version < '3.8'", "virtualenv", "openlineage-integration-common", - "openlineage-airflow" ] [project.optional-dependencies] @@ -76,12 +75,12 @@ dbt-snowflake = [ dbt-spark = [ "dbt-spark<=1.5.4", ] -lineage = [ +openlineage = [ "openlineage-airflow", ] all = [ "astronomer-cosmos[dbt-all]", - "astronomer-cosmos[lineage]" + "astronomer-cosmos[openlineage]" ] docs =[ "sphinx", @@ -132,7 +131,12 @@ dependencies = [ "astronomer-cosmos[tests]", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-cncf-kubernetes>=5.1.1,<7.3.0", - "openlineage-airflow" + "types-PyYAML", + "types-attrs", + "attrs", + "types-requests", + "types-python-dateutil", + "apache-airflow" ] [[tool.hatch.envs.tests.matrix]] @@ -150,14 +154,15 @@ matrix.airflow.dependencies = [ [tool.hatch.envs.tests.scripts] freeze = "pip freeze" +type-check = "mypy cosmos" test = 'pytest -vv --durations=0 . 
-m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py' -test-cov = 'pytest -vv --cov=cosmos --cov-report=term-missing --cov-report=xml --durations=0 -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py' +test-cov = """pytest -vv --cov=cosmos --cov-report=term-missing --cov-report=xml --durations=0 -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py""" # we install using the following workaround to overcome installation conflicts, such as: # apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies test-integration-setup = """pip uninstall dbt-postgres dbt-databricks; \ rm -rf airflow.*; \ airflow db init; \ -pip install 'dbt-postgres<=1.5.4' 'dbt-databricks<=1.5.4'""" +pip install 'dbt-core==1.5.4' 'dbt-databricks<=1.5.4' 'dbt-postgres<=1.5.4' 'openlineage-airflow'""" test-integration = """rm -rf dbt/jaffle_shop/dbt_packages; pytest -vv \ --cov=cosmos \ @@ -165,7 +170,7 @@ pytest -vv \ --cov-report=xml \ --durations=0 \ -m integration \ --k 'not (example_cosmos_python_models or example_cosmos_python_models or example_virtualenv)' +-k 'not (example_cosmos_python_models or example_virtualenv)' """ test-integration-expensive = """rm -rf dbt/jaffle_shop/dbt_packages; pytest -vv \ @@ -174,7 +179,7 @@ pytest -vv \ --cov-report=xml \ --durations=0 \ -m integration \ --k 'example_cosmos_python_models or example_cosmos_python_models or example_virtualenv'""" +-k 'example_cosmos_python_models or example_virtualenv'""" [tool.pytest.ini_options] filterwarnings = [ @@ -215,6 +220,8 @@ known_third_party = ["airflow", "jinja2"] [tool.mypy] strict = true +ignore_missing_imports = true +no_warn_unused_ignores = true [tool.ruff] line-length = 120 diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index ffde94f1db..7d00d88c88 
100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -172,6 +172,7 @@ def test_run_operator_dataset_inlets_and_outlets(): assert test_operator.outlets == [] +@pytest.mark.integration def test_run_operator_emits_events(): class MockRun: facets = {"c": 3}