Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,22 @@ jobs:
steps:
- run: true

Type-Check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
with:
python-version: '3.10'
architecture: 'x64'
- uses: actions/cache@v3
with:
path: |
~/.cache/pip
key: ${{ runner.os }}-${{ hashFiles('pyproject.toml') }}
- run: pip3 install hatch mypy
- run: hatch run tests.py3.9-2.7:type-check

Run-Unit-Tests:
runs-on: ubuntu-latest
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ repos:
rev: 'v1.5.1'
hooks:
- id: mypy
name: mypy-python-sdk
name: mypy-python
additional_dependencies: [types-PyYAML, types-attrs, attrs, types-requests, types-python-dateutil, apache-airflow]
files: ^cosmos

Expand Down
2 changes: 2 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
DBT_TARGET_DIR_NAME = "target"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"


Expand Down
10 changes: 5 additions & 5 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
)


class DbtDockerBaseOperator(DockerOperator, DbtBaseOperator): # type: ignore[misc] # ignores subclass MyPy error
class DbtDockerBaseOperator(DockerOperator, DbtBaseOperator): # type: ignore
"""
Executes a dbt core cli command in a Docker container.

"""

template_fields: Sequence[str] = DbtBaseOperator.template_fields + DockerOperator.template_fields
template_fields: Sequence[str] = tuple(list(DbtBaseOperator.template_fields) + list(DockerOperator.template_fields))

intercept_flag = False

Expand All @@ -39,7 +39,7 @@ def __init__(

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}") # type: ignore[has-type]
self.log.info(f"Running command: {self.command}")
result = super().execute(context)
logger.info(result)

Expand All @@ -50,8 +50,8 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) ->
self.dbt_executable_path = "dbt"
dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
# set env vars
self.environment = {**env_vars, **self.environment} # type: ignore[has-type]
self.command = dbt_cmd
self.environment: dict[str, Any] = {**env_vars, **self.environment}
self.command: list[str] = dbt_cmd

def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context)
Expand Down
12 changes: 7 additions & 5 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
)


class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore[misc]
class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore
"""
Executes a dbt core cli command in a Kubernetes Pod.

"""

template_fields: Sequence[str] = DbtBaseOperator.template_fields + KubernetesPodOperator.template_fields
template_fields: Sequence[str] = tuple(
list(DbtBaseOperator.template_fields) + list(KubernetesPodOperator.template_fields)
)

intercept_flag = False

Expand All @@ -41,12 +43,12 @@ def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -
super().__init__(**kwargs)

def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None:
env_vars_dict = dict()
env_vars_dict: dict[str, str] = dict()

for env_var in self.env_vars: # type: ignore[has-type]
for env_var in self.env_vars:
env_vars_dict[env_var.name] = env_var.value

self.env_vars = convert_env_vars({**env, **env_vars_dict})
self.env_vars: list[Any] = convert_env_vars({**env, **env_vars_dict})

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_kube_args(context, cmd_flags)
Expand Down
27 changes: 18 additions & 9 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pathlib import Path
from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING

import airflow
import yaml
from airflow import DAG
from airflow.compat.functools import cached_property
Expand All @@ -30,7 +31,7 @@

from sqlalchemy.orm import Session

from cosmos.constants import OPENLINEAGE_PRODUCER
from cosmos.constants import DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER
from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import DbtBaseOperator
Expand All @@ -43,18 +44,20 @@
)

logger = get_logger(__name__)
lineage_namespace = conf.get("openlineage", "namespace", fallback=os.getenv("OPENLINEAGE_NAMESPACE", "default"))


try:
from airflow.providers.openlineage.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError):
from openlineage.airflow.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError):
logger.warning(
"To enable emitting Openlineage events, please, upgrade to Airflow 2.7 or install `openlineage-airflow`."
)
is_openlineage_available = False
try:
from openlineage.airflow.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError) as error:
logger.warning(
"To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or "
"install astronomer-cosmos[openlineage]."
)
logger.exception(error)
is_openlineage_available = False


