29 changes: 26 additions & 3 deletions docs/source/workflows/evaluate.md
@@ -254,15 +254,16 @@ Here is a sample evaluator output generated by running evaluation on the simple
The contents of the file have been `snipped` for brevity.

## Visualizing Evaluation Results
You can visualize the evaluation results using the Weights and Biases (W&B) Weave dashboard.
You can visualize the evaluation results using the Weights and Biases (W&B) Weave dashboard or Arize Phoenix.

### Step 1: Install the Weave plugin
### Weights and Biases (W&B) Weave
#### Step 1: Install the Weave plugin
To install the Weave plugin, run:
```bash
uv pip install -e '.[weave]'
```

### Step 2: Enable logging to Weave in the configuration file
#### Step 2: Enable logging to Weave in the configuration file
Edit your evaluation config, for example:
`examples/evaluation_and_profiling/simple_web_query_eval/src/nat_simple_web_query_eval/configs/eval_config_llama31.yml`:
```yaml
@@ -273,6 +274,28 @@ general:
_type: weave
project: "nat-simple"
```

### Arize Phoenix
If your workflow is already configured to export traces to Phoenix, the evaluation run will also export per-item evaluator scores and usage metrics to Phoenix.

#### Step 1: Install the Phoenix plugin
To install the Phoenix plugin, run:
```bash
uv pip install -e '.[phoenix]'
```

#### Step 2: Enable Phoenix tracing in your configuration
Add Phoenix under `general.telemetry.tracing` in your workflow config used for evaluation:
```yaml
general:
telemetry:
tracing:
phoenix:
_type: phoenix
endpoint: http://localhost:6006/v1/traces
project: nat-simple
```

With this tracing configuration, `nat eval` will attempt to log evaluator scores to Phoenix. Scores are associated with traces by matching each span's recorded `input.value` attribute to the corresponding evaluation item's input. In the Phoenix UI, open your project and inspect the span annotations (evaluations/feedback) attached to recent traces.
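
For reference, this matching is the same best-effort lookup the integration performs internally: it queries a recent window of spans for the configured project, compares each span's `input.value` with the item's input, and attaches the score to the matching span as an annotation. A minimal sketch using the `phoenix.client` API (the endpoint, project name, time window, and span limit below are illustrative):

```python
from datetime import datetime, timedelta

from phoenix.client import Client

# Illustrative values; use the endpoint and project from your tracing config.
client = Client(base_url="http://localhost:6006")
project = "nat-simple"


def annotate_matching_span(item_input: str, evaluator_name: str, score: float) -> bool:
    """Attach a score to a recent span whose input.value matches the eval item's input."""
    end_time = datetime.now()
    spans = client.spans.get_spans(
        project_identifier=project,
        limit=2000,
        start_time=end_time - timedelta(hours=4),
        end_time=end_time,
    )
    for span in spans or []:
        attributes = span.get("attributes") or {}
        value = attributes.get("input.value")
        if value is not None and str(value) == str(item_input):
            client.annotations.add_span_annotation(
                span_id=str(span.get("id")),
                annotation_name=evaluator_name,
                annotator_kind="LLM",
                label="score",
                score=score,
                explanation=None,
            )
            return True
    return False
```

The lookup is best-effort: if no span with a matching `input.value` is found within the window, the score is simply skipped.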

When running experiments with different configurations, keep the `project` name the same so that the runs can be compared. Configure a different `workflow_alias` to distinguish the runs. For example, to run two evaluations with different LLM models, you can configure the `workflow_alias` as follows:
`examples/evaluation_and_profiling/simple_web_query_eval/src/nat_simple_web_query_eval/configs/eval_config_llama31.yml`:
58 changes: 48 additions & 10 deletions src/nat/eval/evaluate.py
@@ -73,6 +73,18 @@ def __init__(self, config: EvaluationRunConfig):
self.eval_trace_context = EvalTraceContext()

self.weave_eval: WeaveEvaluationIntegration = WeaveEvaluationIntegration(self.eval_trace_context)

try:
from nat.eval.utils.eval_trace_ctx import EvalTraceContext
from nat.eval.utils.phoenix_eval import PhoenixEvaluationIntegration
            # Phoenix does not need the shared trace context because it annotates spans directly; no context API is used
self.phoenix_eval = PhoenixEvaluationIntegration(EvalTraceContext())
except Exception:
self.phoenix_eval = None

self._use_weave_eval: bool = False
self._use_phoenix_eval: bool = False

# Metadata
self.eval_input: EvalInput | None = None
self.workflow_interrupted: bool = False
@@ -213,8 +225,12 @@ async def run_one(item: EvalInputItem):
item.trajectory = self.intermediate_step_adapter.validate_intermediate_steps(intermediate_steps)
usage_stats_item = self._compute_usage_stats(item)

self.weave_eval.log_prediction(item, output)
await self.weave_eval.log_usage_stats(item, usage_stats_item)
if self._use_weave_eval:
self.weave_eval.log_prediction(item, output)
await self.weave_eval.log_usage_stats(item, usage_stats_item)
if self._use_phoenix_eval and self.phoenix_eval:
self.phoenix_eval.log_prediction(item, output)
await self.phoenix_eval.log_usage_stats(item, usage_stats_item)

async def wrapped_run(item: EvalInputItem) -> None:
await run_one(item)
@@ -238,8 +254,12 @@ async def run_workflow_remote(self):
await handler.run_workflow_remote(self.eval_input)
for item in self.eval_input.eval_input_items:
usage_stats_item = self._compute_usage_stats(item)
self.weave_eval.log_prediction(item, item.output_obj)
await self.weave_eval.log_usage_stats(item, usage_stats_item)
if self._use_weave_eval:
self.weave_eval.log_prediction(item, item.output_obj)
await self.weave_eval.log_usage_stats(item, usage_stats_item)
if self._use_phoenix_eval and self.phoenix_eval:
self.phoenix_eval.log_prediction(item, item.output_obj)
await self.phoenix_eval.log_usage_stats(item, usage_stats_item)

async def profile_workflow(self) -> ProfilerResults:
"""
@@ -357,15 +377,22 @@ def publish_output(self, dataset_handler: DatasetHandler, profiler_results: Prof
"`eval` with the --skip_completed_entries flag.")
logger.warning(msg)

self.weave_eval.log_summary(self.usage_stats, self.evaluation_results, profiler_results)
if self._use_weave_eval:
self.weave_eval.log_summary(self.usage_stats, self.evaluation_results, profiler_results)
# Export to Phoenix if selected
if self._use_phoenix_eval and self.phoenix_eval:
self.phoenix_eval.log_summary(self.usage_stats, self.evaluation_results, profiler_results)

async def run_single_evaluator(self, evaluator_name: str, evaluator: Any):
"""Run a single evaluator and store its results."""
try:
eval_output = await evaluator.evaluate_fn(self.eval_input)
self.evaluation_results.append((evaluator_name, eval_output))

await self.weave_eval.alog_score(eval_output, evaluator_name)
if self._use_weave_eval:
await self.weave_eval.alog_score(eval_output, evaluator_name)
if self._use_phoenix_eval and self.phoenix_eval:
await self.phoenix_eval.alog_score(eval_output, evaluator_name)
except Exception as e:
logger.exception("An error occurred while running evaluator %s: %s", evaluator_name, e)

@@ -383,8 +410,11 @@ async def run_evaluators(self, evaluators: dict[str, Any]):
logger.error("An error occurred while running evaluators: %s", e)
raise
finally:
# Finish prediction loggers in Weave
await self.weave_eval.afinish_loggers()
# Finish prediction loggers where enabled
if self._use_weave_eval:
await self.weave_eval.afinish_loggers()
if self._use_phoenix_eval and self.phoenix_eval:
await self.phoenix_eval.afinish_loggers()

def apply_overrides(self):
from nat.cli.cli_utils.config_override import load_and_override_config
@@ -511,8 +541,16 @@ async def run_and_evaluate(self,

# Run workflow and evaluate
async with WorkflowEvalBuilder.from_config(config=config) as eval_workflow:
# Initialize Weave integration
self.weave_eval.initialize_logger(workflow_alias, self.eval_input, config)
# Decide which evaluation integrations to use based on tracing config
tracing_cfg = getattr(getattr(config.general, 'telemetry', None), 'tracing', None)
self._use_weave_eval = isinstance(tracing_cfg, dict) and ('weave' in tracing_cfg)
self._use_phoenix_eval = isinstance(tracing_cfg, dict) and ('phoenix' in tracing_cfg)

# Initialize selected integrations
if self._use_weave_eval:
self.weave_eval.initialize_logger(workflow_alias, self.eval_input, config)
if self._use_phoenix_eval and self.phoenix_eval:
self.phoenix_eval.initialize_logger(workflow_alias, self.eval_input, config)

with self.eval_trace_context.evaluation_context():
# Run workflow
239 changes: 239 additions & 0 deletions src/nat/eval/utils/phoenix_eval.py
@@ -0,0 +1,239 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import logging
from datetime import datetime
from datetime import timedelta
from typing import TYPE_CHECKING
from typing import Any

import httpx

from nat.eval.evaluator.evaluator_model import EvalInput
from nat.eval.evaluator.evaluator_model import EvalInputItem
from nat.eval.evaluator.evaluator_model import EvalOutput
from nat.eval.usage_stats import UsageStats
from nat.profiler.data_models import ProfilerResults

if TYPE_CHECKING:
from nat.eval.utils.eval_trace_ctx import EvalTraceContext

logger = logging.getLogger(__name__)


class PhoenixEvaluationIntegration:
"""
Class to handle Arize Phoenix integration for evaluation metrics.

This integration attempts best-effort logging of per-item evaluator scores
to Phoenix when Phoenix tracing is configured in the workflow config.
"""

def __init__(self, eval_trace_context: "EvalTraceContext"):
self.available = False
self.client = None
self.project_name: str | None = None
self.eval_trace_context = eval_trace_context
self.run_name: str | None = None
        # Keep state minimal, mirroring the Weave integration:
        # a best-effort mapping from eval item id -> input string for span association
self._id_to_input: dict[str, str] = {}

try:
from phoenix.client import Client as _PhoenixClient # noqa: F401
self.available = True
except ImportError:
self.available = False

def _extract_phoenix_server_url(self, endpoint: str) -> str:
"""Convert OTLP traces endpoint to Phoenix server URL if needed.

Example: http://localhost:6006/v1/traces -> http://localhost:6006
"""
if not endpoint:
return endpoint
# strip trailing '/v1/traces' if present
suffix = "/v1/traces"
return endpoint[:-len(suffix)] if endpoint.endswith(suffix) else endpoint

def _find_phoenix_config(self, config: Any) -> tuple[str | None, str | None]:
"""Find Phoenix tracing config (endpoint, project) from full config object."""
try:
cfg = config.model_dump(mode="json")
except AttributeError:
try:
# If already a dict
cfg = dict(config) # type: ignore[arg-type]
except (TypeError, ValueError):
return None, None

        tracing = ((cfg.get("general") or {}).get("telemetry") or {}).get("tracing") or {}
phoenix_cfg = tracing.get("phoenix") or tracing.get("Phoenix")
if not isinstance(phoenix_cfg, dict):
return None, None

endpoint = phoenix_cfg.get("endpoint")
project = phoenix_cfg.get("project")
return (endpoint, project)

def _metric_eval_name(self, metric: str) -> str:
return f"{self.run_name}:{metric}" if self.run_name else metric

    def initialize_logger(self, workflow_alias: str, eval_input: EvalInput, config: Any) -> bool:
"""Initialize Phoenix client if Phoenix tracing is configured."""
if not self.available:
return False

endpoint, project = self._find_phoenix_config(config)
if not endpoint or not project:
# Phoenix tracing not configured; skip
return False

try:
from phoenix.client import Client as PhoenixClient
except ImportError as e:
logger.warning("Failed to import phoenix client: %s", e)
self.client = None
self.project_name = None
return False

try:
server_url = self._extract_phoenix_server_url(endpoint)
self.client = PhoenixClient(base_url=server_url)
self.project_name = project
# capture a friendly run label (workflow alias) for evaluations
            self.run_name = workflow_alias
            # Build id->input mapping for later span matching (best effort)
            if eval_input and getattr(eval_input, "eval_input_items", None):
                for it in eval_input.eval_input_items:
item_id = str(it.id)
input_val = str(it.input_obj) if it.input_obj is not None else ""
if item_id:
self._id_to_input[item_id] = input_val
logger.debug("Initialized Phoenix client for project '%s' at '%s'", project, server_url)
return True
except (ValueError, RuntimeError, TypeError) as e:
logger.warning("Failed to initialize Phoenix client: %s", e)
self.client = None
self.project_name = None
return False

def log_prediction(self, _item: EvalInputItem, _output: Any):
"""No-op for Phoenix (kept for interface parity)."""
return

async def log_usage_stats(self, item: EvalInputItem, usage_stats_item): # noqa: ANN001
"""Best-effort usage stats logging as span annotations.

We intentionally keep this lightweight and skip logging if span resolution fails.
"""
if not self.client:
return
span_id = self._resolve_span_id_for_item(str(item.id))
if not span_id:
return
try:
self.client.annotations.add_span_annotation(
span_id=span_id,
annotation_name=self._metric_eval_name("wf_runtime"),
annotator_kind="LLM",
label="seconds",
score=float(getattr(usage_stats_item, "runtime", 0.0) or 0.0),
explanation=None,
)
self.client.annotations.add_span_annotation(
span_id=span_id,
annotation_name=self._metric_eval_name("wf_tokens"),
annotator_kind="LLM",
label="count",
score=float(getattr(usage_stats_item, "total_tokens", 0) or 0),
explanation=None,
)
except (ValueError, TypeError, RuntimeError, httpx.HTTPError):
logger.debug("Phoenix usage stats logging failed")

async def alog_score(self, eval_output: EvalOutput, evaluator_name: str):
"""Log per-item evaluator scores to Phoenix as span annotations."""
if not self.client:
return

if not eval_output.eval_output_items:
return

for eval_output_item in eval_output.eval_output_items:
span_id = self._resolve_span_id_for_item(str(eval_output_item.id))
if not span_id:
continue
score_val = eval_output_item.score
try:
score_val = float(score_val)
except (TypeError, ValueError):
# Skip non-numeric scores
continue
try:
self.client.annotations.add_span_annotation(
span_id=span_id,
annotation_name=self._metric_eval_name(evaluator_name),
annotator_kind="LLM",
label="score",
score=score_val,
explanation=None,
)
except (ValueError, TypeError, RuntimeError, httpx.HTTPError):
logger.debug("Phoenix per-item score logging failed")

async def afinish_loggers(self):
# No-op for Phoenix integration
return

def log_summary(self,
_usage_stats: UsageStats,
_evaluation_results: list[tuple[str, EvalOutput]],
_profiler_results: ProfilerResults):
"""No-op: Phoenix Client annotations are span-based; skip summary logging."""
return

def _resolve_span_id_for_item(self, item_id: str) -> str | None:
"""Resolve a Phoenix span id for an evaluation item.

Keep this best-effort and lightweight: fetch a small recent window of spans
and match on `input.value`. If unavailable or not found, skip.
"""
if not self.client or not self.project_name or not item_id:
return None
input_value = self._id_to_input.get(item_id)
if input_value is None:
return None
try:
# Search a narrow window to reduce overhead
end_time = datetime.now()
start_time = end_time - timedelta(hours=4)
spans = self.client.spans.get_spans(
project_identifier=self.project_name,
limit=2000,
start_time=start_time,
end_time=end_time,
)
for span in spans or []:
sid = span.get("id")
attrs = span.get("attributes") or {}
val = attrs.get("input.value")
if sid and val is not None and str(val) == str(input_value):
return str(sid)
except (ValueError, TypeError, RuntimeError, httpx.HTTPError):
return None
return None