@@ -1046,18 +1046,73 @@ def _evaluate(  # pylint: disable=too-many-locals,too-many-statements
        result, LOGGER, eval_id, eval_run_id, evaluators_and_graders, eval_run_summary_dict, eval_meta_data
    )
    if app_insights_configuration := kwargs.get("_app_insights_configuration"):
-        emit_eval_result_events_to_app_insights(app_insights_configuration, result["_evaluation_results_list"])
+        emit_eval_result_events_to_app_insights(app_insights_configuration, result["_evaluation_results_list"], evaluator_config)

    if output_path:
        _write_output(output_path, result)
    return result


+def _build_internal_log_attributes(
+    event_data: Dict[str, Any],
+    metric_name: str,
+    evaluator_config: Optional[Dict[str, EvaluatorConfig]],
+    internal_log_attributes: Dict[str, str],
+) -> Dict[str, str]:
+    """
+    Build internal log attributes for OpenTelemetry logging.
+
+    :param event_data: The event data containing threshold and name information
+    :type event_data: Dict[str, Any]
+    :param metric_name: The name of the metric being evaluated
+    :type metric_name: str
+    :param evaluator_config: Configuration for evaluators
+    :type evaluator_config: Optional[Dict[str, EvaluatorConfig]]
+    :param internal_log_attributes: Existing internal attribute bag to extend in place
+    :type internal_log_attributes: Dict[str, str]
+    :return: Dictionary of internal log attributes
+    :rtype: Dict[str, str]
+    """
+    # Add threshold if present
+    if event_data.get("threshold"):
+        internal_log_attributes["gen_ai.evaluation.threshold"] = str(event_data["threshold"])
+
+    # Add testing criteria details if present
+    testing_criteria_name = event_data.get("name")
+    if testing_criteria_name:
+        internal_log_attributes["gen_ai.evaluation.testing_criteria.name"] = testing_criteria_name
+
+    # Get evaluator definition details
+    if evaluator_config and testing_criteria_name in evaluator_config:
+        testing_criteria_config = evaluator_config[testing_criteria_name]
+
+        if "evaluator_name" in testing_criteria_config and testing_criteria_config["evaluator_name"]:
+            internal_log_attributes["gen_ai.evaluator.name"] = testing_criteria_config["evaluator_name"]
+
+        if "evaluator_version" in testing_criteria_config and testing_criteria_config["evaluator_version"] is not None:
+            internal_log_attributes["gen_ai.evaluator.version"] = str(testing_criteria_config["evaluator_version"])
+
+        if "evaluator_id" in testing_criteria_config and testing_criteria_config["evaluator_id"] is not None:
+            internal_log_attributes["gen_ai.evaluator.id"] = str(testing_criteria_config["evaluator_id"])
+
+        if (
+            "evaluator_definition" in testing_criteria_config
+            and testing_criteria_config["evaluator_definition"]
+            and metric_name in testing_criteria_config["evaluator_definition"]["metrics"]
+        ):
+            metric_config_detail = testing_criteria_config["evaluator_definition"]["metrics"][metric_name]
+
+            if metric_config_detail:
+                if metric_config_detail.get("min_value") is not None:
+                    internal_log_attributes["gen_ai.evaluation.min_value"] = str(metric_config_detail["min_value"])
+                if metric_config_detail.get("max_value") is not None:
+                    internal_log_attributes["gen_ai.evaluation.max_value"] = str(metric_config_detail["max_value"])
+
+    return internal_log_attributes
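
For illustration only, a minimal sketch of how this helper composes the internal attribute bag; the event payload and evaluator_config values are assumed, not taken from this PR's tests, and evaluator_definition is shown in its serialized dict form because the lookup above indexes it with ["metrics"]:

    event = {"metric": "relevance", "name": "relevance_check", "threshold": 3}
    config = {
        "relevance_check": {
            "evaluator_name": "relevance",
            "evaluator_version": "1",
            "evaluator_definition": {
                "metrics": {"relevance": {"min_value": 1, "max_value": 5}}
            },
        }
    }
    attrs = _build_internal_log_attributes(event, "relevance", config, {})
    # attrs now holds gen_ai.evaluation.threshold, gen_ai.evaluation.testing_criteria.name,
    # gen_ai.evaluator.name, gen_ai.evaluator.version, and the metric's min/max values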


def _log_events_to_app_insights(
    otel_logger,
    events: List[Dict[str, Any]],
    log_attributes: Dict[str, Any],
    data_source_item: Optional[Dict[str, Any]] = None,
+    evaluator_config: Optional[Dict[str, EvaluatorConfig]] = None,
) -> None:
    """
    Log independent events directly to App Insights using OpenTelemetry logging.
@@ -1082,82 +1137,94 @@ def _log_events_to_app_insights(
    response_id = None
    conversation_id = None
    previous_response_id = None
-    agent_name = None
-    agent_version = None
    agent_id = None
+    agent_version = None
+    agent_name = None
    if data_source_item:
        for key, value in data_source_item.items():
            if key.endswith("trace_id") and value and isinstance(value, str):
                # Remove dashes if present
                trace_id_str = str(value).replace("-", "").lower()
                if len(trace_id_str) == 32:  # Valid trace_id length
                    trace_id = int(trace_id_str, 16)
-            elif key.endswith("response_id") and value and isinstance(value, str):
+            elif key == "previous_response_id" and value and isinstance(value, str):
+                previous_response_id = value
+            elif key == "response_id" and value and isinstance(value, str):
                response_id = value
-            elif key.endswith("conversation_id") and value and isinstance(value, str):
+            elif key == "conversation_id" and value and isinstance(value, str):
                conversation_id = value
-            elif key.endswith("previous_response_id") and value and isinstance(value, str):
-                previous_response_id = value
-            elif key.endswith("agent_name") and value and isinstance(value, str):
-                agent_name = value
-            elif key.endswith("agent_version") and value and isinstance(value, str):
-                agent_version = value
-            elif key.endswith("agent_id") and value and isinstance(value, str):
+            elif key == "agent_id" and value and isinstance(value, str):
                agent_id = value
+            elif key == "agent_version" and value and isinstance(value, str):
+                agent_version = value
+            elif key == "agent_name" and value and isinstance(value, str):
+                agent_name = value

    # Log each event as a separate log record
    for i, event_data in enumerate(events):
        try:
-            # Add standard event attributes
-            log_attributes["microsoft.custom_event.name"] = EVALUATION_EVENT_NAME
-            log_attributes["gen_ai.evaluation.name"] = event_data.get("metric")
-            log_attributes["gen_ai.evaluation.score.value"] = event_data.get("score")
-            log_attributes["gen_ai.evaluation.score.label"] = event_data.get("label")
+            # Prepare log record attributes with specific mappings.
+            # The standard attributes are defined in https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aievaluationresult
+            metric_name = event_data.get("metric")
+            standard_log_attributes = {}
+            standard_log_attributes["microsoft.custom_event.name"] = EVALUATION_EVENT_NAME
+            standard_log_attributes["gen_ai.evaluation.name"] = metric_name
+            if event_data.get("score") is not None:
+                standard_log_attributes["gen_ai.evaluation.score.value"] = event_data.get("score")
+            if event_data.get("label") is not None:
+                standard_log_attributes["gen_ai.evaluation.score.label"] = event_data.get("label")
+
+            # Internal proposed attributes.
+            # Kept in an internal property bag for now; they may be promoted if we get sign-off on the OTel standard later.
+            internal_log_attributes = _build_internal_log_attributes(event_data, metric_name, evaluator_config, log_attributes)

            # Optional field that may not always be present
            if "reason" in event_data:
-                log_attributes["gen_ai.evaluation.explanation"] = str(event_data["reason"])
+                standard_log_attributes["gen_ai.evaluation.explanation"] = str(event_data["reason"])

            # Handle error from sample if present
            # Put the error message in error.type to follow OTel semantic conventions
            error = event_data.get("sample", {}).get("error", {}).get("message", None)
            if error:
-                log_attributes["error.type"] = error
+                standard_log_attributes["error.type"] = error

            # Handle redteam attack properties if present
            if "properties" in event_data:
                properties = event_data["properties"]

                if "attack_success" in properties:
-                    log_attributes["gen_ai.redteam.attack.success"] = str(properties["attack_success"])
+                    internal_log_attributes["gen_ai.redteam.attack.success"] = str(properties["attack_success"])

                if "attack_technique" in properties:
-                    log_attributes["gen_ai.redteam.attack.technique"] = str(properties["attack_technique"])
+                    internal_log_attributes["gen_ai.redteam.attack.technique"] = str(properties["attack_technique"])

                if "attack_complexity" in properties:
-                    log_attributes["gen_ai.redteam.attack.complexity"] = str(properties["attack_complexity"])
+                    internal_log_attributes["gen_ai.redteam.attack.complexity"] = str(properties["attack_complexity"])

                if "attack_success_threshold" in properties:
-                    log_attributes["gen_ai.redteam.attack.success_threshold"] = str(
+                    internal_log_attributes["gen_ai.redteam.attack.success_threshold"] = str(
                        properties["attack_success_threshold"]
                    )

-            # Add response_id and conversation_id from data source if present
+            # Add data source item attributes if present
            if response_id:
-                log_attributes["gen_ai.response.id"] = response_id
+                standard_log_attributes["gen_ai.response.id"] = response_id
            if conversation_id:
-                log_attributes["gen_ai.conversation.id"] = conversation_id
+                standard_log_attributes["gen_ai.conversation.id"] = conversation_id
            if previous_response_id:
-                log_attributes["gen_ai.previous.response_id"] = previous_response_id
+                internal_log_attributes["gen_ai.previous.response.id"] = previous_response_id
+            if agent_id:
+                standard_log_attributes["gen_ai.agent.id"] = agent_id
            if agent_name:
-                log_attributes["gen_ai.agent.name"] = agent_name
+                standard_log_attributes["gen_ai.agent.name"] = agent_name
            if agent_version:
-                log_attributes["gen_ai.agent.version"] = agent_version
-            if agent_id:
-                log_attributes["gen_ai.agent.id"] = agent_id
+                internal_log_attributes["gen_ai.agent.version"] = agent_version

+            # Combine standard and internal attributes; put the internal ones in the properties bag
+            standard_log_attributes["internal_properties"] = json.dumps(internal_log_attributes)
            # Anonymize IP address to prevent Azure GeoIP enrichment and location tracking
-            log_attributes["http.client_ip"] = "0.0.0.0"
+            standard_log_attributes["http.client_ip"] = "0.0.0.0"

            # Create context with trace_id if present (for distributed tracing correlation)
            ctx = None
@@ -1175,7 +1242,7 @@ def _log_events_to_app_insights(
                timestamp=time.time_ns(),
                observed_timestamp=time.time_ns(),
                body=EVALUATION_EVENT_NAME,
-                attributes=log_attributes,
+                attributes=standard_log_attributes,
                context=ctx,
            )

@@ -1186,7 +1253,7 @@ def emit_eval_result_events_to_app_insights(
        LOGGER.error(f"Failed to log events to App Insights: {e}")
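
To make the attribute routing above concrete, here is a small assumed example of the shapes this function consumes; the keys mirror the lookups in the code, not the PR's test data:

    event = {
        "metric": "violence",
        "score": 0,
        "label": "pass",
        "reason": "No violent content detected.",
        "properties": {"attack_technique": "base64"},
    }
    data_source_item = {"conversation_id": "conv-42", "previous_response_id": "resp-41"}
    # Per the loop above: name/score/label/explanation and gen_ai.conversation.id land in
    # standard_log_attributes; gen_ai.redteam.attack.technique and
    # gen_ai.previous.response.id go to internal_log_attributes, which is JSON-serialized
    # into the "internal_properties" attribute of the emitted log record.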


-def emit_eval_result_events_to_app_insights(app_insights_config: AppInsightsConfig, results: List[Dict]) -> None:
+def emit_eval_result_events_to_app_insights(app_insights_config: AppInsightsConfig, results: List[Dict], evaluator_config: Optional[Dict[str, EvaluatorConfig]] = None) -> None:
    """
    Emit evaluation result events to App Insights using OpenTelemetry logging.
    Each result is logged as an independent log record, potentially including trace context.
@@ -1256,6 +1323,7 @@ def emit_eval_result_events_to_app_insights(app_insights_config: AppInsightsConf
            events=result["results"],
            log_attributes=log_attributes,
            data_source_item=result["datasource_item"] if "datasource_item" in result else None,
+            evaluator_config=evaluator_config
        )
        # Force flush to ensure events are sent
        logger_provider.force_flush()
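
End to end, the new parameter flows from _evaluate through emit_eval_result_events_to_app_insights into each log record. A hedged sketch of a direct call; the AppInsightsConfig contents and the result shape are placeholders, not values from this PR:

    app_insights_config: AppInsightsConfig = {"connection_string": "<connection-string>"}  # assumed field
    results = [
        {
            "results": [{"metric": "relevance", "score": 4, "label": "pass", "name": "relevance_check"}],
            "datasource_item": {"response_id": "resp-123", "agent_name": "support-agent"},
        }
    ]
    evaluator_config = {"relevance_check": {"evaluator_name": "relevance", "evaluator_version": "1"}}
    emit_eval_result_events_to_app_insights(app_insights_config, results, evaluator_config)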
_evaluator_definition.py (new file)
@@ -0,0 +1,87 @@
+from abc import ABC
+from typing import Dict, List, Optional, Any
+from dataclasses import dataclass, field
+
+
+@dataclass
+class EvaluatorMetric:
+    type: str = "ordinal"
+    desirable_direction: Optional[str] = None
+    min_value: Optional[float] = None
+    max_value: Optional[float] = None
+
+    def to_dict(self) -> Dict[str, Any]:
+        result = {"type": self.type}
+        if self.desirable_direction is not None:
+            result["desirable_direction"] = self.desirable_direction
+        if self.min_value is not None:
+            result["min_value"] = self.min_value
+        if self.max_value is not None:
+            result["max_value"] = self.max_value
+        return result
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "EvaluatorMetric":
+        return cls(
+            type=data.get("type", "ordinal"),
+            desirable_direction=data.get("desirable_direction"),
+            min_value=data.get("min_value"),
+            max_value=data.get("max_value")
+        )
+
+
+@dataclass
+class ObjectParameterDescriptorWithRequired:
+    required: List[str] = field(default_factory=list)
+    type: str = "object"
+    properties: Dict[str, Any] = field(default_factory=dict)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "required": self.required,
+            "type": self.type,
+            "properties": self.properties
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "ObjectParameterDescriptorWithRequired":
+        return cls(
+            required=data.get("required", []),
+            type=data.get("type", "object"),
+            properties=data.get("properties", {})
+        )
+
+
+class EvaluatorDefinition(ABC):
+    """Base class for evaluator definitions"""
+
+    def __init__(self):
+        self.init_parameters: ObjectParameterDescriptorWithRequired = ObjectParameterDescriptorWithRequired()
+        self.metrics: Dict[str, EvaluatorMetric] = {}
+        self.data_schema: ObjectParameterDescriptorWithRequired = ObjectParameterDescriptorWithRequired()
+        self.type: str = "unknown"
+
+    def to_dict(self) -> Dict[str, Any]:
+        result = {
+            "type": self.type,
+            "init_parameters": self.init_parameters.to_dict(),
+            "metrics": {k: v.to_dict() for k, v in self.metrics.items()},
+            "data_schema": self.data_schema.to_dict()
+        }
+        return result
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "EvaluatorDefinition":
+        # Create a generic instance since specific subclasses are not defined
+        instance = cls.__new__(cls)
+        instance.__init__()
+
+        instance.init_parameters = ObjectParameterDescriptorWithRequired.from_dict(
+            data.get("init_parameters", {})
+        )
+        instance.metrics = {
+            k: EvaluatorMetric.from_dict(v)
+            for k, v in data.get("metrics", {}).items()
+        }
+        instance.data_schema = ObjectParameterDescriptorWithRequired.from_dict(
+            data.get("data_schema", {})
+        )
+        return instance
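
A brief round-trip, with assumed metric and schema values, showing the dict form these classes serialize to and from:

    definition = EvaluatorDefinition.from_dict({
        "metrics": {"relevance": {"type": "ordinal", "min_value": 1, "max_value": 5}},
        "data_schema": {"required": ["query"], "properties": {"query": {"type": "string"}}},
    })
    assert definition.metrics["relevance"].max_value == 5
    # to_dict() yields the nested dict shape that _build_internal_log_attributes
    # reads min_value/max_value from
    assert definition.to_dict()["metrics"]["relevance"]["min_value"] == 1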
@@ -5,6 +5,8 @@
from typing import Any, Dict, List, Literal, TypedDict, Union

from typing_extensions import NotRequired
+from ._evaluator_definition import EvaluatorDefinition
+from typing import Optional


class AzureOpenAIModelConfiguration(TypedDict):
@@ -105,6 +107,20 @@ class EvaluatorConfig(TypedDict, total=False):
    column_mapping: Dict[str, str]
    """Dictionary mapping evaluator input name to column in data"""

+    evaluator_name: NotRequired[Optional[str]]
+    """Name of the evaluator from the evaluator asset, currently only used for OTel emission"""
+
+    evaluator_version: NotRequired[Optional[str]]
+    """Version of the evaluator from the evaluator asset, currently only used for OTel emission"""
+
+    evaluator_id: NotRequired[Optional[str]]
+    """ID of the evaluator from the evaluator asset, currently only used for OTel emission"""
+
+    evaluator_definition: NotRequired[Optional[EvaluatorDefinition]]
+    """Definition of the evaluator from the evaluator asset. Currently only used for OTel
+    emission; it will also be consumed by the AOAI eval results converter in the future."""


class Message(TypedDict):
    role: str
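
Finally, a hedged sketch of how a caller might populate the new EvaluatorConfig fields; the evaluator key, name, version, and id are invented for illustration:

    relevance_definition = EvaluatorDefinition.from_dict(
        {"metrics": {"relevance": {"type": "ordinal", "min_value": 1, "max_value": 5}}}
    )
    evaluator_config: Dict[str, EvaluatorConfig] = {
        "relevance_check": {
            "column_mapping": {"query": "${data.query}", "response": "${data.response}"},
            "evaluator_name": "relevance",
            "evaluator_version": "1",
            "evaluator_id": "relevance-evaluator-id",  # placeholder
            # The OTel emission path indexes this value with ["metrics"], so the
            # serialized to_dict() form is assumed here despite the declared type.
            "evaluator_definition": relevance_definition.to_dict(),
        }
    }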