class DbtLocalBaseOperator(DbtBaseOperator):
Expand Down Expand Up @@ -248,6 +251,12 @@ def calculate_openlineage_events_completes(
for key, value in env.items():
os.environ[key] = str(value)

lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
try:
lineage_namespace = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
pass

openlineage_processor = DbtLocalArtifactProcessor(
producer=OPENLINEAGE_PRODUCER,
job_namespace=lineage_namespace,
Expand All @@ -261,7 +270,7 @@ def calculate_openlineage_events_completes(
try:
events = openlineage_processor.parse()
self.openlineage_events_completes = events.completes
except FileNotFoundError as error:
except (FileNotFoundError, NotImplementedError) as error:
logger.exception(error)

def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
Expand Down
18 changes: 13 additions & 5 deletions docs/configuration/lineage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ and virtualenv execution methods (read `execution modes <../getting_started/exec
To emit lineage events, Cosmos can use one of the following:

1. Airflow 2.7 `built-in support to OpenLineage <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_, or
2. The `openlineage-airflow <https://openlineage.io/docs/integrations/airflow/>`_ package
2. `Additional libraries <https://openlineage.io/docs/integrations/airflow/>`_.

No change to the user DAG files is required to use OpenLineage.

Expand All @@ -20,14 +20,22 @@ Installation

If using Airflow 2.7, no other dependency is required.

Otherwise, install the Python package ``openlineage-airflow``.
Otherwise, install Cosmos using ``astronomer-cosmos[openlineage]``.


Namespace configuration
-----------------------
Configuration
-------------

If using Airflow 2.7, follow `these instructions <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_ on how to configure OpenLineage.

Otherwise, follow `these instructions <https://openlineage.io/docs/integrations/airflow/>`_.


Namespace
.........

Cosmos will use the Airflow ``[openlineage]`` ``namespace`` property as a namespace, `if available <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/1.0.2/guides/user.html>`_.

Otherwise, it attempts to use the environment variable ``OPENLINEAGE_NAMESPACE`` as the namespace.

If not defined, it uses ``"default"`` as the namespace.
Finally, if neither are defined, it uses ``"cosmos"`` as the namespace.
23 changes: 15 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ dependencies = [
"typing-extensions; python_version < '3.8'",
"virtualenv",
"openlineage-integration-common",
"openlineage-airflow"
]

[project.optional-dependencies]
Expand Down Expand Up @@ -76,12 +75,12 @@ dbt-snowflake = [
dbt-spark = [
"dbt-spark<=1.5.4",
]
lineage = [
openlineage = [
"openlineage-airflow",
]
all = [
"astronomer-cosmos[dbt-all]",
"astronomer-cosmos[lineage]"
"astronomer-cosmos[openlineage]"
]
docs =[
"sphinx",
Expand Down Expand Up @@ -132,7 +131,12 @@ dependencies = [
"astronomer-cosmos[tests]",
"apache-airflow-providers-docker>=3.5.0",
"apache-airflow-providers-cncf-kubernetes>=5.1.1,<7.3.0",
"openlineage-airflow"
"types-PyYAML",
"types-attrs",
"attrs",
"types-requests",
"types-python-dateutil",
"apache-airflow"
]

[[tool.hatch.envs.tests.matrix]]
Expand All @@ -150,22 +154,23 @@ matrix.airflow.dependencies = [

[tool.hatch.envs.tests.scripts]
freeze = "pip freeze"
type-check = "mypy cosmos"
test = 'pytest -vv --durations=0 . -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py'
test-cov = 'pytest -vv --cov=cosmos --cov-report=term-missing --cov-report=xml --durations=0 -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py'
test-cov = """pytest -vv --cov=cosmos --cov-report=term-missing --cov-report=xml --durations=0 -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py"""
# we install using the following workaround to overcome installation conflicts, such as:
# apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies
test-integration-setup = """pip uninstall dbt-postgres dbt-databricks; \
rm -rf airflow.*; \
airflow db init; \
pip install 'dbt-postgres<=1.5.4' 'dbt-databricks<=1.5.4'"""
pip install 'dbt-core==1.5.4' 'dbt-databricks<=1.5.4' 'dbt-postgres<=1.5.4' 'openlineage-airflow'"""
test-integration = """rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
--durations=0 \
-m integration \
-k 'not (example_cosmos_python_models or example_cosmos_python_models or example_virtualenv)'
-k 'not (example_cosmos_python_models or example_virtualenv)'
"""
test-integration-expensive = """rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
Expand All @@ -174,7 +179,7 @@ pytest -vv \
--cov-report=xml \
--durations=0 \
-m integration \
-k 'example_cosmos_python_models or example_cosmos_python_models or example_virtualenv'"""
-k 'example_cosmos_python_models or example_virtualenv'"""

[tool.pytest.ini_options]
filterwarnings = [
Expand Down Expand Up @@ -215,6 +220,8 @@ known_third_party = ["airflow", "jinja2"]

[tool.mypy]
strict = true
ignore_missing_imports = true
no_warn_unused_ignores = true

[tool.ruff]
line-length = 120
Expand Down
1 change: 1 addition & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def test_run_operator_dataset_inlets_and_outlets():
assert test_operator.outlets == []


@pytest.mark.integration
def test_run_operator_emits_events():
class MockRun:
facets = {"c": 3}
Expand Down