diff --git a/airflow-core/docs/authoring-and-scheduling/serializers.rst b/airflow-core/docs/authoring-and-scheduling/serializers.rst index 3e087dcf74db6..fb19a111d9177 100644 --- a/airflow-core/docs/authoring-and-scheduling/serializers.rst +++ b/airflow-core/docs/authoring-and-scheduling/serializers.rst @@ -28,9 +28,9 @@ like ``str`` and ``int`` and it loops over iterables. When things become more co Airflow out of the box supports three ways of custom serialization. Primitives are returned as is, without additional encoding, e.g. a ``str`` remains a ``str``. When it is not a primitive (or iterable thereof) Airflow -first looks in the class for a ``serialize()`` method or in case of deserialization a ``deserialize(data, version: int)`` method. -Next airflow checks for a registered serializer and deserializer in the namespace of ``airflow.serialization.serializers``. -Finally, if the class is either decorated with ``@dataclass`` +looks for a registered serializer and deserializer in the namespace of ``airflow.serialization.serializers``. +If not found it will look in the class for a ``serialize()`` method or in case of deserialization a +``deserialize(data, version: int)`` method. Finally, if the class is either decorated with ``@dataclass`` or ``@attr.define`` it will use the public methods for those decorators. If you are looking to extend Airflow with a new serializer, it is good to know when to choose what way of serialization. diff --git a/airflow-core/newsfragments/56881.significant.rst b/airflow-core/newsfragments/56881.significant.rst deleted file mode 100644 index 071d827356b41..0000000000000 --- a/airflow-core/newsfragments/56881.significant.rst +++ /dev/null @@ -1,16 +0,0 @@ -Prioritize custom SerDe methods defined in classes over the built-in SerDe methods. - -Previously, the SerDe logic prioritized built-in serializers in airflow-core over those that users defined -in their classes with the ``serialize(self)`` and ``deserialize(data: dict, version: int)`` methods. -This behavior has been changed to always use custom SerDe methods before considering built-in SerDe methods. - -* Types of change - - * [ ] Dag changes - * [ ] Config changes - * [ ] API changes - * [ ] CLI changes - * [x] Behaviour changes - * [ ] Plugin changes - * [ ] Dependency changes - * [ ] Code interface changes diff --git a/airflow-core/src/airflow/serialization/serde.py b/airflow-core/src/airflow/serialization/serde.py index e3e8e2047df86..6faedd88417a7 100644 --- a/airflow-core/src/airflow/serialization/serde.py +++ b/airflow-core/src/airflow/serialization/serde.py @@ -133,23 +133,6 @@ def serialize(o: object, depth: int = 0) -> U | None: qn = qualname(o) classname = None - # custom serializers - dct = { - CLASSNAME: qn, - VERSION: getattr(cls, "__version__", DEFAULT_VERSION), - } - - # object / class brings their own. Prioritize over built-in serializers - if hasattr(o, "serialize"): - data = getattr(o, "serialize")() - - # if we end up with a structure, ensure its values are serialized - if isinstance(data, dict): - data = serialize(data, depth + 1) - - dct[DATA] = data - return dct - # Serialize namedtuple like tuples # We also override the classname returned by the builtin.py serializer. The classname # has to be "builtins.tuple", so that the deserializer can deserialize the object into tuple. @@ -170,13 +153,29 @@ def serialize(o: object, depth: int = 0) -> U | None: return encode(classname or serialized_classname, version, serialize(data, depth + 1)) # primitive types are returned as is - # Need to come after registered built-ins - else numpy float64 won't be serialized as numpy class, because isinstance(np.float64(0.234), float) == True if isinstance(o, _primitives): if isinstance(o, enum.Enum): return o.value return o + # custom serializers + dct = { + CLASSNAME: qn, + VERSION: getattr(cls, "__version__", DEFAULT_VERSION), + } + + # object / class brings their own + if hasattr(o, "serialize"): + data = getattr(o, "serialize")() + + # if we end up with a structure, ensure its values are serialized + if isinstance(data, dict): + data = serialize(data, depth + 1) + + dct[DATA] = data + return dct + # dataclasses if dataclasses.is_dataclass(cls): # fixme: unfortunately using asdict with nested dataclasses it looses information @@ -263,10 +262,6 @@ def deserialize(o: T | None, full=True, type_hint: Any = None) -> object: cls = import_string(classname) - # class has deserialization function - if hasattr(cls, "deserialize"): - return getattr(cls, "deserialize")(deserialize(value), version) - # registered deserializer if classname in _deserializers: return _deserializers[classname].deserialize(cls, version, deserialize(value)) @@ -274,6 +269,10 @@ def deserialize(o: T | None, full=True, type_hint: Any = None) -> object: if PYDANTIC_MODEL_QUALNAME in _deserializers: return _deserializers[PYDANTIC_MODEL_QUALNAME].deserialize(cls, version, deserialize(value)) + # class has deserialization function + if hasattr(cls, "deserialize"): + return getattr(cls, "deserialize")(deserialize(value), version) + # attr or dataclass if attr.has(cls) or dataclasses.is_dataclass(cls): class_version = getattr(cls, "__version__", 0) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index e54a5fd09f9f0..20572af71536f 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -931,9 +931,7 @@ def execute(self, context, event=None): tis = dr.get_task_instances() assert next(x for x in tis if x.task_id == "abc").state == "success" - @mock.patch( - "airflow.sdk.execution_time.task_runner._execute_task", return_value="dummy task return value" - ) + @mock.patch("airflow.sdk.execution_time.task_runner._execute_task") def test_dag_test_with_mark_success(self, mock__execute_task): """ option `--mark-success-pattern` should mark matching tasks as success without executing them. diff --git a/airflow-core/tests/unit/serialization/test_serde.py b/airflow-core/tests/unit/serialization/test_serde.py index e89fbf53ce060..6130437f1b957 100644 --- a/airflow-core/tests/unit/serialization/test_serde.py +++ b/airflow-core/tests/unit/serialization/test_serde.py @@ -33,20 +33,16 @@ from airflow.serialization.serde import ( CLASSNAME, DATA, - PYDANTIC_MODEL_QUALNAME, SCHEMA_ID, VERSION, - _deserializers, _get_patterns, _get_regexp_patterns, _match, _match_glob, _match_regexp, - _serializers, deserialize, serialize, ) -from airflow.serialization.typing import is_pydantic_model from airflow.utils.module_loading import import_string, iter_namespace, qualname from tests_common.test_utils.config import conf_vars @@ -200,18 +196,6 @@ def __call__(self): return None -class PydanticModelWithCustomSerDe(BaseModel): - ignored_field_in_serialization: int = 0 - x: str - - @staticmethod - def deserialize(data: dict, version: int): - return PydanticModelWithCustomSerDe(ignored_field_in_serialization=-1, x=data["x"]) - - def serialize(self) -> dict: - return {"x": self.x} - - @pytest.mark.usefixtures("recalculate_patterns") class TestSerDe: def test_ser_primitives(self): @@ -530,21 +514,3 @@ def test_error_when_serializing_callable_without_name(self): TypeError, match="cannot serialize object of type " ): serialize(i) - - def test_custom_serde_methods_are_prioritized_over_builtins(self): - """ - There is a built-in SerDe for pydantic classes. - Test that the custom defined SerDe methods take precedence over the built-in ones. - """ - orig = PydanticModelWithCustomSerDe(ignored_field_in_serialization=200, x="SerDe Test") - - assert is_pydantic_model(orig) - assert PYDANTIC_MODEL_QUALNAME in _serializers - assert PYDANTIC_MODEL_QUALNAME in _deserializers - - serialized = serialize(orig) - assert "ignored_field_in_serialization" not in serialized - deserialized: PydanticModelWithCustomSerDe = deserialize(serialized) - assert deserialized.ignored_field_in_serialization == -1 - assert deserialized.x == orig.x - assert type(orig) is type(deserialized)