diff --git a/docs/source/_static/weave_tracing.png b/docs/source/_static/weave_tracing.png new file mode 100644 index 000000000..8f92d660a --- /dev/null +++ b/docs/source/_static/weave_tracing.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:1417d9c584737dfa9cd94e97d49145e8f82f5c1594642d117e92f8b1ec26acaf +size 732469 diff --git a/docs/source/guides/fine-grained-tracing-with-weave.md b/docs/source/guides/fine-grained-tracing-with-weave.md new file mode 100644 index 000000000..c468abe7c --- /dev/null +++ b/docs/source/guides/fine-grained-tracing-with-weave.md @@ -0,0 +1,79 @@ + + +# Observing a Workflow with W&B Weave + +This guide provides a step-by-step process to enable observability in an AIQ Toolkit workflow using Weights and Biases (W&B) Weave for tracing using just a few lines of code in your workflow configuration file. + +![Weave Tracing Dashboard](../_static/weave_tracing.png) + +### Step 1: Install the Weave plugin + +To install the Weave plugin, run the following: + +```bash +uv pip install -e '.[weave]' +``` + +### Step 2: Install the Workflow + +Pick an example from the list of available workflows. In this guide, we will be using the `simple_calculator` example. + +```bash +uv pip install -e examples/simple_calculator +``` + +### Step 3: Modify Workflow Configuration + +Update your workflow configuration file to include the weave telemetry settings. For example, `examples/simple_calculator/configs/config-weave.yml` has the following weave settings: + +```bash +general: + use_uvloop: true + telemetry: + tracing: + weave: + _type: weave + project: "aiqtoolkit-demo" +``` + +This setup enables logging trace data to W&B weave. The weave integration requires one parameter and one optional parameter: + +| Parameter | Description | Example | +|-----------|-------------|---------| +| `project` | The name of your W&B Weave project | `"aiqtoolkit-demo"` | +| `entity` (optional) | Your W&B username or team name | `"your-wandb-username-or-teamname"` | + +### Step 4: Run Your Workflow +From the root directory of the AIQ Toolkit library, execute your workflow as shown below: + +```bash +aiq run --config_file examples/simple_calculator/configs/config.yml --input "Is the product of 2 * 4 greater than the current hour of the day?" +``` + +If it is your first time running the workflow, you will be prompted to login to W&B Weave. + +### Step 5: View Traces Data in Weave Dashboard + +As the workflow runs, you will find a Weave URL (starting with a 🍩 emoji). Click on the URL to access your logged trace timeline. + +Note how the integration captures not only the `aiq` intermediate steps but also the underlying framework. This is because [Weave has integrations](https://weave-docs.wandb.ai/guides/integrations/) with many of your favorite frameworks. + +## Resources + +- Learn more about tracing [here](https://weave-docs.wandb.ai/guides/tracking/tracing). +- Learn more about how to navigate the logged traces [here](https://weave-docs.wandb.ai/guides/tracking/trace-tree). diff --git a/docs/source/guides/index.md b/docs/source/guides/index.md index 9bab073f4..e2a7cd2bc 100644 --- a/docs/source/guides/index.md +++ b/docs/source/guides/index.md @@ -25,6 +25,7 @@ Share Components <./sharing-workflows-and-tools.md> Evaluate <./evaluate.md> Add Custom Evaluators <./custom-evaluator.md> Evaluation Endpoints <./evaluate-api.md> +./fine-grained-tracing-with-weave.md ./observe-workflow-with-phoenix.md Use User Interface and API Server <./using-aiqtoolkit-ui-and-server.md> MCP Server Front-End <./mcp-server.md> diff --git a/examples/simple_calculator/src/aiq_simple_calculator/configs/config-weave.yml b/examples/simple_calculator/src/aiq_simple_calculator/configs/config-weave.yml new file mode 100644 index 000000000..e05ff73cd --- /dev/null +++ b/examples/simple_calculator/src/aiq_simple_calculator/configs/config-weave.yml @@ -0,0 +1,58 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +general: + use_uvloop: true + telemetry: + tracing: + weave: + _type: weave + project: "aiqtoolkit-demo" + +functions: + calculator_multiply: + _type: calculator_multiply + calculator_inequality: + _type: calculator_inequality + calculator_divide: + _type: aiq_simple_calculator/calculator_divide + current_datetime: + _type: current_datetime + calculator_subtract: + _type: calculator_subtract + +llms: + nim_llm: + _type: nim + model_name: meta/llama-3.1-70b-instruct + temperature: 0.0 + max_tokens: 1024 + openai_llm: + _type: openai + model_name: gpt-3.5-turbo + max_tokens: 2000 + +workflow: + _type: react_agent + tool_names: + - calculator_multiply + - calculator_inequality + - current_datetime + - calculator_divide + - calculator_subtract + llm_name: nim_llm + verbose: true + retry_parsing_errors: true + max_retries: 3 diff --git a/packages/aiqtoolkit_weave/pyproject.toml b/packages/aiqtoolkit_weave/pyproject.toml new file mode 100644 index 000000000..cf77f15a4 --- /dev/null +++ b/packages/aiqtoolkit_weave/pyproject.toml @@ -0,0 +1,41 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools >= 64", "setuptools-scm>=8"] + + +[tool.setuptools.packages.find] +where = ["src"] +include = ["aiq.*"] + + +[tool.setuptools_scm] +root = "../.." + + +[project] +name = "aiqtoolkit-weave" +dynamic = ["version"] +dependencies = [ + # Keep package version constraints as open as possible to avoid conflicts with other packages. Always define a minimum + # version when adding a new package. If unsure, default to using `~=` instead of `==`. Does not apply to aiq packages. + # Keep sorted!!! + "aiqtoolkit", + "weave>=0.51.44" +] +requires-python = ">=3.12" +description = "Subpackage for Weave integration in AIQToolkit" +readme = "src/aiq/meta/pypi.md" +keywords = ["ai", "observability", "wandb"] +classifiers = ["Programming Language :: Python"] + + +[tool.uv] +config-settings = { editable_mode = "compat" } + + +[tool.uv.sources] +aiqtoolkit = { workspace = true } + + +[project.entry-points.'aiq.components'] +aiqtoolkit_weave = "aiq.plugins.weave.register" diff --git a/packages/aiqtoolkit_weave/src/aiq/meta/pypi.md b/packages/aiqtoolkit_weave/src/aiq/meta/pypi.md new file mode 100644 index 000000000..5c8437758 --- /dev/null +++ b/packages/aiqtoolkit_weave/src/aiq/meta/pypi.md @@ -0,0 +1,23 @@ + + +![NVIDIA Agent Intelligence Toolkit](https://media.githubusercontent.com/media/NVIDIA/AIQToolkit/refs/heads/main/docs/source/_static/aiqtoolkit_banner.png "AIQ Toolkit banner image" + +# NVIDIA Agent Intelligence Toolkit Subpackage +This is a subpackage for Weights and Biases Weave integration for observability. + +For more information about AIQ Toolkit, please visit the [AIQ Toolkit package](https://pypi.org/project/aiqtoolkit/). diff --git a/packages/aiqtoolkit_weave/src/aiq/plugins/weave/__init__.py b/packages/aiqtoolkit_weave/src/aiq/plugins/weave/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/aiqtoolkit_weave/src/aiq/plugins/weave/register.py b/packages/aiqtoolkit_weave/src/aiq/plugins/weave/register.py new file mode 100644 index 000000000..db5fc96ee --- /dev/null +++ b/packages/aiqtoolkit_weave/src/aiq/plugins/weave/register.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-import +# flake8: noqa +# isort:skip_file + +from . import weave_sdk diff --git a/packages/aiqtoolkit_weave/src/aiq/plugins/weave/weave_sdk.py b/packages/aiqtoolkit_weave/src/aiq/plugins/weave/weave_sdk.py new file mode 100644 index 000000000..9ea63fe0c --- /dev/null +++ b/packages/aiqtoolkit_weave/src/aiq/plugins/weave/weave_sdk.py @@ -0,0 +1,49 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional + +from pydantic import Field + +from aiq.builder.builder import Builder +from aiq.cli.register_workflow import register_telemetry_exporter +from aiq.data_models.telemetry_exporter import TelemetryExporterBaseConfig + + +class WeaveTelemetryExporter(TelemetryExporterBaseConfig, name="weave"): + """A telemetry exporter to transmit traces to Weights & Biases Weave using OpenTelemetry.""" + project: str = Field(description="The W&B project name.") + entity: Optional[str] = Field(default=None, description="The W&B username or team name.") + + +@register_telemetry_exporter(config_type=WeaveTelemetryExporter) +async def weave_telemetry_exporter(config: WeaveTelemetryExporter, builder: Builder): + import weave + + if config.entity: + _ = weave.init(project_name=f"{config.entity}/{config.project}") + else: + _ = weave.init(project_name=config.project) + + class NoOpSpanExporter: + + def export(self, spans): + return None + + def shutdown(self): + return None + + # just yielding None errors with 'NoneType' object has no attribute 'export' + yield NoOpSpanExporter() diff --git a/pyproject.toml b/pyproject.toml index d439c2dbb..b51b5bd39 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,7 @@ mem0ai = ["aiqtoolkit-mem0ai"] semantic-kernel = ["aiqtoolkit-semantic-kernel"] zep-cloud = ["aiqtoolkit-zep-cloud"] agno = ["aiqtoolkit-agno"] +weave = ["aiqtoolkit-weave"] examples = [ "aiq_email_phishing_analyzer", @@ -98,6 +99,7 @@ aiqtoolkit-semantic-kernel = { workspace = true } aiqtoolkit-test = { workspace = true } aiqtoolkit-zep-cloud = { workspace = true } aiqtoolkit-agno = { workspace = true } +aiqtoolkit-weave = { workspace = true } # All examples here aiq_email_phishing_analyzer = { path = "examples/email_phishing_analyzer", editable = true } diff --git a/src/aiq/observability/async_otel_listener.py b/src/aiq/observability/async_otel_listener.py index 9a038d301..4323b3243 100644 --- a/src/aiq/observability/async_otel_listener.py +++ b/src/aiq/observability/async_otel_listener.py @@ -16,6 +16,7 @@ import logging import re from contextlib import asynccontextmanager +from contextlib import contextmanager from typing import Any from openinference.semconv.trace import OpenInferenceSpanKindValues @@ -30,6 +31,17 @@ from aiq.data_models.intermediate_step import IntermediateStep from aiq.data_models.intermediate_step import IntermediateStepState +try: + from weave.trace.context import weave_client_context + from weave.trace.context.call_context import get_current_call + from weave.trace.context.call_context import set_call_stack + from weave.trace.weave_client import Call + WEAVE_AVAILABLE = True +except ImportError: + WEAVE_AVAILABLE = False + # we simply don't do anything if weave is not available + pass + logger = logging.getLogger(__name__) OPENINFERENCE_SPAN_KIND = SpanAttributes.OPENINFERENCE_SPAN_KIND @@ -84,6 +96,17 @@ def __init__(self, context_state: AIQContextState | None = None): self._tracer = trace.get_tracer("aiq-async-otel-listener") + # Initialize Weave-specific components if available + self.gc = None + self._weave_calls = {} + if WEAVE_AVAILABLE: + try: + # Try to get the weave client, but don't fail if Weave isn't initialized + self.gc = weave_client_context.require_weave_client() + except Exception: + # Weave is not initialized, so we don't do anything + pass + def _on_next(self, step: IntermediateStep) -> None: """ The main logic that reacts to each IntermediateStep. @@ -159,6 +182,12 @@ async def _cleanup(self): self._span_stack.clear() + # Clean up any lingering Weave calls if Weave is available and initialized + if self.gc is not None and self._weave_calls: + for _, call in list(self._weave_calls.items()): + self.gc.finish_call(call, {"status": "incomplete"}) + self._weave_calls.clear() + def _serialize_payload(self, input_value: Any) -> tuple[str, bool]: """ Serialize the input value to a string. Returns a tuple with the serialized value and a boolean indicating if the @@ -237,6 +266,10 @@ def _process_start_event(self, step: IntermediateStep): self._outstanding_spans[step.UUID] = sub_span + # Create corresponding Weave call if Weave is available and initialized + if self.gc is not None: + self._create_weave_call(step, sub_span) + def _process_end_event(self, step: IntermediateStep): # Find the subspan that was created in the start event @@ -271,3 +304,114 @@ def _process_end_event(self, step: IntermediateStep): # End the subspan sub_span.end(end_time=end_ns) + + # Finish corresponding Weave call if Weave is available and initialized + if self.gc is not None: + self._finish_weave_call(step, sub_span) + + @contextmanager + def parent_call(self, trace_id: str, parent_call_id: str): + """Context manager to set a parent call context for Weave. + This allows connecting AIQ spans to existing traces from other frameworks. + """ + dummy_call = Call(trace_id=trace_id, id=parent_call_id, _op_name="", project_id="", parent_id=None, inputs={}) + with set_call_stack([dummy_call]): + yield + + def _create_weave_call(self, step: IntermediateStep, span: Span) -> None: + """ + Create a Weave call directly from the span and step data, + connecting to existing framework traces if available. + """ + # Check for existing Weave trace/call + existing_call = get_current_call() + + # Extract parent call if applicable + parent_call = None + + # If we have an existing Weave call from another framework (e.g., LangChain), + # use it as the parent + if existing_call is not None: + parent_call = existing_call + logger.debug(f"Found existing Weave call: {existing_call.id} from trace: {existing_call.trace_id}") + # Otherwise, check our internal stack for parent relationships + elif len(self._weave_calls) > 0 and len(self._span_stack) > 1: + # Get the parent span using stack position (one level up) + parent_span_id = self._span_stack[-2].get_span_context().span_id + # Find the corresponding weave call for this parent span + for uuid, call in self._weave_calls.items(): + if getattr(call, "span_id", None) == parent_span_id: + parent_call = call + break + + # Generate a meaningful operation name based on event type + event_type = step.payload.event_type.split(".")[-1] + if step.payload.name: + op_name = f"aiq.{event_type}.{step.payload.name}" + else: + op_name = f"aiq.{event_type}" + + # Create input dictionary + inputs = {} + if step.payload.data and step.payload.data.input is not None: + try: + # Add the input to the Weave call + inputs["input"] = step.payload.data.input + except Exception: + # If serialization fails, use string representation + inputs["input"] = str(step.payload.data.input) + + # Create the Weave call + call = self.gc.create_call( + op_name, + inputs=inputs, + parent=parent_call, + attributes=span.attributes, + display_name=op_name, + ) + + # Store the call with step UUID as key + self._weave_calls[step.UUID] = call + + # Store span ID for parent reference + setattr(call, "span_id", span.get_span_context().span_id) + + return call + + def _finish_weave_call(self, step: IntermediateStep, span: Span) -> None: + """ + Finish a previously created Weave call + """ + # Find the call for this step + call = self._weave_calls.pop(step.UUID, None) + + if call is None: + logger.warning("No Weave call found for step %s", step.UUID) + return + + # Create output dictionary + outputs = {} + if step.payload.data and step.payload.data.output is not None: + try: + # Add the output to the Weave call + outputs["output"] = step.payload.data.output + except Exception: + # If serialization fails, use string representation + outputs["output"] = str(step.payload.data.output) + + # Add usage information if available + usage_info = step.payload.usage_info + if usage_info: + if usage_info.token_usage: + outputs["prompt_tokens"] = usage_info.token_usage.prompt_tokens + outputs["completion_tokens"] = usage_info.token_usage.completion_tokens + outputs["total_tokens"] = usage_info.token_usage.total_tokens + + if usage_info.num_llm_calls: + outputs["num_llm_calls"] = usage_info.num_llm_calls + + if usage_info.seconds_between_calls: + outputs["seconds_between_calls"] = usage_info.seconds_between_calls + + # Finish the call with outputs + self.gc.finish_call(call, outputs)