Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,15 @@ core:
sensitive: true
default: ~
example: '{"some_param": "some_value"}'
strict_dataset_uri_validation:
description: |
Dataset URI validation should raise an exception if it is not compliant with AIP-60.
By default this configuration is false, meaning that Airflow 2.x only warns the user.
In Airflow 3, this configuration will be enabled by default.
default: "False"
example: ~
version_added: 2.9.2
type: boolean
database_access_isolation:
description: (experimental) Whether components should use Airflow Internal API for DB connectivity.
version_added: 2.6.0
Expand Down
16 changes: 15 additions & 1 deletion airflow/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
if TYPE_CHECKING:
from urllib.parse import SplitResult


from airflow.configuration import conf

__all__ = ["Dataset", "DatasetAll", "DatasetAny"]


Expand Down Expand Up @@ -87,7 +90,18 @@ def _sanitize_uri(uri: str) -> str:
fragment="", # Ignore any fragments.
)
if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
parsed = normalizer(parsed)
try:
parsed = normalizer(parsed)
except ValueError as exception:
if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False):
raise exception
else:
warnings.warn(
f"The dataset URI {uri} is not AIP-60 compliant. "
f"In Airflow 3, this will raise an exception. More information: {repr(exception)}",
UserWarning,
stacklevel=3,
)
return urllib.parse.urlunsplit(parsed)


Expand Down
32 changes: 31 additions & 1 deletion tests/datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import os
from collections import defaultdict
from typing import Callable
from unittest.mock import patch

import pytest
from sqlalchemy.sql import select

from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny
from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny, _sanitize_uri
from airflow.models.dataset import DatasetDagRunQueue, DatasetModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.empty import EmptyOperator
from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG
from tests.test_utils.config import conf_vars


@pytest.fixture
Expand Down Expand Up @@ -441,3 +443,31 @@ def test_datasets_expression_error(expression: Callable[[], None], error: str) -
with pytest.raises(TypeError) as info:
expression()
assert str(info.value) == error


def mock_get_uri_normalizer(normalized_scheme):
def normalizer(uri):
raise ValueError("Incorrect URI format")

return normalizer


@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
@patch("airflow.datasets.warnings.warn")
def test__sanitize_uri_raises_warning(mock_warn):
_sanitize_uri("postgres://localhost:5432/database.schema.table")
msg = mock_warn.call_args.args[0]
assert "The dataset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant." in msg
assert (
"In Airflow 3, this will raise an exception. More information: ValueError('Incorrect URI format')"
in msg
)


@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer)
@conf_vars({("core", "strict_dataset_uri_validation"): "True"})
def test__sanitize_uri_raises_exception():
with pytest.raises(ValueError) as e_info:
_sanitize_uri("postgres://localhost:5432/database.schema.table")
assert isinstance(e_info.value, ValueError)
assert str(e_info.value) == "Incorrect URI format"