diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
index 1da491668d86..3a939b2e258a 100644
--- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
@@ -1046,18 +1046,73 @@ def _evaluate(  # pylint: disable=too-many-locals,too-many-statements
         result, LOGGER, eval_id, eval_run_id, evaluators_and_graders, eval_run_summary_dict, eval_meta_data
     )
     if app_insights_configuration := kwargs.get("_app_insights_configuration"):
-        emit_eval_result_events_to_app_insights(app_insights_configuration, result["_evaluation_results_list"])
+        emit_eval_result_events_to_app_insights(app_insights_configuration, result["_evaluation_results_list"], evaluator_config)
     if output_path:
         _write_output(output_path, result)
 
     return result
 
 
+def _build_internal_log_attributes(
+    event_data: Dict[str, Any],
+    metric_name: str,
+    evaluator_config: Optional[Dict[str, EvaluatorConfig]],
+    internal_log_attributes: Dict[str, str],
+) -> Dict[str, str]:
+    """
+    Build internal log attributes for OpenTelemetry logging.
+
+    :param event_data: The event data containing threshold and name information
+    :type event_data: Dict[str, Any]
+    :param metric_name: The name of the metric being evaluated
+    :type metric_name: str
+    :param evaluator_config: Configuration for evaluators
+    :type evaluator_config: Optional[Dict[str, EvaluatorConfig]]
+    :param internal_log_attributes: Attribute dictionary that is extended in place
+    :type internal_log_attributes: Dict[str, str]
+    :return: Dictionary of internal log attributes
+    :rtype: Dict[str, str]
+    """
+    # Add threshold if present
+    if event_data.get("threshold") is not None:
+        internal_log_attributes["gen_ai.evaluation.threshold"] = str(event_data["threshold"])
+
+    # Add testing criteria details if present
+    testing_criteria_name = event_data.get("name")
+    if testing_criteria_name:
+        internal_log_attributes["gen_ai.evaluation.testing_criteria.name"] = testing_criteria_name
+
+    # Get evaluator definition details
+    if evaluator_config and testing_criteria_name in evaluator_config:
+        testing_criteria_config = evaluator_config[testing_criteria_name]
+
+        if "evaluator_name" in testing_criteria_config and testing_criteria_config["evaluator_name"]:
+            internal_log_attributes["gen_ai.evaluator.name"] = testing_criteria_config["evaluator_name"]
+
+        if "evaluator_version" in testing_criteria_config and testing_criteria_config["evaluator_version"] is not None:
+            internal_log_attributes["gen_ai.evaluator.version"] = str(testing_criteria_config["evaluator_version"])
+
+        if "evaluator_id" in testing_criteria_config and testing_criteria_config["evaluator_id"] is not None:
+            internal_log_attributes["gen_ai.evaluator.id"] = str(testing_criteria_config["evaluator_id"])
+
+        if (
+            "evaluator_definition" in testing_criteria_config
+            and testing_criteria_config["evaluator_definition"]
+            and metric_name in testing_criteria_config["evaluator_definition"]["metrics"]
+        ):
+            metric_config_detail = testing_criteria_config["evaluator_definition"]["metrics"][metric_name]
+
+            if metric_config_detail:
+                if metric_config_detail.get("min_value") is not None:
+                    internal_log_attributes["gen_ai.evaluation.min_value"] = str(metric_config_detail["min_value"])
+                if metric_config_detail.get("max_value") is not None:
+                    internal_log_attributes["gen_ai.evaluation.max_value"] = str(metric_config_detail["max_value"])
+
+    return internal_log_attributes
+
+
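A minimal sketch of what the new helper produces, assuming an `evaluator_config` entry whose `evaluator_definition` is already in dict form; the helper is private, and every name and id below is illustrative rather than taken from the PR:

```python
from azure.ai.evaluation._evaluate._evaluate import _build_internal_log_attributes

event_data = {"metric": "fluency", "name": "fluency_check", "threshold": 3}
evaluator_config = {
    "fluency_check": {
        "evaluator_name": "builtin.fluency",          # hypothetical asset name
        "evaluator_version": "1",
        "evaluator_id": "azureai://example/fluency",  # hypothetical asset id
        "evaluator_definition": {
            "metrics": {"fluency": {"min_value": 1.0, "max_value": 5.0}}
        },
    }
}

attrs = _build_internal_log_attributes(event_data, "fluency", evaluator_config, {})
# attrs == {
#     "gen_ai.evaluation.threshold": "3",
#     "gen_ai.evaluation.testing_criteria.name": "fluency_check",
#     "gen_ai.evaluator.name": "builtin.fluency",
#     "gen_ai.evaluator.version": "1",
#     "gen_ai.evaluator.id": "azureai://example/fluency",
#     "gen_ai.evaluation.min_value": "1.0",
#     "gen_ai.evaluation.max_value": "5.0",
# }
```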
 def _log_events_to_app_insights(
     otel_logger,
     events: List[Dict[str, Any]],
     log_attributes: Dict[str, Any],
     data_source_item: Optional[Dict[str, Any]] = None,
+    evaluator_config: Optional[Dict[str, EvaluatorConfig]] = None,
 ) -> None:
     """
     Log independent events directly to App Insights using OpenTelemetry logging.
@@ -1082,9 +1137,9 @@ def _log_events_to_app_insights(
     response_id = None
     conversation_id = None
     previous_response_id = None
-    agent_name = None
-    agent_version = None
     agent_id = None
+    agent_version = None
+    agent_name = None
     if data_source_item:
         for key, value in data_source_item.items():
             if key.endswith("trace_id") and value and isinstance(value, str):
@@ -1092,72 +1147,84 @@ def _log_events_to_app_insights(
                 trace_id_str = str(value).replace("-", "").lower()
                 if len(trace_id_str) == 32:  # Valid trace_id length
                     trace_id = int(trace_id_str, 16)
-            elif key.endswith("response_id") and value and isinstance(value, str):
+            elif key == "previous_response_id" and value and isinstance(value, str):
+                previous_response_id = value
+            elif key == "response_id" and value and isinstance(value, str):
                 response_id = value
-            elif key.endswith("conversation_id") and value and isinstance(value, str):
+            elif key == "conversation_id" and value and isinstance(value, str):
                 conversation_id = value
-            elif key.endswith("previous_response_id") and value and isinstance(value, str):
-                previous_response_id = value
-            elif key.endswith("agent_name") and value and isinstance(value, str):
-                agent_name = value
-            elif key.endswith("agent_version") and value and isinstance(value, str):
-                agent_version = value
-            elif key.endswith("agent_id") and value and isinstance(value, str):
+            elif key == "agent_id" and value and isinstance(value, str):
                 agent_id = value
+            elif key == "agent_version" and value and isinstance(value, str):
+                agent_version = value
+            elif key == "agent_name" and value and isinstance(value, str):
+                agent_name = value
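The switch from `key.endswith(...)` to exact key comparison fixes a real matching hazard: a `previous_response_id` key also ends with `response_id`, so suffix matching routed it to the wrong branch. A small illustration with hypothetical keys:

```python
data_source_item = {
    "trace_id": "0af7651916cd43dd8448eb211c80319c",  # 32 hex chars, parsed for correlation
    "previous_response_id": "resp_prev_123",          # hypothetical ids
    "response_id": "resp_456",
}

# Old behavior: "previous_response_id".endswith("response_id") is True, so the
# response_id branch captured "resp_prev_123" and the dedicated
# previous_response_id branch never ran for that key.
# New behavior: exact comparison assigns the two ids independently.
```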
 
     # Log each event as a separate log record
     for i, event_data in enumerate(events):
         try:
-            # Add standard event attributes
-            log_attributes["microsoft.custom_event.name"] = EVALUATION_EVENT_NAME
-            log_attributes["gen_ai.evaluation.name"] = event_data.get("metric")
-            log_attributes["gen_ai.evaluation.score.value"] = event_data.get("score")
-            log_attributes["gen_ai.evaluation.score.label"] = event_data.get("label")
+            # Prepare log record attributes with specific mappings.
+            # The standard attributes are defined in https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md#event-eventgen_aievaluationresult
+            metric_name = event_data.get("metric")
+            standard_log_attributes = {}
+            standard_log_attributes["microsoft.custom_event.name"] = EVALUATION_EVENT_NAME
+            standard_log_attributes["gen_ai.evaluation.name"] = metric_name
+            if event_data.get("score") is not None:
+                standard_log_attributes["gen_ai.evaluation.score.value"] = event_data.get("score")
+            if event_data.get("label") is not None:
+                standard_log_attributes["gen_ai.evaluation.score.label"] = event_data.get("label")
+
+            # Internal proposed attributes.
+            # Keep these in an internal property bag for now; they may be promoted if we get
+            # sign-off to add them to the OTel semantic conventions later.
+            # Pass a copy of log_attributes so per-event attributes do not leak across iterations.
+            internal_log_attributes = _build_internal_log_attributes(
+                event_data, metric_name, evaluator_config, dict(log_attributes)
+            )
 
             # Optional field that may not always be present
             if "reason" in event_data:
-                log_attributes["gen_ai.evaluation.explanation"] = str(event_data["reason"])
+                standard_log_attributes["gen_ai.evaluation.explanation"] = str(event_data["reason"])
 
             # Handle error from sample if present
             # Put the error message in error.type to follow OTel semantic conventions
             error = event_data.get("sample", {}).get("error", {}).get("message", None)
             if error:
-                log_attributes["error.type"] = error
+                standard_log_attributes["error.type"] = error
 
             # Handle redteam attack properties if present
             if "properties" in event_data:
                 properties = event_data["properties"]
                 if "attack_success" in properties:
-                    log_attributes["gen_ai.redteam.attack.success"] = str(properties["attack_success"])
+                    internal_log_attributes["gen_ai.redteam.attack.success"] = str(properties["attack_success"])
                 if "attack_technique" in properties:
-                    log_attributes["gen_ai.redteam.attack.technique"] = str(properties["attack_technique"])
+                    internal_log_attributes["gen_ai.redteam.attack.technique"] = str(properties["attack_technique"])
                 if "attack_complexity" in properties:
-                    log_attributes["gen_ai.redteam.attack.complexity"] = str(properties["attack_complexity"])
+                    internal_log_attributes["gen_ai.redteam.attack.complexity"] = str(properties["attack_complexity"])
                 if "attack_success_threshold" in properties:
-                    log_attributes["gen_ai.redteam.attack.success_threshold"] = str(
+                    internal_log_attributes["gen_ai.redteam.attack.success_threshold"] = str(
                         properties["attack_success_threshold"]
                     )
 
-            # Add response_id and conversation_id from data source if present
+            # Add data source item attributes if present
             if response_id:
-                log_attributes["gen_ai.response.id"] = response_id
+                standard_log_attributes["gen_ai.response.id"] = response_id
             if conversation_id:
-                log_attributes["gen_ai.conversation.id"] = conversation_id
+                standard_log_attributes["gen_ai.conversation.id"] = conversation_id
             if previous_response_id:
-                log_attributes["gen_ai.previous.response_id"] = previous_response_id
+                internal_log_attributes["gen_ai.previous.response.id"] = previous_response_id
+            if agent_id:
+                standard_log_attributes["gen_ai.agent.id"] = agent_id
             if agent_name:
-                log_attributes["gen_ai.agent.name"] = agent_name
+                standard_log_attributes["gen_ai.agent.name"] = agent_name
             if agent_version:
-                log_attributes["gen_ai.agent.version"] = agent_version
-            if agent_id:
-                log_attributes["gen_ai.agent.id"] = agent_id
+                internal_log_attributes["gen_ai.agent.version"] = agent_version
 
+            # Combine standard and internal attributes; the internal ones travel in the properties bag
+            standard_log_attributes["internal_properties"] = json.dumps(internal_log_attributes)
             # Anonymize IP address to prevent Azure GeoIP enrichment and location tracking
-            log_attributes["http.client_ip"] = "0.0.0.0"
+            standard_log_attributes["http.client_ip"] = "0.0.0.0"
 
             # Create context with trace_id if present (for distributed tracing correlation)
             ctx = None
@@ -1175,7 +1242,7 @@ def _log_events_to_app_insights(
                 timestamp=time.time_ns(),
                 observed_timestamp=time.time_ns(),
                 body=EVALUATION_EVENT_NAME,
-                attributes=log_attributes,
+                attributes=standard_log_attributes,
                 context=ctx,
             )
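A sketch of the attribute payload one emitted log record might carry after this change. All values are illustrative, and the event name is an assumption about `EVALUATION_EVENT_NAME`; the point is that proposed attributes ride along as a single JSON-encoded `internal_properties` string:

```python
import json

standard_log_attributes = {
    "microsoft.custom_event.name": "gen_ai.evaluation.result",  # assumed value of EVALUATION_EVENT_NAME
    "gen_ai.evaluation.name": "fluency",
    "gen_ai.evaluation.score.value": 4,
    "gen_ai.evaluation.score.label": "pass",
    "gen_ai.response.id": "resp_456",
    "internal_properties": json.dumps({
        "gen_ai.evaluation.threshold": "3",
        "gen_ai.evaluation.testing_criteria.name": "fluency_check",
        "gen_ai.evaluator.name": "builtin.fluency",
    }),
    "http.client_ip": "0.0.0.0",  # anonymized, as in the diff
}
```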
@@ -1186,7 +1253,7 @@ def _log_events_to_app_insights(
         LOGGER.error(f"Failed to log events to App Insights: {e}")
 
 
-def emit_eval_result_events_to_app_insights(app_insights_config: AppInsightsConfig, results: List[Dict]) -> None:
+def emit_eval_result_events_to_app_insights(app_insights_config: AppInsightsConfig, results: List[Dict], evaluator_config: Optional[Dict[str, EvaluatorConfig]] = None) -> None:
     """
     Emit evaluation result events to App Insights using OpenTelemetry logging.
     Each result is logged as an independent log record, potentially including trace context.
@@ -1256,6 +1323,7 @@ def emit_eval_result_events_to_app_insights(app_insights_conf
                 events=result["results"],
                 log_attributes=log_attributes,
                 data_source_item=result["datasource_item"] if "datasource_item" in result else None,
+                evaluator_config=evaluator_config,
             )
             # Force flush to ensure events are sent
             logger_provider.force_flush()
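The new `_evaluator_definition.py` module below carries the asset metadata consumed above. A quick sketch of the intended `to_dict`/`from_dict` round trip, with hypothetical field values:

```python
from azure.ai.evaluation._evaluator_definition import (
    EvaluatorDefinition,
    EvaluatorMetric,
)

definition = EvaluatorDefinition()
definition.type = "prompt"  # hypothetical evaluator type
definition.metrics["fluency"] = EvaluatorMetric(
    type="ordinal", desirable_direction="increase", min_value=1.0, max_value=5.0
)

as_dict = definition.to_dict()
# as_dict["metrics"]["fluency"] == {"type": "ordinal", "desirable_direction": "increase",
#                                   "min_value": 1.0, "max_value": 5.0}

restored = EvaluatorDefinition.from_dict(as_dict)
assert restored.type == "prompt"
assert restored.metrics["fluency"].max_value == 5.0
```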
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluator_definition.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluator_definition.py
new file mode 100644
index 000000000000..bb6c27ba934b
--- /dev/null
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluator_definition.py
@@ -0,0 +1,87 @@
+from abc import ABC
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+
+@dataclass
+class EvaluatorMetric:
+    type: str = "ordinal"
+    desirable_direction: Optional[str] = None
+    min_value: Optional[float] = None
+    max_value: Optional[float] = None
+
+    def to_dict(self) -> Dict[str, Any]:
+        result: Dict[str, Any] = {"type": self.type}
+        if self.desirable_direction is not None:
+            result["desirable_direction"] = self.desirable_direction
+        if self.min_value is not None:
+            result["min_value"] = self.min_value
+        if self.max_value is not None:
+            result["max_value"] = self.max_value
+        return result
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "EvaluatorMetric":
+        return cls(
+            type=data.get("type", "ordinal"),
+            desirable_direction=data.get("desirable_direction"),
+            min_value=data.get("min_value"),
+            max_value=data.get("max_value"),
+        )
+
+
+@dataclass
+class ObjectParameterDescriptorWithRequired:
+    required: List[str] = field(default_factory=list)
+    type: str = "object"
+    properties: Dict[str, Any] = field(default_factory=dict)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "required": self.required,
+            "type": self.type,
+            "properties": self.properties,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "ObjectParameterDescriptorWithRequired":
+        return cls(
+            required=data.get("required", []),
+            type=data.get("type", "object"),
+            properties=data.get("properties", {}),
+        )
+
+
+class EvaluatorDefinition(ABC):
+    """Base class for evaluator definitions"""
+
+    def __init__(self):
+        self.init_parameters: ObjectParameterDescriptorWithRequired = ObjectParameterDescriptorWithRequired()
+        self.metrics: Dict[str, EvaluatorMetric] = {}
+        self.data_schema: ObjectParameterDescriptorWithRequired = ObjectParameterDescriptorWithRequired()
+        self.type: str = "unknown"
+
+    def to_dict(self) -> Dict[str, Any]:
+        result = {
+            "type": self.type,
+            "init_parameters": self.init_parameters.to_dict(),
+            "metrics": {k: v.to_dict() for k, v in self.metrics.items()},
+            "data_schema": self.data_schema.to_dict(),
+        }
+        return result
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "EvaluatorDefinition":
+        # Create a generic instance since specific subclasses are not defined
+        instance = cls()
+        instance.type = data.get("type", "unknown")
+        instance.init_parameters = ObjectParameterDescriptorWithRequired.from_dict(
+            data.get("init_parameters", {})
+        )
+        instance.metrics = {
+            k: EvaluatorMetric.from_dict(v) for k, v in data.get("metrics", {}).items()
+        }
+        instance.data_schema = ObjectParameterDescriptorWithRequired.from_dict(
+            data.get("data_schema", {})
+        )
+        return instance
\ No newline at end of file
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_model_configurations.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_model_configurations.py
index 057c1140d9b0..40fb4fc594ab 100644
--- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_model_configurations.py
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_model_configurations.py
@@ -5,6 +5,8 @@
 from typing import Any, Dict, List, Literal, TypedDict, Union
 from typing_extensions import NotRequired
 
+from typing import Optional
+from ._evaluator_definition import EvaluatorDefinition
 
 class AzureOpenAIModelConfiguration(TypedDict):
@@ -105,6 +107,20 @@ class EvaluatorConfig(TypedDict, total=False):
     column_mapping: Dict[str, str]
     """Dictionary mapping evaluator input name to column in data"""
 
+    evaluator_name: NotRequired[Optional[str]]
+    """Name of the evaluator from the evaluator asset; currently only used for OTel emission"""
+
+    evaluator_version: NotRequired[Optional[str]]
+    """Version of the evaluator from the evaluator asset; currently only used for OTel emission"""
+
+    evaluator_id: NotRequired[Optional[str]]
+    """ID of the evaluator from the evaluator asset; currently only used for OTel emission"""
+
+    evaluator_definition: NotRequired[Optional[EvaluatorDefinition]]
+    """Definition of the evaluator from the evaluator asset. Currently only used for OTel emission;
+    it will also be used by the AOAI eval results converter in the future."""
+
 
 class Message(TypedDict):
     role: str
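Putting the pieces together, a caller could populate the new `EvaluatorConfig` fields roughly as follows; asset names, ids, and the registry lookup are placeholders. Note that `_build_internal_log_attributes` subscripts `evaluator_definition` as a mapping, so the `to_dict()` form is what the emission path actually consumes, even though the TypedDict annotates the object type:

```python
from typing import Dict

from azure.ai.evaluation._evaluator_definition import EvaluatorDefinition
from azure.ai.evaluation._model_configurations import EvaluatorConfig

# Hypothetical wiring; asset metadata would normally come from the evaluator registry.
fluency_definition = EvaluatorDefinition.from_dict({
    "type": "prompt",
    "metrics": {"fluency": {"type": "ordinal", "min_value": 1.0, "max_value": 5.0}},
})

evaluator_config: Dict[str, EvaluatorConfig] = {
    "fluency_check": {
        "column_mapping": {"response": "${data.response}"},
        "evaluator_name": "builtin.fluency",
        "evaluator_version": "1",
        "evaluator_id": "azureai://registries/example/evaluators/fluency",
        # Dict form, since the OTel emission helper indexes ["metrics"] directly.
        "evaluator_definition": fluency_definition.to_dict(),
    }
}
```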