From 6fbd36e85a08cf5dbec1531e2a203b61f323ff72 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 16 May 2024 13:48:08 +0100 Subject: [PATCH 1/3] Change dataset validation to raise warning --- airflow/datasets/__init__.py | 10 +++++++++- tests/datasets/test_dataset.py | 22 +++++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 37720314d3c66..fb80d331069a9 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -87,7 +87,15 @@ def _sanitize_uri(uri: str) -> str: fragment="", # Ignore any fragments. ) if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None: - parsed = normalizer(parsed) + try: + parsed = normalizer(parsed) + except ValueError as exception: + warnings.warn( + f"The dataset URI {uri} is not AIP-60 compliant. " + f"In Airflow 3, this will raise an exception. More information: {repr(exception)}", + UserWarning, + stacklevel=3, + ) return urllib.parse.urlunsplit(parsed) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 31aebff9fad5f..139820dfaafce 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -20,11 +20,12 @@ import os from collections import defaultdict from typing import Callable +from unittest.mock import patch import pytest from sqlalchemy.sql import select -from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny +from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny, _sanitize_uri from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator @@ -441,3 +442,22 @@ def test_datasets_expression_error(expression: Callable[[], None], error: str) - with pytest.raises(TypeError) as info: expression() assert str(info.value) == error + + +def mock_get_uri_normalizer(normalized_scheme): + def normalizer(uri): + raise ValueError("Incorrect URI format") + + return normalizer + + +@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer) +@patch("airflow.datasets.warnings.warn") +def test__sanitize_uri(mock_warn, caplog): + _sanitize_uri("postgres://localhost:5432/database.schema.table") + msg = mock_warn.call_args.args[0] + assert "The dataset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant." in msg + assert ( + "In Airflow 3, this will raise an exception. More information: ValueError('Incorrect URI format')" + in msg + ) From a6e58c049591e03acd03d04fa3f523f9e026fd14 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 16 May 2024 15:20:16 +0100 Subject: [PATCH 2/3] Introduce configuration core.strict_dataset_uri_validation --- airflow/config_templates/config.yml | 9 +++++++++ airflow/datasets/__init__.py | 18 ++++++++++++------ tests/datasets/test_dataset.py | 12 +++++++++++- 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 95d83f9d4c7f0..4d524344b3835 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -464,6 +464,15 @@ core: sensitive: true default: ~ example: '{"some_param": "some_value"}' + strict_dataset_uri_validation: + description: | + Dataset URI validation should raise an exception if it is not compliant with AIP-60. + By default this configuration is false, meaning that Airflow 2.x only warns the user. + In Airflow 3, this configuration will be enabled by default. + default: "False" + example: ~ + version_added: 2.9.2 + type: boolean database_access_isolation: description: (experimental) Whether components should use Airflow Internal API for DB connectivity. version_added: 2.6.0 diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index fb80d331069a9..07f82a18dd25c 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -27,6 +27,9 @@ if TYPE_CHECKING: from urllib.parse import SplitResult + +from airflow.configuration import conf + __all__ = ["Dataset", "DatasetAll", "DatasetAny"] @@ -90,12 +93,15 @@ def _sanitize_uri(uri: str) -> str: try: parsed = normalizer(parsed) except ValueError as exception: - warnings.warn( - f"The dataset URI {uri} is not AIP-60 compliant. " - f"In Airflow 3, this will raise an exception. More information: {repr(exception)}", - UserWarning, - stacklevel=3, - ) + if conf.getboolean("core", "strict_dataset_uri_validation"): + raise exception + else: + warnings.warn( + f"The dataset URI {uri} is not AIP-60 compliant. " + f"In Airflow 3, this will raise an exception. More information: {repr(exception)}", + UserWarning, + stacklevel=3, + ) return urllib.parse.urlunsplit(parsed) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 139820dfaafce..5453d971d9196 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -30,6 +30,7 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG +from tests.test_utils.config import conf_vars @pytest.fixture @@ -453,7 +454,7 @@ def normalizer(uri): @patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer) @patch("airflow.datasets.warnings.warn") -def test__sanitize_uri(mock_warn, caplog): +def test__sanitize_uri_raises_warning(mock_warn): _sanitize_uri("postgres://localhost:5432/database.schema.table") msg = mock_warn.call_args.args[0] assert "The dataset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant." in msg @@ -461,3 +462,12 @@ def test__sanitize_uri(mock_warn, caplog): "In Airflow 3, this will raise an exception. More information: ValueError('Incorrect URI format')" in msg ) + + +@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer) +@conf_vars({("core", "strict_dataset_uri_validation"): "True"}) +def test__sanitize_uri_raises_exception(): + with pytest.raises(ValueError) as e_info: + _sanitize_uri("postgres://localhost:5432/database.schema.table") + assert isinstance(e_info.value, ValueError) + assert str(e_info.value) == "Incorrect URI format" From 3f8f5c4df62350f26802c8e1bdbcd3eb58f173f1 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 16 May 2024 19:09:34 +0100 Subject: [PATCH 3/3] Address code review feedback --- airflow/datasets/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 07f82a18dd25c..d8012e68743dd 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -93,7 +93,7 @@ def _sanitize_uri(uri: str) -> str: try: parsed = normalizer(parsed) except ValueError as exception: - if conf.getboolean("core", "strict_dataset_uri_validation"): + if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False): raise exception else: warnings.warn(