Skip to content

Commit 28f744c

Browse files
authored
core[patch]: Correctly order parent ids in astream events (from root to immediate parent), add defensive check for cycles (#22637)
This PR makes two changes: 1. Fixes the order of parent IDs to be from root to immediate parent 2. Adds a simple defensive check for cycles
1 parent 8359261 commit 28f744c

File tree

2 files changed

+50
-7
lines changed

2 files changed

+50
-7
lines changed

libs/core/langchain_core/tracers/event_stream.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,20 @@ def __init__(
118118
def _get_parent_ids(self, run_id: UUID) -> List[str]:
119119
"""Get the parent IDs of a run (non-recursively) cast to strings."""
120120
parent_ids = []
121-
parent_id = self.parent_map[run_id]
122121

123-
while parent_id is not None:
124-
parent_ids.append(str(parent_id))
125-
parent_id = self.parent_map[parent_id]
122+
while parent_id := self.parent_map.get(run_id):
123+
str_parent_id = str(parent_id)
124+
if str_parent_id in parent_ids:
125+
raise AssertionError(
126+
f"Parent ID {parent_id} is already in the parent_ids list. "
127+
f"This should never happen."
128+
)
129+
parent_ids.append(str_parent_id)
130+
run_id = parent_id
126131

127-
return parent_ids
132+
# Return the parent IDs in reverse order, so that the first
133+
# parent ID is the root and the last ID is the immediate parent.
134+
return parent_ids[::-1]
128135

129136
def _send(self, event: StreamEvent, event_type: str) -> None:
130137
"""Send an event to the stream."""

libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2033,8 +2033,8 @@ async def parent(x: str, config: RunnableConfig) -> str:
20332033
"metadata": {},
20342034
"name": "grandchild",
20352035
"parent_ids": [
2036-
"00000000-0000-0000-0000-000000000008",
20372036
"00000000-0000-0000-0000-000000000007",
2037+
"00000000-0000-0000-0000-000000000008",
20382038
],
20392039
"run_id": "00000000-0000-0000-0000-000000000009",
20402040
"tags": [],
@@ -2045,8 +2045,8 @@ async def parent(x: str, config: RunnableConfig) -> str:
20452045
"metadata": {},
20462046
"name": "grandchild",
20472047
"parent_ids": [
2048-
"00000000-0000-0000-0000-000000000008",
20492048
"00000000-0000-0000-0000-000000000007",
2049+
"00000000-0000-0000-0000-000000000008",
20502050
],
20512051
"run_id": "00000000-0000-0000-0000-000000000009",
20522052
"tags": [],
@@ -2081,6 +2081,42 @@ async def parent(x: str, config: RunnableConfig) -> str:
20812081
]
20822082

20832083

2084+
async def test_bad_parent_ids() -> None:
2085+
"""Test handling of situation where a run id is duplicated in the run tree."""
2086+
2087+
# Type ignores in the code below need to be investigated.
2088+
# Looks like a typing issue when using RunnableLambda as a decorator
2089+
# with async functions.
2090+
@RunnableLambda # type: ignore
2091+
async def child(x: str) -> str:
2092+
return x
2093+
2094+
@RunnableLambda # type: ignore
2095+
async def parent(x: str, config: RunnableConfig) -> str:
2096+
config["run_id"] = uuid.UUID(int=7)
2097+
return await child.ainvoke(x, config) # type: ignore
2098+
2099+
bond = uuid.UUID(int=7)
2100+
events = await _collect_events(
2101+
parent.astream_events("hello", {"run_id": bond}, version="v2"),
2102+
with_nulled_ids=False,
2103+
)
2104+
# Includes only a partial list of events since the run ID gets duplicated
2105+
# between parent and child run ID and the callback handler throws an exception.
2106+
# The exception does not get bubbled up to the user.
2107+
assert events == [
2108+
{
2109+
"data": {"input": "hello"},
2110+
"event": "on_chain_start",
2111+
"metadata": {},
2112+
"name": "parent",
2113+
"parent_ids": [],
2114+
"run_id": "00000000-0000-0000-0000-000000000007",
2115+
"tags": [],
2116+
}
2117+
]
2118+
2119+
20842120
async def test_runnable_generator() -> None:
20852121
"""Test async events from sync lambda."""
20862122

0 commit comments

Comments
 (0)