Merged
13 changes: 12 additions & 1 deletion haystack/core/pipeline/breakpoint.py
@@ -259,9 +259,20 @@ def _create_pipeline_snapshot(
         )
         serialized_original_input_data = {}
 
+    try:
+        serialized_pipeline_outputs = _serialize_value_with_schema(pipeline_outputs)
+    except Exception as error:
+        logger.warning(
+            "Failed to serialize outputs of the current pipeline state. "
+            "This likely occurred due to non-serializable object types. "
+            "The snapshot will store an empty dictionary instead. Error: {e}",
+            e=error,
+        )
+        serialized_pipeline_outputs = {}
+
     pipeline_snapshot = PipelineSnapshot(
         pipeline_state=PipelineState(
-            inputs=serialized_inputs, component_visits=component_visits, pipeline_outputs=pipeline_outputs
+            inputs=serialized_inputs, component_visits=component_visits, pipeline_outputs=serialized_pipeline_outputs
         ),
         timestamp=datetime.now(),
         break_point=break_point,
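For context, `_serialize_value_with_schema` wraps a value in a schema-plus-payload envelope, so the snapshot's `pipeline_outputs` now has the same shape as its `inputs`. A minimal sketch of the transformation, with the envelope shape taken from the expected values asserted in `test_breakpoint.py` below:

```python
# Before the fix, the snapshot stored the raw component outputs directly:
raw_outputs = {"comp1": {"result": "processed_test"}}

# After the fix, it stores the schema/payload envelope produced by
# _serialize_value_with_schema (shape mirrors the test expectations below):
enveloped_outputs = {
    "serialization_schema": {
        "type": "object",
        "properties": {"comp1": {"type": "object", "properties": {"result": {"type": "string"}}}},
    },
    "serialized_data": {"comp1": {"result": "processed_test"}},
}
```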
7 changes: 6 additions & 1 deletion haystack/core/pipeline/pipeline.py
@@ -252,7 +252,12 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
             include_outputs_from = pipeline_snapshot.include_outputs_from
 
             # also intermediate_outputs from the snapshot when resuming
-            pipeline_outputs = pipeline_snapshot.pipeline_state.pipeline_outputs
+            # keep the deserialization of pipeline_outputs backwards compatible with the old pipeline_outputs format
+            # TODO: remove this in haystack 2.23.0
+            if "serialization_schema" not in pipeline_snapshot.pipeline_state.pipeline_outputs.keys():
+                pipeline_outputs = pipeline_snapshot.pipeline_state.pipeline_outputs
+            else:
+                pipeline_outputs = _deserialize_value_with_schema(pipeline_snapshot.pipeline_state.pipeline_outputs)
 
         cached_topological_sort = None
         # We need to access a component's receivers multiple times during a pipeline run.
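The resume path is the inverse of the snapshot path: the `else` branch undoes what `_create_pipeline_snapshot` now does, while the `if` branch keeps pre-fix snapshots loadable. A hedged round-trip sketch; the import path `haystack.utils.base_serialization` is an assumption, not confirmed by this diff:

```python
# Assumption: both private helpers live in haystack.utils.base_serialization.
from haystack.utils.base_serialization import _deserialize_value_with_schema, _serialize_value_with_schema

outputs = {"comp1": {"result": "processed_test"}}
stored = _serialize_value_with_schema(outputs)  # done in _create_pipeline_snapshot

# done in Pipeline.run on resume; old-format snapshots lack the schema key
restored = _deserialize_value_with_schema(stored) if "serialization_schema" in stored else stored
assert restored == outputs
```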
@@ -0,0 +1,5 @@
+---
+fixes:
+  - |
+    Fix the serialization and deserialization of ``pipeline_outputs`` in ``pipeline_snapshot`` so that it uses the same schema as the rest of the pipeline state when running pipelines with breakpoints.
+    Deserialization of the older ``pipeline_outputs`` format, which has no serialization schema, remains supported until Haystack 2.23.0.
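Concretely, the two formats the note distinguishes, with values taken from the tests below:

```python
# Old format (pre-fix): raw component outputs written directly into the snapshot.
old_format = {"comp1": {"result": "Answer from comp1"}}

# New format: the same schema/payload envelope used for the rest of the pipeline state.
new_format = {
    "serialization_schema": {
        "type": "object",
        "properties": {"comp1": {"type": "object", "properties": {"result": {"type": "string"}}}},
    },
    "serialized_data": {"comp1": {"result": "Answer from comp1"}},
}
```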
65 changes: 61 additions & 4 deletions test/core/pipeline/test_breakpoint.py
@@ -136,8 +136,15 @@ def run(self, input_value: str) -> dict[str, str]:
     loaded_snapshot = load_pipeline_snapshot(snapshot_file)
 
     # verify the snapshot contains the intermediate outputs from comp1
-    assert "comp1" in loaded_snapshot.pipeline_state.pipeline_outputs
-    assert loaded_snapshot.pipeline_state.pipeline_outputs["comp1"]["result"] == "processed_test"
+    assert loaded_snapshot.pipeline_state.pipeline_outputs == (
+        {
+            "serialization_schema": {
+                "type": "object",
+                "properties": {"comp1": {"type": "object", "properties": {"result": {"type": "string"}}}},
+            },
+            "serialized_data": {"comp1": {"result": "processed_test"}},
+        }
+    )
 
     # verify the whole pipeline state contains the expected data
     assert loaded_snapshot.pipeline_state.component_visits["comp1"] == 1
@@ -147,6 +154,37 @@ def run(self, input_value: str) -> dict[str, str]:
     assert loaded_snapshot.break_point.visit_count == 0
 
 
+def test_load_pipeline_snapshot_with_old_pipeline_outputs_format(tmp_path):
+    "Test to ensure backwards compatibility with the old pipeline_outputs format"
+    # TODO: remove this test in haystack 2.23.0
+    pipeline_snapshot = {
+        "pipeline_state": {
+            "inputs": {
+                "serialization_schema": {
+                    "type": "object",
+                    "properties": {"comp2": {"type": "object", "properties": {}}},
+                },
+                "serialized_data": {"comp2": {}},
+            },
+            "component_visits": {"comp1": 1, "comp2": 0},
+            "pipeline_outputs": {"comp1": {"result": "Answer from comp1"}},
+        },
+        "break_point": {"component_name": "comp2", "visit_count": 0, "snapshot_file_path": "test_breakpoints"},
+        "agent_snapshot": None,
+        "timestamp": "2025-12-01T17:14:24.366124",
+        "original_input_data": {"serialization_schema": {"type": "object", "properties": {}}, "serialized_data": {}},
+        "ordered_component_names": ["comp1", "comp2"],
+        "include_outputs_from": ["comp1"],
+    }
+
+    pipeline_snapshot_file = tmp_path / "old_pipeline_outputs_format.json"
+    with open(pipeline_snapshot_file, "w") as f:
+        json.dump(pipeline_snapshot, f)
+
+    loaded_snapshot = load_pipeline_snapshot(pipeline_snapshot_file)
+    assert loaded_snapshot == PipelineSnapshot.from_dict(pipeline_snapshot)
+
+
 def test_trigger_tool_invoker_breakpoint(make_pipeline_snapshot_with_agent_snapshot):
     pipeline_snapshot_with_agent_breakpoint = make_pipeline_snapshot_with_agent_snapshot(
         break_point=AgentBreakpoint("agent", ToolBreakpoint(component_name="tool_invoker"))
@@ -236,7 +274,13 @@ def test_create_pipeline_snapshot_all_fields(self):
                 "serialized_data": {"comp1": {"input_value": "test"}, "comp2": {"input_value": "processed_test"}},
             },
             component_visits={"comp1": 1, "comp2": 0},
-            pipeline_outputs={"comp1": {"result": "processed_test"}},
+            pipeline_outputs={
+                "serialization_schema": {
+                    "type": "object",
+                    "properties": {"comp1": {"type": "object", "properties": {"result": {"type": "string"}}}},
+                },
+                "serialized_data": {"comp1": {"result": "processed_test"}},
+            },
         )
 
     def test_create_pipeline_snapshot_with_dataclasses_in_pipeline_outputs(self):
@@ -260,7 +304,20 @@ def test_create_pipeline_snapshot_with_dataclasses_in_pipeline_outputs(self):
                 "serialized_data": {"comp2": {}},
             },
             component_visits={"comp1": 1, "comp2": 0},
-            pipeline_outputs={"comp1": {"result": ChatMessage.from_user("hello")}},
+            pipeline_outputs={
+                "serialization_schema": {
+                    "type": "object",
+                    "properties": {
+                        "comp1": {
+                            "type": "object",
+                            "properties": {"result": {"type": "haystack.dataclasses.chat_message.ChatMessage"}},
+                        }
+                    },
+                },
+                "serialized_data": {
+                    "comp1": {"result": {"role": "user", "meta": {}, "name": None, "content": [{"text": "hello"}]}}
+                },
+            },
         )
 
     def test_create_pipeline_snapshot_non_serializable_inputs(self, caplog):
@@ -91,10 +91,14 @@ def test_hybrid_rag_pipeline_crash_on_embedding_retriever(self):
         assert pipeline_outputs is not None, "Pipeline outputs should be captured in the exception"
 
         # verify that bm25_retriever and text_embedder ran successfully before the crash
-        assert "bm25_retriever" in pipeline_outputs, "BM25 retriever output not captured"
-        assert "documents" in pipeline_outputs["bm25_retriever"], "BM25 retriever should have produced documents"
-        assert "text_embedder" in pipeline_outputs, "Text embedder output not captured"
-        assert "embedding" in pipeline_outputs["text_embedder"], "Text embedder should have produced embeddings"
+        assert "bm25_retriever" in pipeline_outputs["serialized_data"], "BM25 retriever output not captured"
+        assert "documents" in (pipeline_outputs["serialized_data"]["bm25_retriever"]), (
+            "BM25 retriever should have produced documents"
+        )
+        assert "text_embedder" in (pipeline_outputs["serialized_data"]), "Text embedder output not captured"
+        assert "embedding" in (pipeline_outputs["serialized_data"]["text_embedder"]), (
+            "Text embedder should have produced embeddings"
+        )
 
         # components after the crash point are not in the outputs
         assert "document_joiner" not in pipeline_outputs, "Document joiner should not have run due to crash"