diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index e868c04fd6..bab5f4f21c 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -575,6 +575,10 @@ def _handle_datasets(self, context: Context) -> None: logger.info("Outlets: %s", outlets) self.register_dataset(inlets, outlets, context) + if settings.enable_uri_xcom and (uris := [outlet.uri for outlet in outlets]): + context["ti"].xcom_push(key="uri", value=uris) + logger.info(f"Pushed outlet URI(s) to XCom: {uris}") + def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None: if self.cache_dir is None: return diff --git a/cosmos/settings.py b/cosmos/settings.py index a4f35aa74d..edd82e5210 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -19,6 +19,7 @@ cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR) enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True) enable_dataset_alias = conf.getboolean("cosmos", "enable_dataset_alias", fallback=True) +enable_uri_xcom = conf.getboolean("cosmos", "enable_uri_xcom", fallback=False) use_dataset_airflow3_uri_standard = conf.getboolean( "cosmos", "enable_dataset_airflow3_uri", diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index 2450663ce4..622803176c 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -202,4 +202,46 @@ they can set this configuration to ``False``. It can also be set in the ``airflo [cosmos] enable_dataset_alias = False -Starting in Airflow 3, Cosmos users no longer allowed to set ``AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS`` to ``True``. +Starting in Airflow 3, Cosmos users are no longer allowed to set ``AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS`` to ``True``. + + +Emitting Dataset URIs as XCom +............................. + +By default, Cosmos emits datasets as Airflow inlets/outlets but does not expose the raw dataset URIs as XCom values. +If you need access to the dataset URIs (for example, to use them in downstream tasks or for debugging purposes), +you can enable the ``enable_uri_xcom`` setting. + +When enabled, Cosmos will push the outlet URIs to XCom with the key ``uri`` after each task execution that emits datasets. + +To enable this feature, set the environment variable: + +.. code-block:: bash + + export AIRFLOW__COSMOS__ENABLE_URI_XCOM=True + +Or in your ``airflow.cfg``: + +.. code-block:: + + [cosmos] + enable_uri_xcom = True + +When enabled, you can access the URIs in downstream tasks using XCom: + +.. code-block:: python + + from airflow.decorators import task + + + @task + def process_uris(**context): + ti = context["ti"] + uris = ti.xcom_pull(task_ids="my_dbt_task", key="uri") + for uri in uris: + print(f"Processing dataset: {uri}") + +.. note:: + + This feature is available for all Airflow versions (2.4+) and works alongside the existing dataset emission behavior. + The ``uri`` XCom contains a list of URI strings, even if there is only one outlet. diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 18a76e89d9..bcd178164d 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -2186,3 +2186,125 @@ def test_dbt_cmd_flags_all_templated(): expected_flags = ["--select", "--exclude", "model1", "model2"] assert operator.dbt_cmd_flags == expected_flags + + +@patch("cosmos.settings.enable_uri_xcom", False) +def test_handle_datasets_does_not_push_uri_xcom_when_disabled(): + """Test that _handle_datasets does not push URI to XCom when enable_uri_xcom is False (default).""" + operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + should_store_compiled_sql=False, + ) + + # Create mock Asset objects with uri attribute + mock_outlet = MagicMock() + mock_outlet.uri = "postgres://0.0.0.0:5432/postgres.public.test_table" + + mock_ti = MagicMock() + mock_context = {"ti": mock_ti, "outlet_events": MagicMock()} + + # Mock get_datasets to return mock assets and register_dataset to avoid database interactions + with ( + patch.object(operator, "get_datasets", side_effect=[[], [mock_outlet]]), + patch.object(operator, "register_dataset"), + ): + operator._handle_datasets(mock_context) + + # Verify xcom_push was NOT called with "uri" key + uri_xcom_calls = [call for call in mock_ti.xcom_push.call_args_list if call[1].get("key") == "uri"] + assert len(uri_xcom_calls) == 0, "URI XCom should not be pushed when enable_uri_xcom is False" + + +@patch("cosmos.settings.enable_uri_xcom", True) +def test_handle_datasets_pushes_uri_xcom_when_enabled(): + """Test that _handle_datasets pushes URI to XCom when enable_uri_xcom is True.""" + operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + should_store_compiled_sql=False, + ) + + # Create mock Asset objects with uri attribute + mock_outlet = MagicMock() + mock_outlet.uri = "postgres://0.0.0.0:5432/postgres.public.test_table" + + mock_ti = MagicMock() + mock_context = {"ti": mock_ti, "outlet_events": MagicMock()} + + # Mock get_datasets to return mock assets and register_dataset to avoid database interactions + with ( + patch.object(operator, "get_datasets", side_effect=[[], [mock_outlet]]), + patch.object(operator, "register_dataset"), + ): + operator._handle_datasets(mock_context) + + # Verify xcom_push was called with "uri" key and the correct value + mock_ti.xcom_push.assert_called_once() + call_kwargs = mock_ti.xcom_push.call_args[1] + assert call_kwargs["key"] == "uri" + assert isinstance(call_kwargs["value"], list) + assert len(call_kwargs["value"]) == 1 + assert call_kwargs["value"][0] == "postgres://0.0.0.0:5432/postgres.public.test_table" + + +@patch("cosmos.settings.enable_uri_xcom", True) +def test_handle_datasets_pushes_multiple_uris_to_xcom(): + """Test that _handle_datasets pushes multiple URIs to XCom when there are multiple outlets.""" + operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + should_store_compiled_sql=False, + ) + + # Create mock Asset objects with uri attribute + mock_outlet1 = MagicMock() + mock_outlet1.uri = "postgres://0.0.0.0:5432/postgres.public.table1" + mock_outlet2 = MagicMock() + mock_outlet2.uri = "postgres://0.0.0.0:5432/postgres.public.table2" + + mock_ti = MagicMock() + mock_context = {"ti": mock_ti, "outlet_events": MagicMock()} + + # Mock get_datasets to return mock assets and register_dataset to avoid database interactions + with ( + patch.object(operator, "get_datasets", side_effect=[[], [mock_outlet1, mock_outlet2]]), + patch.object(operator, "register_dataset"), + ): + operator._handle_datasets(mock_context) + + # Verify xcom_push was called with correct URIs + mock_ti.xcom_push.assert_called_once() + call_kwargs = mock_ti.xcom_push.call_args[1] + assert call_kwargs["key"] == "uri" + assert len(call_kwargs["value"]) == 2 + assert "postgres://0.0.0.0:5432/postgres.public.table1" in call_kwargs["value"] + assert "postgres://0.0.0.0:5432/postgres.public.table2" in call_kwargs["value"] + + +@patch("cosmos.settings.enable_uri_xcom", True) +def test_handle_datasets_does_not_push_xcom_when_no_outlets(): + """Test that _handle_datasets does not push XCom when there are no outlets.""" + operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + should_store_compiled_sql=False, + ) + + mock_ti = MagicMock() + mock_context = {"ti": mock_ti, "outlet_events": MagicMock()} + + # Mock get_datasets to return empty lists and register_dataset to avoid database interactions + with ( + patch.object(operator, "get_datasets", side_effect=[[], []]), + patch.object(operator, "register_dataset"), + ): + operator._handle_datasets(mock_context) + + # Verify xcom_push was NOT called (no outlets to push) + uri_xcom_calls = [call for call in mock_ti.xcom_push.call_args_list if call[1].get("key") == "uri"] + assert len(uri_xcom_calls) == 0, "URI XCom should not be pushed when there are no outlets"