Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ def _handle_datasets(self, context: Context) -> None:
logger.info("Outlets: %s", outlets)
self.register_dataset(inlets, outlets, context)

if settings.enable_uri_xcom and (uris := [outlet.uri for outlet in outlets]):
context["ti"].xcom_push(key="uri", value=uris)
logger.info(f"Pushed outlet URI(s) to XCom: {uris}")
Comment thread
corsettigyg marked this conversation as resolved.

def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None:
if self.cache_dir is None:
return
Expand Down
1 change: 1 addition & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR)
enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True)
enable_dataset_alias = conf.getboolean("cosmos", "enable_dataset_alias", fallback=True)
enable_uri_xcom = conf.getboolean("cosmos", "enable_uri_xcom", fallback=False)
use_dataset_airflow3_uri_standard = conf.getboolean(
"cosmos",
"enable_dataset_airflow3_uri",
Expand Down
44 changes: 43 additions & 1 deletion docs/configuration/scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,46 @@ they can set this configuration to ``False``. It can also be set in the ``airflo
[cosmos]
enable_dataset_alias = False

Starting in Airflow 3, Cosmos users no longer allowed to set ``AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS`` to ``True``.
Starting in Airflow 3, Cosmos users are no longer allowed to set ``AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS`` to ``True``.


Emitting Dataset URIs as XCom
.............................

By default, Cosmos emits datasets as Airflow inlets/outlets but does not expose the raw dataset URIs as XCom values.
If you need access to the dataset URIs (for example, to use them in downstream tasks or for debugging purposes),
you can enable the ``enable_uri_xcom`` setting.

When enabled, Cosmos will push the outlet URIs to XCom with the key ``uri`` after each task execution that emits datasets.

To enable this feature, set the environment variable:

.. code-block:: bash

export AIRFLOW__COSMOS__ENABLE_URI_XCOM=True

Or in your ``airflow.cfg``:

.. code-block::

[cosmos]
enable_uri_xcom = True

When enabled, you can access the URIs in downstream tasks using XCom:

.. code-block:: python

from airflow.decorators import task


@task
def process_uris(**context):
ti = context["ti"]
uris = ti.xcom_pull(task_ids="my_dbt_task", key="uri")
for uri in uris:
print(f"Processing dataset: {uri}")

.. note::

This feature is available for all Airflow versions (2.4+) and works alongside the existing dataset emission behavior.
The ``uri`` XCom contains a list of URI strings, even if there is only one outlet.
122 changes: 122 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2186,3 +2186,125 @@ def test_dbt_cmd_flags_all_templated():
expected_flags = ["--select", "--exclude", "model1", "model2"]

assert operator.dbt_cmd_flags == expected_flags


@patch("cosmos.settings.enable_uri_xcom", False)
def test_handle_datasets_does_not_push_uri_xcom_when_disabled():
"""Test that _handle_datasets does not push URI to XCom when enable_uri_xcom is False (default)."""
operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
should_store_compiled_sql=False,
)

# Create mock Asset objects with uri attribute
mock_outlet = MagicMock()
mock_outlet.uri = "postgres://0.0.0.0:5432/postgres.public.test_table"

mock_ti = MagicMock()
mock_context = {"ti": mock_ti, "outlet_events": MagicMock()}

# Mock get_datasets to return mock assets and register_dataset to avoid database interactions
with (
patch.object(operator, "get_datasets", side_effect=[[], [mock_outlet]]),
patch.object(operator, "register_dataset"),
):
operator._handle_datasets(mock_context)

# Verify xcom_push was NOT called with "uri" key
uri_xcom_calls = [call for call in mock_ti.xcom_push.call_args_list if call[1].get("key") == "uri"]
assert len(uri_xcom_calls) == 0, "URI XCom should not be pushed when enable_uri_xcom is False"


@patch("cosmos.settings.enable_uri_xcom", True)
def test_handle_datasets_pushes_uri_xcom_when_enabled():
"""Test that _handle_datasets pushes URI to XCom when enable_uri_xcom is True."""
operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
should_store_compiled_sql=False,
)

# Create mock Asset objects with uri attribute
mock_outlet = MagicMock()
mock_outlet.uri = "postgres://0.0.0.0:5432/postgres.public.test_table"

mock_ti = MagicMock()
mock_context = {"ti": mock_ti, "outlet_events": MagicMock()}

# Mock get_datasets to return mock assets and register_dataset to avoid database interactions
with (
patch.object(operator, "get_datasets", side_effect=[[], [mock_outlet]]),
patch.object(operator, "register_dataset"),
):
operator._handle_datasets(mock_context)

# Verify xcom_push was called with "uri" key and the correct value
mock_ti.xcom_push.assert_called_once()
call_kwargs = mock_ti.xcom_push.call_args[1]
assert call_kwargs["key"] == "uri"
assert isinstance(call_kwargs["value"], list)
assert len(call_kwargs["value"]) == 1
assert call_kwargs["value"][0] == "postgres://0.0.0.0:5432/postgres.public.test_table"


@patch("cosmos.settings.enable_uri_xcom", True)
def test_handle_datasets_pushes_multiple_uris_to_xcom():
"""Test that _handle_datasets pushes multiple URIs to XCom when there are multiple outlets."""
operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
should_store_compiled_sql=False,
)

# Create mock Asset objects with uri attribute
mock_outlet1 = MagicMock()
mock_outlet1.uri = "postgres://0.0.0.0:5432/postgres.public.table1"
mock_outlet2 = MagicMock()
mock_outlet2.uri = "postgres://0.0.0.0:5432/postgres.public.table2"

mock_ti = MagicMock()
mock_context = {"ti": mock_ti, "outlet_events": MagicMock()}

# Mock get_datasets to return mock assets and register_dataset to avoid database interactions
with (
patch.object(operator, "get_datasets", side_effect=[[], [mock_outlet1, mock_outlet2]]),
patch.object(operator, "register_dataset"),
):
operator._handle_datasets(mock_context)

# Verify xcom_push was called with correct URIs
mock_ti.xcom_push.assert_called_once()
call_kwargs = mock_ti.xcom_push.call_args[1]
assert call_kwargs["key"] == "uri"
assert len(call_kwargs["value"]) == 2
assert "postgres://0.0.0.0:5432/postgres.public.table1" in call_kwargs["value"]
assert "postgres://0.0.0.0:5432/postgres.public.table2" in call_kwargs["value"]


@patch("cosmos.settings.enable_uri_xcom", True)
def test_handle_datasets_does_not_push_xcom_when_no_outlets():
"""Test that _handle_datasets does not push XCom when there are no outlets."""
operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
should_store_compiled_sql=False,
)

mock_ti = MagicMock()
mock_context = {"ti": mock_ti, "outlet_events": MagicMock()}

# Mock get_datasets to return empty lists and register_dataset to avoid database interactions
with (
patch.object(operator, "get_datasets", side_effect=[[], []]),
patch.object(operator, "register_dataset"),
):
operator._handle_datasets(mock_context)

# Verify xcom_push was NOT called (no outlets to push)
uri_xcom_calls = [call for call in mock_ti.xcom_push.call_args_list if call[1].get("key") == "uri"]
assert len(uri_xcom_calls) == 0, "URI XCom should not be pushed when there are no outlets"