@@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import json
import logging
import re

@@ -60,13 +61,15 @@ def _split_evaluators_and_grader_configs(
:return: Tuple of two dictionaries, the first containing evaluators and the second containing AOAI graders.
:rtype: Tuple[Dict[str, Callable], Dict[str, AoaiGrader]]
"""
LOGGER.info(f"AOAI: Splitting {len(evaluators)} evaluators into AOAI graders and standard evaluators...")
true_evaluators = {}
aoai_graders = {}
for key, value in evaluators.items():
if isinstance(value, AzureOpenAIGrader):
aoai_graders[key] = value
else:
true_evaluators[key] = value
LOGGER.info(f"AOAI: Found {len(aoai_graders)} AOAI graders and {len(true_evaluators)} standard evaluators.")
return true_evaluators, aoai_graders
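
A quick usage sketch of the split; the evaluator names are hypothetical, but any plain callable lands in the first dict and any AzureOpenAIGrader subclass in the second:

# Illustrative inputs: "f1" is an ordinary callable, "label" an AzureOpenAIGrader.
evaluators = {"f1": f1_score_evaluator, "label": aoai_label_grader}
true_evaluators, aoai_graders = _split_evaluators_and_grader_configs(evaluators)
# true_evaluators -> {"f1": f1_score_evaluator}
# aoai_graders    -> {"label": aoai_label_grader}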


@@ -103,11 +106,18 @@ def _begin_aoai_evaluation(
LOGGER.info("AOAI: Aoai graders detected among evaluator inputs. Preparing to create OAI eval group...")
all_eval_run_info: List[OAIEvalRunCreationInfo] = []

for selected_graders, selected_column_mapping in _get_graders_and_column_mappings(graders, column_mappings):
grader_mapping_list = list(_get_graders_and_column_mappings(graders, column_mappings))
LOGGER.info(f"AOAI: Will create {len(grader_mapping_list)} separate evaluation run(s) based on column mappings.")

for idx, (selected_graders, selected_column_mapping) in enumerate(grader_mapping_list):
LOGGER.info(
f"AOAI: Starting evaluation run {idx + 1}/{len(grader_mapping_list)} with {len(selected_graders)} grader(s)..."
)
all_eval_run_info.append(
_begin_single_aoai_evaluation(selected_graders, data, selected_column_mapping, run_name)
)

LOGGER.info(f"AOAI: Successfully created {len(all_eval_run_info)} evaluation run(s).")
return all_eval_run_info


@@ -133,6 +143,7 @@ def _begin_single_aoai_evaluation(
"""

# Format data for eval group creation
LOGGER.info(f"AOAI: Preparing evaluation for {len(graders)} grader(s): {list(graders.keys())}")
grader_name_list = []
grader_list = []
# It's expected that all graders supplied for a single eval run use the same credentials
@@ -143,10 +154,12 @@
grader_name_list.append(name)
grader_list.append(grader._grader_config)
effective_column_mapping: Dict[str, str] = column_mapping or {}
LOGGER.info(f"AOAI: Generating data source config with {len(effective_column_mapping)} column mapping(s)...")
data_source_config = _generate_data_source_config(data, effective_column_mapping)
LOGGER.info(f"AOAI: Data source config generated with schema type: {data_source_config.get('type')}")

# Create eval group
# import pdb; pdb.set_trace()
LOGGER.info(f"AOAI: Creating eval group with {len(grader_list)} testing criteria...")
eval_group_info = client.evals.create(
data_source_config=data_source_config, testing_criteria=grader_list, metadata={"is_foundry_eval": "true"}
)
@@ -167,6 +180,7 @@
grader_name_map[criteria.id] = name

# Create eval run
LOGGER.info(f"AOAI: Creating eval run '{run_name}' with {len(data)} data rows...")
eval_run_id = _begin_eval_run(client, eval_group_info.id, run_name, data, effective_column_mapping)
LOGGER.info(
f"AOAI: Eval run created with id {eval_run_id}."
@@ -197,13 +211,16 @@ def _get_evaluation_run_results(all_run_info: List[OAIEvalRunCreationInfo]) -> T
:raises EvaluationException: If the evaluation run fails or is not completed before timing out.
"""

LOGGER.info(f"AOAI: Retrieving results from {len(all_run_info)} evaluation run(s)...")
run_metrics = {}
output_df = pd.DataFrame()
for run_info in all_run_info:
for idx, run_info in enumerate(all_run_info):
LOGGER.info(f"AOAI: Fetching results for run {idx + 1}/{len(all_run_info)} (ID: {run_info['eval_run_id']})...")
cur_output_df, cur_run_metrics = _get_single_run_results(run_info)
output_df = pd.concat([output_df, cur_output_df], axis=1)
run_metrics.update(cur_run_metrics)

LOGGER.info(f"AOAI: Successfully retrieved all results. Combined dataframe shape: {output_df.shape}")
return output_df, run_metrics
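
A standalone sketch of how the per-run frames combine; the column names here are illustrative, not the evaluator's actual output schema:

import pandas as pd

df1 = pd.DataFrame({"label.passed": [True, False]})
df2 = pd.DataFrame({"score.pass_rate": [0.9, 0.2]})
combined = pd.concat([df1, df2], axis=1)  # one row per data item, columns appended per run
# run_metrics merge the same way via dict.update().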


@@ -223,8 +240,10 @@ def _get_single_run_results(
:raises EvaluationException: If the evaluation run fails or is not completed before timing out.
"""
# Wait for evaluation run to complete
LOGGER.info(f"AOAI: Waiting for eval run {run_info['eval_run_id']} to complete...")
run_results = _wait_for_run_conclusion(run_info["client"], run_info["eval_group_id"], run_info["eval_run_id"])

LOGGER.info(f"AOAI: Eval run {run_info['eval_run_id']} completed with status: {run_results.status}")
if run_results.status != "completed":
raise EvaluationException(
message=f"AOAI evaluation run {run_info['eval_group_id']}/{run_info['eval_run_id']}"
@@ -235,6 +254,7 @@
)

# Convert run results into a dictionary of metrics
LOGGER.info(f"AOAI: Processing results and calculating metrics for run {run_info['eval_run_id']}...")
run_metrics: Dict[str, Any] = {}
if run_results.per_testing_criteria_results is None:
msg = (
@@ -255,8 +275,10 @@
ratio = passed / (passed + failed) if (passed + failed) else 0.0
formatted_column_name = f"{grader_name}.pass_rate"
run_metrics[formatted_column_name] = ratio
LOGGER.info(f"AOAI: Grader '{grader_name}': {passed} passed, {failed} failed, pass_rate={ratio:.4f}")

# Collect all results with pagination
LOGGER.info(f"AOAI: Collecting output items for run {run_info['eval_run_id']} with pagination...")
all_results: List[Any] = []
next_cursor: Optional[str] = None
limit = 100 # Max allowed by API
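
The paging loop itself is collapsed above; a self-contained sketch of the cursor pattern it follows (the fetch_page helper and the has_more/id fields are assumptions, not the real SDK surface):

def collect_all(fetch_page, limit=100):
    results, cursor = [], None
    while True:
        page = fetch_page(after=cursor, limit=limit)
        results.extend(page.data)
        if getattr(page, "has_more", False) and page.data:
            cursor = page.data[-1].id  # resume after the last item seen
        else:
            break
    return results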
@@ -280,6 +302,7 @@
else:
break

LOGGER.info(f"AOAI: Collected {len(all_results)} total output items across all pages.")
listed_results: Dict[str, List[Any]] = {"index": []}
# Raw data has no order guarantees; capture datasource_item_id per row for ordering.
for row_result in all_results:
@@ -329,6 +352,7 @@

# Ensure all columns are the same length as the 'index' list
num_rows = len(listed_results["index"])
LOGGER.info(f"AOAI: Processing {num_rows} result rows into dataframe...")
for col_name in list(listed_results.keys()):
if col_name != "index":
col_length = len(listed_results[col_name])
@@ -356,6 +380,7 @@
expected = run_info.get("expected_rows", None)
if expected is not None:
pre_len = len(output_df)
LOGGER.info(f"AOAI: Validating result count: expected {expected} rows, received {pre_len} rows.")
# Assumes original datasource_item_id space is 0..expected-1
output_df = output_df.reindex(range(expected))
if pre_len != expected:
@@ -388,6 +413,9 @@

# Reset to RangeIndex so downstream concatenation aligns on position
output_df.reset_index(drop=True, inplace=True)
LOGGER.info(
f"AOAI: Successfully processed run {run_info['eval_run_id']} with final dataframe shape: {output_df.shape}"
)
return output_df, run_metrics


@@ -481,11 +509,16 @@ def _get_graders_and_column_mappings(
:rtype: List[Tuple[Dict[str, AoaiGrader], Optional[Dict[str, str]]]]
"""

LOGGER.info(f"AOAI: Organizing {len(graders)} graders with column mappings...")
if column_mappings is None:
LOGGER.info("AOAI: No column mappings provided, each grader will have its own eval run.")
return [({name: grader}, None) for name, grader in graders.items()]
default_mapping = column_mappings.get("default", None)
if default_mapping is None:
default_mapping = {}
LOGGER.info(
f"AOAI: Using default mapping with {len(default_mapping)} entries for graders without specific mappings."
)
return [
({name: grader}, None if column_mappings is None else column_mappings.get(name, default_mapping))
for name, grader in graders.items()
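# Illustrative grouping, assuming graders {"a": ga, "b": gb} and
# column_mappings {"default": dm, "a": ma}:
#   -> [({"a": ga}, ma), ({"b": gb}, dm)]
# i.e. one eval run per grader, each paired with its own or the default mapping.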
@@ -593,17 +626,23 @@ def _generate_data_source_config(input_data_df: pd.DataFrame, column_mapping: Di
helper function.
"""
# Extract referenced data paths from mapping values of the form ${data.<path>} (ignore ${run.outputs.*})
LOGGER.info(
f"AOAI: Generating data source config for {len(input_data_df)} rows with {len(column_mapping)} column mapping(s)..."
)
referenced_paths: List[str] = []
for v in column_mapping.values():
m = DATA_PATH_PATTERN.match(v)
if m:
referenced_paths.append(m.group(1))

LOGGER.info(f"AOAI: Found {len(referenced_paths)} referenced paths in column mappings: {referenced_paths}")
# Decide if we have nested structures
has_nested = any("." in p for p in referenced_paths)
LOGGER.info(f"AOAI: Schema generation mode: {'nested' if has_nested else 'flat'}")

if not referenced_paths or not has_nested:
# Legacy flat behavior (existing logic): treat each mapping key as independent string field
LOGGER.info("AOAI: Using flat schema generation (no nested structures detected).")
data_source_config = {
"type": "custom",
"item_schema": {
Expand All @@ -617,6 +656,7 @@ def _generate_data_source_config(input_data_df: pd.DataFrame, column_mapping: Di
for key in column_mapping.keys():
props[key] = {"type": "string"}
req.append(key)
LOGGER.info(f"AOAI: Flat schema generated with {len(props)} properties: {list(props.keys())}")
return data_source_config
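
For illustration, a flat mapping such as {"query": "${data.query}", "response": "${data.response}"} would presumably produce an item schema along these lines (the template above is partially collapsed; properties and required are filled from the mapping keys):

{
    "type": "custom",
    "item_schema": {
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "response": {"type": "string"},
        },
        "required": ["query", "response"],
    },
}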

# NEW: If all nested paths share the same first segment (e.g. 'item'),
@@ -625,12 +665,14 @@
first_segments = {p.split(".")[0] for p in referenced_paths}
strip_wrapper = False
wrapper_name = None
LOGGER.info(f"AOAI: First segments in referenced paths: {first_segments}")
if len(first_segments) == 1:
only_seg = next(iter(first_segments))
# We only strip if that segment looks like the canonical wrapper.
if only_seg == WRAPPER_KEY:
strip_wrapper = True
wrapper_name = only_seg
LOGGER.info(f"AOAI: All paths start with wrapper '{WRAPPER_KEY}', will strip from schema.")

effective_paths = referenced_paths
if strip_wrapper:
@@ -645,9 +687,12 @@
# If stripping produced at least one usable path, adopt; else fall back to original.
if stripped:
effective_paths = stripped
LOGGER.info(f"AOAI: Effective paths after stripping wrapper: {effective_paths}")

LOGGER.info(f"AOAI: Building nested schema from {len(effective_paths)} effective paths...")
nested_schema = _build_schema_tree_from_paths(effective_paths, force_leaf_type="string")

LOGGER.info(f"AOAI: Nested schema generated successfully with type '{nested_schema.get('type')}'")
return {
"type": "custom",
"item_schema": nested_schema,
@@ -697,6 +742,23 @@ def _get_data_source(input_data_df: pd.DataFrame, column_mapping: Dict[str, str]
:return: A dictionary that can be used as the data source input for an OAI evaluation run.
:rtype: Dict[str, Any]
"""

def _convert_value_to_string(val: Any) -> str:
"""Convert a value to string representation for AOAI evaluation."""
if val is None:
return ""
elif isinstance(val, (str, int, float, bool)):
return str(val)
else:
try: # Attempt to JSON serialize lists/dicts
return json.dumps(val, ensure_ascii=False)
except (TypeError, ValueError):
# Fallback for unserializable objects
return str(val)
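
# Illustrative behavior:
#   _convert_value_to_string(None)           -> ""
#   _convert_value_to_string(4)              -> "4"
#   _convert_value_to_string({"a": [1, 2]})  -> '{"a": [1, 2]}'  (JSON-serialized)
#   _convert_value_to_string(object())       -> '<object object at 0x...>'  (str() fallback)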

LOGGER.info(
f"AOAI: Building data source from {len(input_data_df)} rows with {len(column_mapping)} column mappings..."
)
# Gather path specs: list of tuples (original_mapping_value, relative_parts, dataframe_column_name)
# relative_parts excludes the wrapper (so schema + content align).
path_specs: List[Tuple[str, List[str], str]] = []
@@ -746,24 +808,21 @@
leaf_name = pieces[-1]
path_specs.append((formatted_entry, [leaf_name], run_col))

LOGGER.info(f"AOAI: Processed {len(path_specs)} path specifications from column mappings.")
content: List[Dict[str, Any]] = []

for _, row in input_data_df.iterrows():
item_root: Dict[str, Any] = {}

# Track which dataframe columns have been processed via column_mapping
processed_cols: Set[str] = set()

for _, rel_parts, df_col in path_specs:
# Safely fetch value
val = row.get(df_col, None)

# Convert value to string to match schema's "type": "string" leaves.
# (If you later infer types, you can remove the stringify.)
if val is None:
str_val = ""
elif isinstance(val, (str, int, float, bool)):
str_val = str(val)
else:
# Lists / dicts / other -> string for now
str_val = str(val)
str_val = _convert_value_to_string(val)

# Insert into nested dict
cursor = item_root
Expand All @@ -776,8 +835,19 @@ def _get_data_source(input_data_df: pd.DataFrame, column_mapping: Dict[str, str]
leaf_key = rel_parts[-1]
cursor[leaf_key] = str_val

# Mark this dataframe column as processed
processed_cols.add(df_col)

# Add any unmapped dataframe columns directly to item_root
for col_name in input_data_df.columns:
if col_name not in processed_cols:
val = row.get(col_name, None)
str_val = _convert_value_to_string(val)
item_root[col_name] = str_val
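# e.g. (hypothetical) a dataframe column "latency_ms" that no mapping references
# still ships with the row as item_root["latency_ms"] = "123", stringified above.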

content.append({WRAPPER_KEY: item_root})

LOGGER.info(f"AOAI: Generated {len(content)} content items for data source.")
return {
"type": "jsonl",
"source": {
Expand Down Expand Up @@ -812,6 +882,7 @@ def _begin_eval_run(
:rtype: str
"""

LOGGER.info(f"AOAI: Creating eval run '{run_name}' for eval group {eval_group_id}...")
data_source = _get_data_source(input_data_df, column_mapping)
eval_run = client.evals.runs.create(
eval_id=eval_group_id,
@@ -820,6 +891,7 @@
metadata={"sample_generation": "off", "file_format": "jsonl", "is_foundry_eval": "true"},
# TODO decide if we want to add our own timeout value?
)
LOGGER.info(f"AOAI: Eval run created successfully with ID: {eval_run.id}")
return eval_run.id


Expand Down Expand Up @@ -856,8 +928,11 @@ def _wait_for_run_conclusion(
if total_wait > max_wait_seconds:
wait_interval -= total_wait - max_wait_seconds
sleep(wait_interval)
iters += 1
response = client.evals.runs.retrieve(eval_id=eval_group_id, run_id=eval_run_id)
LOGGER.info(f"AOAI: Polling iteration {iters}, status: {response.status}, total wait: {total_wait:.1f}s")
if response.status not in ["queued", "in_progress"]:
LOGGER.info(f"AOAI: Eval run {eval_run_id} reached terminal status: {response.status}")
return response
if total_wait > max_wait_seconds:
raise EvaluationException(
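
The loop above follows a familiar bounded-polling pattern; a minimal self-contained sketch (the interval growth and cap are assumptions, the real helper's tuning is collapsed above):

from time import sleep

def wait_for_conclusion(retrieve_status, max_wait_seconds=600.0):
    total_wait, wait_interval = 0.0, 2.0
    while True:
        total_wait += wait_interval
        # Trim the final sleep so we never wait past the deadline.
        if total_wait > max_wait_seconds:
            wait_interval -= total_wait - max_wait_seconds
        sleep(wait_interval)
        status = retrieve_status()
        if status not in ("queued", "in_progress"):
            return status
        if total_wait > max_wait_seconds:
            raise TimeoutError("eval run did not conclude before timeout")
        wait_interval = min(wait_interval * 2, 30.0)  # assumed backoff cap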
@@ -0,0 +1,3 @@
{"query": "What is the capital of France?", "response": "Paris is the capital of France.", "ground_truth": "Paris"}
{"query": "What is 2+2?", "response": "The answer is 4.", "ground_truth": "4"}
{"query": "Who wrote Hamlet?", "response": "William Shakespeare wrote Hamlet.", "ground_truth": "Shakespeare"}
@@ -0,0 +1,3 @@
{"item": {"query": "What security policies exist?", "context": {"company": {"policy": {"security": {"passwords": {"rotation_days": 90, "min_length": 12}, "network": {"vpn": {"required": true, "provider": "Cisco"}}}}}}, "response": "Password rotation is required every 90 days with minimum 12 characters. VPN is required using Cisco provider.", "ground_truth": "Security policies include password rotation every 90 days and VPN requirement."}}
{"item": {"query": "What are the database settings?", "context": {"company": {"infrastructure": {"database": {"host": "db.example.com", "port": 5432, "type": "PostgreSQL"}}}}, "response": "The database is PostgreSQL hosted at db.example.com on port 5432.", "ground_truth": "PostgreSQL database on db.example.com:5432"}}
{"item": {"query": "What is the deployment process?", "context": {"company": {"devops": {"deployment": {"strategy": "blue-green", "frequency": "daily", "tools": ["Jenkins", "Kubernetes"]}}}}, "response": "We use blue-green deployment strategy daily with Jenkins and Kubernetes.", "ground_truth": "Blue-green deployment daily using Jenkins and Kubernetes"}}
@@ -0,0 +1,2 @@
{"item": {"query": "Simple query", "response": "Simple response", "ground_truth": "Simple truth"}}
{"item": {"query": "Another query", "response": "Another response", "ground_truth": "Another truth"}}