From 9f0f7d4bb56d6945c1b1907478a5cf8d31bd6cf8 Mon Sep 17 00:00:00 2001 From: GunaPalanivel Date: Mon, 15 Dec 2025 21:29:30 +0530 Subject: [PATCH] feat!: remove backward compatibility for legacy pipeline_outputs format Remove temporary deserialization logic for old pipeline_outputs format in pipeline snapshots. This backward-compatibility layer was introduced in PR #10096 and marked for removal in Haystack 2.23.0. BREAKING CHANGE: Pipeline snapshots created before Haystack 2.22.0 whose pipeline_outputs lack the 'serialization_schema' and 'serialized_data' structure will no longer deserialize correctly. Users must recreate snapshots with the current Haystack version or manually migrate to the new serialization schema. Changes: - Remove conditional deserialization check in Pipeline.run() - Remove test_load_pipeline_snapshot_with_old_pipeline_outputs_format test - Remove the make_pipeline_snapshot_with_agent_snapshot fixture and the AgentBreakpoint/AgentSnapshot imports from test_breakpoint.py - Add breaking change documentation with migration instructions The new format uses 'serialization_schema' and 'serialized_data' fields, consistent with other pipeline state serialization. Closes #10168 Ref #10096 --- haystack/core/pipeline/pipeline.py | 7 +-- ...puts-deserialization-5acccb0245b84890.yaml | 8 +++ test/core/pipeline/test_breakpoint.py | 51 +------------------ 3 files changed, 10 insertions(+), 56 deletions(-) create mode 100644 releasenotes/notes/remove-legacy-pipeline-outputs-deserialization-5acccb0245b84890.yaml diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index 5748636877..0761410ffe 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -260,12 +260,7 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches include_outputs_from = pipeline_snapshot.include_outputs_from # also intermediate_outputs from the snapshot when resuming - # keep the deserialization of pipeline_outputs backwards compatible with the old pipeline_outputs format - # TODO: remove this in haystack 2.23.0 - if "serialization_schema" not in pipeline_snapshot.pipeline_state.pipeline_outputs.keys(): - 
pipeline_outputs = pipeline_snapshot.pipeline_state.pipeline_outputs - else: - pipeline_outputs = _deserialize_value_with_schema(pipeline_snapshot.pipeline_state.pipeline_outputs) + pipeline_outputs = _deserialize_value_with_schema(pipeline_snapshot.pipeline_state.pipeline_outputs) cached_topological_sort = None # We need to access a component's receivers multiple times during a pipeline run. diff --git a/releasenotes/notes/remove-legacy-pipeline-outputs-deserialization-5acccb0245b84890.yaml b/releasenotes/notes/remove-legacy-pipeline-outputs-deserialization-5acccb0245b84890.yaml new file mode 100644 index 0000000000..3b56acae20 --- /dev/null +++ b/releasenotes/notes/remove-legacy-pipeline-outputs-deserialization-5acccb0245b84890.yaml @@ -0,0 +1,8 @@ +--- +upgrade: + - | + Remove backward-compatibility support for deserializing pipeline snapshots with + the old ``pipeline_outputs`` format. Pipeline snapshots created before Haystack 2.22.0 + that contain ``pipeline_outputs`` without the ``serialization_schema`` and ``serialized_data`` + structure are no longer supported. Users should recreate their pipeline snapshots + with the current Haystack version before upgrading to 2.23.0. 
diff --git a/test/core/pipeline/test_breakpoint.py b/test/core/pipeline/test_breakpoint.py index 51d05d28b6..8f646a7a34 100644 --- a/test/core/pipeline/test_breakpoint.py +++ b/test/core/pipeline/test_breakpoint.py @@ -17,25 +17,7 @@ load_pipeline_snapshot, ) from haystack.dataclasses import ChatMessage -from haystack.dataclasses.breakpoints import AgentBreakpoint, AgentSnapshot, Breakpoint, PipelineSnapshot, PipelineState - - -@pytest.fixture -def make_pipeline_snapshot_with_agent_snapshot(): - def _make(break_point: AgentBreakpoint) -> PipelineSnapshot: - return PipelineSnapshot( - break_point=break_point, - pipeline_state=PipelineState(inputs={}, component_visits={"agent": 0}, pipeline_outputs={}), - original_input_data={}, - ordered_component_names=["agent"], - agent_snapshot=AgentSnapshot( - break_point=break_point, - component_inputs={"chat_generator": {}, "tool_invoker": {"serialized_data": {"state": {}}}}, - component_visits={"chat_generator": 0, "tool_invoker": 0}, - ), - ) - - return _make +from haystack.dataclasses.breakpoints import Breakpoint, PipelineSnapshot, PipelineState def test_transform_json_structure_unwraps_sender_value(): @@ -145,37 +127,6 @@ def run(self, input_value: str) -> dict[str, str]: assert loaded_snapshot.break_point.visit_count == 0 -def test_load_pipeline_snapshot_with_old_pipeline_outputs_format(tmp_path): - "Test to ensure backwards compatibility with the old pipeline_outputs format" - # TODO: remove this test in haystack 2.23.0 - pipeline_snapshot = { - "pipeline_state": { - "inputs": { - "serialization_schema": { - "type": "object", - "properties": {"comp2": {"type": "object", "properties": {}}}, - }, - "serialized_data": {"comp2": {}}, - }, - "component_visits": {"comp1": 1, "comp2": 0}, - "pipeline_outputs": {"comp1": {"result": "Answer from comp1"}}, - }, - "break_point": {"component_name": "comp2", "visit_count": 0, "snapshot_file_path": "test_breakpoints"}, - "agent_snapshot": None, - "timestamp": 
"2025-12-01T17:14:24.366124", - "original_input_data": {"serialization_schema": {"type": "object", "properties": {}}, "serialized_data": {}}, - "ordered_component_names": ["comp1", "comp2"], - "include_outputs_from": ["comp1"], - } - - pipeline_snapshot_file = tmp_path / "old_pipeline_outputs_format.json" - with open(pipeline_snapshot_file, "w") as f: - json.dump(pipeline_snapshot, f) - - loaded_snapshot = load_pipeline_snapshot(pipeline_snapshot_file) - assert loaded_snapshot == PipelineSnapshot.from_dict(pipeline_snapshot) - - class TestCreatePipelineSnapshot: def test_create_pipeline_snapshot_all_fields(self): break_point = Breakpoint(component_name="comp2")