Skip to content

Commit fe3f8d1

Browse files
Include input and output messages in weave observability traces (#1050)
This PR ensures the input message and output message are displayed in the root observability trace in Weave when workflows are executed using either `nat run` or `nat serve` with NAT-UI. ## With `nat run`: <img width="1022" height="112" alt="image" src="https://github.com/user-attachments/assets/edf12d4d-d458-4779-b54b-9c508533aa93" /> ## With `nat serve` ### `/chat` <img width="1088" height="113" alt="image" src="https://github.com/user-attachments/assets/4e59f587-4208-4949-a63b-13eb42e53f8e" /> ### `/generate` <img width="1030" height="112" alt="image" src="https://github.com/user-attachments/assets/f17117a9-a8ec-40a6-81a0-33d7d6335dde" /> ### `/chat/stream`: For streaming endpoints, a preview of the first few tokens is collected for display in Weave <img width="1127" height="119" alt="image" src="https://github.com/user-attachments/assets/b46d65bd-bfd8-499b-a9d7-c6f364e8f0a4" /> ### `/generate/stream` <img width="940" height="122" alt="image" src="https://github.com/user-attachments/assets/e320d67b-b93d-43b8-b2b9-ea1ab944f009" /> ## Websocket Schemas ### `chat` <img width="1138" height="114" alt="image" src="https://github.com/user-attachments/assets/988dea72-7957-48b2-8c98-6fa9dee880c0" /> ### `chat_stream` <img width="1405" height="117" alt="image" src="https://github.com/user-attachments/assets/c2c87012-5d42-4580-9ade-d601fa3209ea" /> ### `generate` <img width="939" height="116" alt="image" src="https://github.com/user-attachments/assets/719658cc-8e5e-4013-8c8c-f17a7ae0b3ea" /> ### `generate_stream` <img width="979" height="117" alt="image" src="https://github.com/user-attachments/assets/cb0e9c84-92b7-46a1-b3c6-ee133425a589" /> Closes #1041 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/NVIDIA/NeMo-Agent-Toolkit/blob/develop/docs/source/resources/contributing.md). - We require that all contributors "sign-off" on their commits. This certifies that the contribution is your original work, or you have rights to submit it under the same license, or a compatible license. - Any contribution which contains commits that are not Signed-Off will not be accepted. - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. ## Summary by CodeRabbit * **New Features** * Workflow start/end events now include structured input/output data. * Streaming workflows include an output preview (up to 50 items) on completion. * **Improvements** * Better extraction of inputs from websocket-style messages. * Enhanced parsing to surface message/content details alongside raw outputs. * Added automatic truncation utility for consistent, readable output previews. Authors: - Patrick Chin (https://github.com/thepatrickchin) Approvers: - Will Killian (https://github.com/willkill07) URL: #1050
1 parent 17bb09b commit fe3f8d1

File tree

3 files changed

+101
-3
lines changed

3 files changed

+101
-3
lines changed

packages/nvidia_nat_weave/src/nat/plugins/weave/weave_exporter.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
import logging
1717
from collections.abc import Generator
1818
from contextlib import contextmanager
19+
from typing import Any
1920

2021
from nat.data_models.intermediate_step import IntermediateStep
2122
from nat.data_models.span import Span
2223
from nat.observability.exporter.base_exporter import IsolatedAttribute
2324
from nat.observability.exporter.span_exporter import SpanExporter
2425
from nat.utils.log_utils import LogFilter
26+
from nat.utils.string_utils import truncate_string
2527
from nat.utils.type_utils import override
2628
from weave.trace.context import weave_client_context
2729
from weave.trace.context.call_context import get_current_call
@@ -152,6 +154,7 @@ def _create_weave_call(self, step: IntermediateStep, span: Span) -> Call:
152154
try:
153155
# Add the input to the Weave call
154156
inputs["input"] = step.payload.data.input
157+
self._extract_input_message(step.payload.data.input, inputs)
155158
except Exception:
156159
# If serialization fails, use string representation
157160
inputs["input"] = str(step.payload.data.input)
@@ -176,6 +179,76 @@ def _create_weave_call(self, step: IntermediateStep, span: Span) -> Call:
176179

177180
return call
178181

182+
def _extract_input_message(self, input_data: Any, inputs: dict[str, Any]) -> None:
183+
"""
184+
Extract message content from input data and add to inputs dictionary.
185+
Also handles websocket mode where message is located at messages[0].content[0].text.
186+
187+
Args:
188+
input_data: The raw input data from the request
189+
inputs: Dictionary to populate with extracted message content
190+
"""
191+
# Extract message content if input has messages attribute
192+
messages = getattr(input_data, 'messages', [])
193+
if messages:
194+
content = messages[0].content
195+
if isinstance(content, list) and content:
196+
inputs["input_message"] = getattr(content[0], 'text', content[0])
197+
else:
198+
inputs["input_message"] = content
199+
200+
def _extract_output_message(self, output_data: Any, outputs: dict[str, Any]) -> None:
201+
"""
202+
Extract message content from various response formats and add a preview to the outputs dictionary.
203+
No data is added to the outputs dictionary if the output format is not supported.
204+
205+
Supported output formats for message content include:
206+
207+
- output.choices[0].message.content /chat endpoint
208+
- output.value /generate endpoint
209+
- output[0].choices[0].message.content chat WS schema
210+
- output[0].choices[0].delta.content chat_stream WS schema, /chat/stream endpoint
211+
- output[0].value generate & generate_stream WS schema, /generate/stream endpoint
212+
213+
Args:
214+
output_data: The raw output data from the response
215+
outputs: Dictionary to populate with extracted message content.
216+
"""
217+
# Handle choices-keyed output object for /chat completion endpoint
218+
choices = getattr(output_data, 'choices', None)
219+
if choices:
220+
outputs["output_message"] = truncate_string(choices[0].message.content)
221+
return
222+
223+
# Handle value-keyed output object for union types common for /generate completion endpoint
224+
value = getattr(output_data, 'value', None)
225+
if value:
226+
outputs["output_message"] = truncate_string(value)
227+
return
228+
229+
# Handle list-based outputs (streaming or websocket)
230+
if not isinstance(output_data, list) or not output_data:
231+
return
232+
233+
choices = getattr(output_data[0], 'choices', None)
234+
if choices:
235+
# chat websocket schema
236+
message = getattr(choices[0], 'message', None)
237+
if message:
238+
outputs["output_message"] = truncate_string(getattr(message, 'content', None))
239+
return
240+
241+
# chat_stream websocket schema and /chat/stream completion endpoint
242+
delta = getattr(choices[0], 'delta', None)
243+
if delta:
244+
outputs["output_preview"] = truncate_string(getattr(delta, 'content', None))
245+
return
246+
247+
# generate & generate_stream websocket schema, and /generate/stream completion endpoint
248+
value = getattr(output_data[0], 'value', None)
249+
if value:
250+
outputs["output_preview"] = truncate_string(str(value))
251+
179252
def _finish_weave_call(self, step: IntermediateStep) -> None:
180253
"""
181254
Finish a previously created Weave call.
@@ -196,6 +269,7 @@ def _finish_weave_call(self, step: IntermediateStep) -> None:
196269
try:
197270
# Add the output to the Weave call
198271
outputs["output"] = step.payload.data.output
272+
self._extract_output_message(step.payload.data.output, outputs)
199273
except Exception:
200274
# If serialization fails, use string representation
201275
outputs["output"] = str(step.payload.data.output)

src/nat/runtime/runner.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ async def result(self, to_type: type | None = None):
170170
IntermediateStepPayload(UUID=workflow_step_uuid,
171171
event_type=IntermediateStepType.WORKFLOW_START,
172172
name=workflow_name,
173-
metadata=start_metadata))
173+
metadata=start_metadata,
174+
data=StreamEventData(input=self._input_message)))
174175

175176
result = await self._entry_fn.ainvoke(self._input_message, to_type=to_type) # type: ignore
176177

@@ -249,9 +250,15 @@ async def result_stream(self, to_type: type | None = None):
249250
IntermediateStepPayload(UUID=workflow_step_uuid,
250251
event_type=IntermediateStepType.WORKFLOW_START,
251252
name=workflow_name,
252-
metadata=start_metadata))
253+
metadata=start_metadata,
254+
data=StreamEventData(input=self._input_message)))
255+
256+
# Collect preview of streaming results for the WORKFLOW_END event
257+
output_preview = []
253258

254259
async for m in self._entry_fn.astream(self._input_message, to_type=to_type): # type: ignore
260+
if len(output_preview) < 50:
261+
output_preview.append(m)
255262
yield m
256263

257264
# Emit WORKFLOW_END
@@ -265,7 +272,8 @@ async def result_stream(self, to_type: type | None = None):
265272
IntermediateStepPayload(UUID=workflow_step_uuid,
266273
event_type=IntermediateStepType.WORKFLOW_END,
267274
name=workflow_name,
268-
metadata=end_metadata))
275+
metadata=end_metadata,
276+
data=StreamEventData(output=output_preview)))
269277
self._state = RunnerState.COMPLETED
270278

271279
# Close the intermediate stream

src/nat/utils/string_utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,19 @@ def convert_to_str(value: Any) -> str:
3636
return str(value)
3737
else:
3838
raise ValueError(f"Unsupported type for conversion to string: {type(value)}")
39+
40+
41+
def truncate_string(text: str | None, max_length: int = 100) -> str | None:
42+
"""
43+
Truncate a string to a maximum length, adding ellipsis if truncated.
44+
45+
Args:
46+
text: The text to truncate (can be None)
47+
max_length: Maximum allowed length (default: 100)
48+
49+
Returns:
50+
The truncated text with ellipsis if needed, or None if input was None
51+
"""
52+
if not text or len(text) <= max_length:
53+
return text
54+
return text[:max_length - 3] + "..."

0 commit comments

Comments
 (0)