Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow-core/docs/authoring-and-scheduling/serializers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ like ``str`` and ``int`` and it loops over iterables. When things become more co

Airflow out of the box supports three ways of custom serialization. Primitives are returned as is, without
additional encoding, e.g. a ``str`` remains a ``str``. When it is not a primitive (or iterable thereof) Airflow
first looks in the class for a ``serialize()`` method or in case of deserialization a ``deserialize(data, version: int)`` method.
Next airflow checks for a registered serializer and deserializer in the namespace of ``airflow.serialization.serializers``.
Finally, if the class is either decorated with ``@dataclass``
looks for a registered serializer and deserializer in the namespace of ``airflow.serialization.serializers``.
If not found it will look in the class for a ``serialize()`` method or in case of deserialization a
``deserialize(data, version: int)`` method. Finally, if the class is either decorated with ``@dataclass``
or ``@attr.define`` it will use the public methods for those decorators.

If you are looking to extend Airflow with a new serializer, it is good to know when to choose what way of serialization.
Expand Down
16 changes: 0 additions & 16 deletions airflow-core/newsfragments/56881.significant.rst

This file was deleted.

43 changes: 21 additions & 22 deletions airflow-core/src/airflow/serialization/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,6 @@ def serialize(o: object, depth: int = 0) -> U | None:
qn = qualname(o)
classname = None

# custom serializers
dct = {
CLASSNAME: qn,
VERSION: getattr(cls, "__version__", DEFAULT_VERSION),
}

# object / class brings their own. Prioritize over built-in serializers
if hasattr(o, "serialize"):
data = getattr(o, "serialize")()

# if we end up with a structure, ensure its values are serialized
if isinstance(data, dict):
data = serialize(data, depth + 1)

dct[DATA] = data
return dct

# Serialize namedtuple like tuples
# We also override the classname returned by the builtin.py serializer. The classname
# has to be "builtins.tuple", so that the deserializer can deserialize the object into tuple.
Expand All @@ -170,13 +153,29 @@ def serialize(o: object, depth: int = 0) -> U | None:
return encode(classname or serialized_classname, version, serialize(data, depth + 1))

# primitive types are returned as is
# Need to come after registered built-ins - else numpy float64 won't be serialized as numpy class, because isinstance(np.float64(0.234), float) == True
if isinstance(o, _primitives):
if isinstance(o, enum.Enum):
return o.value

return o

# custom serializers
dct = {
CLASSNAME: qn,
VERSION: getattr(cls, "__version__", DEFAULT_VERSION),
}

# object / class brings their own
if hasattr(o, "serialize"):
data = getattr(o, "serialize")()

# if we end up with a structure, ensure its values are serialized
if isinstance(data, dict):
data = serialize(data, depth + 1)

dct[DATA] = data
return dct

# dataclasses
if dataclasses.is_dataclass(cls):
# fixme: unfortunately using asdict with nested dataclasses it looses information
Expand Down Expand Up @@ -263,17 +262,17 @@ def deserialize(o: T | None, full=True, type_hint: Any = None) -> object:

cls = import_string(classname)

# class has deserialization function
if hasattr(cls, "deserialize"):
return getattr(cls, "deserialize")(deserialize(value), version)

# registered deserializer
if classname in _deserializers:
return _deserializers[classname].deserialize(cls, version, deserialize(value))
if is_pydantic_model(cls):
if PYDANTIC_MODEL_QUALNAME in _deserializers:
return _deserializers[PYDANTIC_MODEL_QUALNAME].deserialize(cls, version, deserialize(value))

# class has deserialization function
if hasattr(cls, "deserialize"):
return getattr(cls, "deserialize")(deserialize(value), version)

# attr or dataclass
if attr.has(cls) or dataclasses.is_dataclass(cls):
class_version = getattr(cls, "__version__", 0)
Expand Down
4 changes: 1 addition & 3 deletions airflow-core/tests/unit/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,7 @@ def execute(self, context, event=None):
tis = dr.get_task_instances()
assert next(x for x in tis if x.task_id == "abc").state == "success"

@mock.patch(
"airflow.sdk.execution_time.task_runner._execute_task", return_value="dummy task return value"
)
@mock.patch("airflow.sdk.execution_time.task_runner._execute_task")
def test_dag_test_with_mark_success(self, mock__execute_task):
"""
option `--mark-success-pattern` should mark matching tasks as success without executing them.
Expand Down
34 changes: 0 additions & 34 deletions airflow-core/tests/unit/serialization/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,16 @@
from airflow.serialization.serde import (
CLASSNAME,
DATA,
PYDANTIC_MODEL_QUALNAME,
SCHEMA_ID,
VERSION,
_deserializers,
_get_patterns,
_get_regexp_patterns,
_match,
_match_glob,
_match_regexp,
_serializers,
deserialize,
serialize,
)
from airflow.serialization.typing import is_pydantic_model
from airflow.utils.module_loading import import_string, iter_namespace, qualname

from tests_common.test_utils.config import conf_vars
Expand Down Expand Up @@ -200,18 +196,6 @@ def __call__(self):
return None


class PydanticModelWithCustomSerDe(BaseModel):
ignored_field_in_serialization: int = 0
x: str

@staticmethod
def deserialize(data: dict, version: int):
return PydanticModelWithCustomSerDe(ignored_field_in_serialization=-1, x=data["x"])

def serialize(self) -> dict:
return {"x": self.x}


@pytest.mark.usefixtures("recalculate_patterns")
class TestSerDe:
def test_ser_primitives(self):
Expand Down Expand Up @@ -530,21 +514,3 @@ def test_error_when_serializing_callable_without_name(self):
TypeError, match="cannot serialize object of type <class 'unit.serialization.test_serde.C'>"
):
serialize(i)

def test_custom_serde_methods_are_prioritized_over_builtins(self):
"""
There is a built-in SerDe for pydantic classes.
Test that the custom defined SerDe methods take precedence over the built-in ones.
"""
orig = PydanticModelWithCustomSerDe(ignored_field_in_serialization=200, x="SerDe Test")

assert is_pydantic_model(orig)
assert PYDANTIC_MODEL_QUALNAME in _serializers
assert PYDANTIC_MODEL_QUALNAME in _deserializers

serialized = serialize(orig)
assert "ignored_field_in_serialization" not in serialized
deserialized: PydanticModelWithCustomSerDe = deserialize(serialized)
assert deserialized.ignored_field_in_serialization == -1
assert deserialized.x == orig.x
assert type(orig) is type(deserialized)
Loading