Skip to content
28 changes: 16 additions & 12 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,6 @@
from airflow.datasets import Dataset as Asset # type: ignore


try:
import openlineage
from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor
except ModuleNotFoundError:
is_openlineage_available = False
DbtLocalArtifactProcessor = None
else:
is_openlineage_available = True

if TYPE_CHECKING: # pragma: no cover
import openlineage # pragma: no cover
from dbt.cli.main import dbtRunner, dbtRunnerResult
Expand Down Expand Up @@ -128,6 +119,20 @@

logger = get_logger(__name__)

# The following is related to the ability of Cosmos parsing dbt artifacts and generating OpenLineage URIs
# It is used for emitting Airflow assets and not necessarily OpenLineage events
try:
import openlineage
from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor

is_openlineage_common_available = True
except ModuleNotFoundError:
is_openlineage_common_available = False
DbtLocalArtifactProcessor = None


# The following is related to the ability of Airflow to emit OpenLineage events
# This will decide if the method `get_openlineage_facets_on_complete` will be called by the Airflow OpenLineage listener or not
try:
from airflow.providers.openlineage.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError):
Expand All @@ -138,10 +143,9 @@
"To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
)
logger.debug(
"Further details on lack of Openlineage:",
"Further details on lack of Openlineage Airflow provider:",
stack_info=True,
)
is_openlineage_available = False

@define
class OperatorLineage: # type: ignore
Expand Down Expand Up @@ -685,7 +689,7 @@ def run_command( # noqa: C901
env=env,
cwd=tmp_project_dir,
)
if is_openlineage_available:
if is_openlineage_common_available:
self.calculate_openlineage_events_completes(env, tmp_dir_path)
if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION:
# Airflow 3 does not support associating 'openlineage_events_completes' with task_instance,
Expand Down
4 changes: 1 addition & 3 deletions dev/dags/performance_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
DBT_ROOT_PATH / "perf",
),
profile_config=profile_config,
render_config=RenderConfig(
dbt_deps=False,
),
render_config=RenderConfig(dbt_deps=False, emit_datasets=False),
Comment thread
tatiana marked this conversation as resolved.
# normal dag parameters
schedule=None,
start_date=datetime(2024, 1, 1),
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
"packaging>=22.0",
"pydantic>=1.10.0",
"virtualenv",
"openlineage-integration-common" # Without this dependency, Airflow datasets/assets don't work
Comment thread
pankajastro marked this conversation as resolved.
]

[project.optional-dependencies]
Expand Down