diff --git a/src/lightspeed_evaluation/core/metrics/__init__.py b/src/lightspeed_evaluation/core/metrics/__init__.py index 8f670d01..048d977e 100644 --- a/src/lightspeed_evaluation/core/metrics/__init__.py +++ b/src/lightspeed_evaluation/core/metrics/__init__.py @@ -5,4 +5,9 @@ from lightspeed_evaluation.core.metrics.ragas import RagasMetrics from lightspeed_evaluation.core.metrics.script import ScriptEvalMetrics -__all__ = ["RagasMetrics", "DeepEvalMetrics", "CustomMetrics", "ScriptEvalMetrics"] +__all__ = [ + "RagasMetrics", + "DeepEvalMetrics", + "CustomMetrics", + "ScriptEvalMetrics", +] diff --git a/src/lightspeed_evaluation/core/metrics/deepeval.py b/src/lightspeed_evaluation/core/metrics/deepeval.py index e8eccc2a..0f316233 100644 --- a/src/lightspeed_evaluation/core/metrics/deepeval.py +++ b/src/lightspeed_evaluation/core/metrics/deepeval.py @@ -1,5 +1,11 @@ -"""DeepEval metrics evaluation using LLM Manager.""" +"""DeepEval metrics evaluation using LLM Manager. +This module provides integration with DeepEval metrics including: +1. Standard DeepEval metrics (conversation completeness, relevancy, knowledge retention) +2. GEval integration for configurable custom evaluation criteria +""" + +import logging from typing import Any, Optional import litellm @@ -16,28 +22,45 @@ from lightspeed_evaluation.core.llm.deepeval import DeepEvalLLMManager from lightspeed_evaluation.core.llm.manager import LLMManager from lightspeed_evaluation.core.models import EvaluationScope, TurnData +from lightspeed_evaluation.core.metrics.geval import GEvalHandler + +logger = logging.getLogger(__name__) class DeepEvalMetrics: # pylint: disable=too-few-public-methods - """Handles DeepEval metrics evaluation using LLM Manager.""" + """Handles DeepEval metrics evaluation using LLM Manager. + + This class provides a unified interface for both standard DeepEval metrics + and GEval (configurable custom metrics). It shares LLM resources between + both evaluation types for efficiency. + """ - def __init__(self, llm_manager: LLMManager): + def __init__(self, llm_manager: LLMManager, registry_path: str | None = None): """Initialize with LLM Manager. 
Args: llm_manager: Pre-configured LLMManager with validated parameters + registry_path: Optional path to GEval metrics registry YAML """ + # Setup cache if enabled (shared across all DeepEval operations) if llm_manager.get_config().cache_enabled and litellm.cache is None: cache_dir = llm_manager.get_config().cache_dir # Modifying global litellm cache as there is no clear way how to do it per model # Checking if the litellm.cache as there is potential conflict with Ragas code litellm.cache = Cache(type=LiteLLMCacheType.DISK, disk_cache_dir=cache_dir) - # Create LLM Manager for DeepEval metrics + # Create shared LLM Manager for all DeepEval metrics (standard + GEval) self.llm_manager = DeepEvalLLMManager( llm_manager.get_model_name(), llm_manager.get_llm_params() ) + # Initialize GEval handler with shared LLM manager + self.geval_handler = GEvalHandler( + deepeval_llm_manager=self.llm_manager, + registry_path=registry_path, + ) + + # Standard DeepEval metrics routing self.supported_metrics = { "conversation_completeness": self._evaluate_conversation_completeness, "conversation_relevancy": self._evaluate_conversation_relevancy, @@ -72,16 +95,42 @@ def evaluate( conv_data: Any, scope: EvaluationScope, ) -> tuple[Optional[float], str]: - """Evaluate a DeepEval metric.""" - if metric_name not in self.supported_metrics: - return None, f"Unsupported DeepEval metric: {metric_name}" - - try: - return self.supported_metrics[metric_name]( - conv_data, scope.turn_idx, scope.turn_data, scope.is_conversation - ) - except (ValueError, AttributeError, KeyError) as e: - return None, f"DeepEval {metric_name} evaluation failed: {str(e)}" + """Evaluate a DeepEval metric (standard or GEval). + + This method routes evaluation to either: + - Standard DeepEval metrics (hardcoded implementations) + - GEval metrics (configuration-driven custom metrics) + + Args: + metric_name: Name of metric (for GEval, this should NOT include "geval:" prefix) + conv_data: Conversation data object + scope: EvaluationScope containing turn info and conversation flag + + Returns: + Tuple of (score, reason) + """ + # Route to standard DeepEval metrics + if metric_name in self.supported_metrics: + try: + return self.supported_metrics[metric_name]( + conv_data, scope.turn_idx, scope.turn_data, scope.is_conversation + ) + except (ValueError, AttributeError, KeyError) as e: + return None, f"DeepEval {metric_name} evaluation failed: {str(e)}" + + # Otherwise, assume it's a GEval metric + normalized_metric_name = ( + metric_name.split(":", 1)[1] + if metric_name.startswith("geval:") + else metric_name + ) + return self.geval_handler.evaluate( + metric_name=normalized_metric_name, + conv_data=conv_data, + _turn_idx=scope.turn_idx, + turn_data=scope.turn_data, + is_conversation=scope.is_conversation, + ) def _evaluate_conversation_completeness( self, diff --git a/src/lightspeed_evaluation/core/metrics/geval.py b/src/lightspeed_evaluation/core/metrics/geval.py new file mode 100644 index 00000000..2ca53395 --- /dev/null +++ b/src/lightspeed_evaluation/core/metrics/geval.py @@ -0,0 +1,528 @@ +"""GEval metrics handler using LLM Manager. + +This module provides integration with DeepEval's GEval for configurable custom evaluation criteria. +GEval allows runtime-defined evaluation metrics through YAML configuration. 
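+
+Example registry entry (config/registry/geval_metrics.yaml). The metric name and
+criteria text below are illustrative placeholders, not part of this change; only
+the field names (criteria, evaluation_params, evaluation_steps, threshold) are
+read by GEvalHandler:
+
+    technical_accuracy:
+      criteria: >
+        Assess whether the response is technically accurate and grounded in
+        the provided context.
+      evaluation_params: [input, actual_output]
+      evaluation_steps:
+        - Check factual claims against the supplied context
+        - Penalize invented commands or APIs
+      threshold: 0.7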
+""" + +import logging +from pathlib import Path +from typing import Any, Optional + +import yaml +from deepeval.metrics import GEval +from deepeval.test_case import LLMTestCase, LLMTestCaseParams + +from lightspeed_evaluation.core.llm.deepeval import DeepEvalLLMManager + +logger = logging.getLogger(__name__) + + +class GEvalHandler: # pylint: disable=R0903 + """Handler for configurable GEval metrics. + + This class integrates with the lightspeed-evaluation framework + to provide GEval evaluation with criteria defined either in: + 1. A centralized metric registry (config/registry/geval_metrics.yaml) + 2. Runtime YAML configuration (turn_metrics_metadata) + + Priority: Runtime metadata overrides registry definitions. + """ + + # Class-level registry cache (shared across instances) + _registry: dict[str, Any] | None = None + _registry_path: Path | None = None + + def __init__( + self, + deepeval_llm_manager: DeepEvalLLMManager, + registry_path: str | None = None, + ) -> None: + """Initialize GEval handler. + + Args: + deepeval_llm_manager: Shared DeepEvalLLMManager instance + registry_path: Optional path to metric registry YAML. + If not provided, looks for config/registry/geval_metrics.yaml + relative to project root. + """ + self.deepeval_llm_manager = deepeval_llm_manager + self._load_registry(registry_path) + + def _load_registry(self, registry_path: str | None = None) -> None: + """Load the GEval metric registry from a YAML configuration file. + + This method initializes the class-level `_registry`. + It supports both user-specified and auto-discovered paths, searching common + locations relative to the current working directory and the package root. + + If no valid registry file is found, it logs a warning and initializes an + empty registry (meaning GEval will rely solely on runtime metadata). + + Args: + registry_path (str | None): Optional explicit path to a registry YAML file. + + Behavior: + - If the registry has already been loaded, the function returns immediately. + - If `registry_path` is provided, it is used directly. + - Otherwise, common fallback paths are checked for existence. + - If a registry is found, it is parsed with `yaml.safe_load`. + - Any exceptions during file access or parsing are logged, and an empty + registry is used as a fallback. + """ + # Only load once per class + if GEvalHandler._registry is not None: + return + + # Ensure variables are always bound for static analysis - + path: Optional[Path] = None + possible_paths: list[Path] = [] + + # Normalize user-specified path vs. auto-discovery + if registry_path is not None: + try: + path = Path(registry_path) + except TypeError: + # Bad type passed in; treat as no path provided + path = None + if path is not None: + possible_paths = [path] + else: + package_root = Path(__file__).resolve().parents[3] + possible_paths = [ + Path.cwd() / "config" / "registry" / "geval_metrics.yaml", + package_root / "config" / "registry" / "geval_metrics.yaml", + ] + + # If no explicit file exists yet, search candidates + if path is None or not path.exists(): + for candidate in possible_paths: + if candidate.exists(): + path = candidate + break + + # Handle missing or invalid registry + if path is None or not path.exists(): + GEvalHandler._registry = {} + GEvalHandler._registry_path = None + logger.warning( + "GEval metric registry not found at expected locations. Tried: %s. 
" + "Will fall back to runtime metadata only.", + [str(p) for p in possible_paths], + ) + return + + # Load registry file + try: + with path.open(encoding="utf-8") as f: + loaded = yaml.safe_load(f) or {} + # Guard against non-dict YAML (e.g., list/null) + if not isinstance(loaded, dict): + logger.warning( + "GEval registry file %s did not contain a mapping; using empty registry.", + path, + ) + loaded = {} + GEvalHandler._registry = loaded + GEvalHandler._registry_path = path + logger.info("Loaded %d GEval metrics from %s", len(loaded), path) + except Exception as e: # noqa: BLE001 # pylint: disable=broad-exception-caught + logger.error("Failed to load GEval registry from %s: %s", path, e) + GEvalHandler._registry = {} + GEvalHandler._registry_path = None + + def evaluate( # pylint: disable=R0913,R0917 + self, + metric_name: str, + conv_data: Any, + _turn_idx: int | None, + turn_data: Any | None, + is_conversation: bool, + ) -> tuple[float | None, str]: + """Evaluate using GEval with runtime configuration. + + This method is the central entry point for running GEval evaluations. + It retrieves the appropriate metric configuration (from registry or runtime + metadata), extracts evaluation parameters, and delegates the actual scoring + to either conversation-level or turn-level evaluators. + + Args: + metric_name (str): + The name of the metric to evaluate (e.g., "technical_accuracy"). + conv_data (Any): + The conversation data object containing context, messages, and + associated metadata. + turn_idx (int | None): + The index of the current turn in the conversation. + (Currently unused but kept for interface compatibility.) + turn_data (Any | None): + The turn-level data object, required when evaluating turn-level metrics. + is_conversation (bool): + Indicates whether the evaluation should run on the entire + conversation (`True`) or on an individual turn (`False`). + + Returns: + tuple[float | None, str]: + A tuple containing: + - **score** (float | None): The computed metric score, or None if evaluation failed. + - **reason** (str): A descriptive reason or error message. + + Behavior: + 1. Fetch GEval configuration from metadata using `_get_geval_config()`. + 2. Validate that required fields (e.g., "criteria") are present. + 3. Extract key parameters such as criteria, evaluation steps, and threshold. + 4. Delegate to `_evaluate_conversation()` or `_evaluate_turn()` depending + on the `is_conversation` flag. + """ + # Extract GEval configuration from metadata + # May come from runtime metadata or a preloaded registry + geval_config = self._get_geval_config( + metric_name, conv_data, turn_data, is_conversation + ) + + # If no configuration is available, return early with an informative message. + if not geval_config: + return None, f"GEval configuration not found for metric '{metric_name}'" + + # Extract configuration parameters + criteria = geval_config.get("criteria") + evaluation_params = geval_config.get("evaluation_params", []) + evaluation_steps = geval_config.get("evaluation_steps") + threshold = geval_config.get("threshold", 0.5) + + # The criteria field defines what the model is being judged on. + # Without it, we cannot perform evaluation. 
Evaluation steps can be generated + if not criteria: + return None, "GEval requires 'criteria' in configuration" + + # Perform evaluation based on level (turn or conversation) + if is_conversation: + return self._evaluate_conversation( + conv_data, criteria, evaluation_params, evaluation_steps, threshold + ) + return self._evaluate_turn( + turn_data, criteria, evaluation_params, evaluation_steps, threshold + ) + + def _convert_evaluation_params( + self, params: list[str] + ) -> list[LLMTestCaseParams] | None: + """Convert a list of string parameter names into `LLMTestCaseParams` enum values. + + This helper ensures that the evaluation parameters passed into GEval are properly + typed as `LLMTestCaseParams` (used by DeepEval's test-case schema). If any + parameter is not a valid enum member, the function treats the entire parameter + list as "custom" and returns `None`. This allows GEval to automatically infer + the required fields at runtime rather than forcing strict schema compliance. + + Args: + params (list[str]): + A list of string identifiers (e.g., ["input", "actual_output"]). + These typically come from a YAML or runtime configuration and + may not always match enum names exactly. + + Returns: + List of LLMTestCaseParams enum values, or None if params are custom strings + """ + # Return early if no parameters were supplied + if not params: + return None + + # Try to convert strings to enum values + converted: list[LLMTestCaseParams] = [] + + # Attempt to convert each string into a valid enum value + for param in params: + try: + # Try to match as enum value (e.g., "INPUT", "ACTUAL_OUTPUT") + enum_value = LLMTestCaseParams[param.upper().replace(" ", "_")] + converted.append(enum_value) + except (KeyError, AttributeError): + # Not a valid enum - these are custom params, skip them + logger.debug( + "Skipping custom evaluation_param '%s' - " + "not a valid LLMTestCaseParams enum. " + "GEval will auto-detect required fields.", + param, + ) + return None + + # Return the successfully converted list, or None if it ended up empty + return converted if converted else None + + def _evaluate_turn( # pylint: disable=R0913,R0917 + self, + turn_data: Any, + criteria: str, + evaluation_params: list[str], + evaluation_steps: list[str] | None, + threshold: float, + ) -> tuple[float | None, str]: + """Evaluate a single turn using GEval. + + Args: + turn_data (Any): + The turn-level data object containing fields like query, response, + expected_response, and context. + criteria (str): + Natural-language description of what the evaluation should judge. + Example: "Assess factual correctness and command validity." + evaluation_params (list[str]): + A list of string parameters defining which fields to include + (e.g., ["input", "actual_output"]). + evaluation_steps (list[str] | None): + Optional step-by-step evaluation guidance for the model. + threshold (float): + Minimum score threshold that determines pass/fail behavior. + + Returns: + tuple[float | None, str]: + A tuple of (score, reason). If evaluation fails, score will be None + and the reason will contain an error message. 
+ """ + # Validate that we actually have turn data + if not turn_data: + return None, "Turn data required for turn-level GEval" + + # Convert evaluation_params to enum values if valid, otherwise use defaults + converted_params = self._convert_evaluation_params(evaluation_params) + + # Create GEval metric with runtime configuration + metric_kwargs: dict[str, Any] = { + "name": "GEval Turn Metric", + "criteria": criteria, + "evaluation_params": converted_params, + "model": self.deepeval_llm_manager.get_llm(), + "threshold": threshold, + "top_logprobs": 5, + } + + # Only set evaluation_params if we have valid enum conversions + # or if no params were provided at all (then use defaults) + if converted_params is None: + if not evaluation_params: + metric_kwargs["evaluation_params"] = [ + LLMTestCaseParams.INPUT, + LLMTestCaseParams.ACTUAL_OUTPUT, + ] + # else: leave unset so GEval can auto-detect from custom strings + else: + metric_kwargs["evaluation_params"] = converted_params + + # Add evaluation steps if provided + if evaluation_steps: + metric_kwargs["evaluation_steps"] = evaluation_steps + + # Instantiate the GEval metric object + metric = GEval(**metric_kwargs) + + # Prepare test case arguments, only including non-None optional fields + test_case_kwargs = { + "input": turn_data.query, + "actual_output": turn_data.response or "", + } + + # Add optional fields only if they have values + if turn_data.expected_response: + test_case_kwargs["expected_output"] = turn_data.expected_response + + if turn_data.contexts: + test_case_kwargs["context"] = turn_data.contexts + + # Create test case for a single turn + test_case = LLMTestCase(**test_case_kwargs) + + # Evaluate + try: + metric.measure(test_case) + score = metric.score if metric.score is not None else 0.0 + reason = ( + str(metric.reason) + if hasattr(metric, "reason") and metric.reason + else "No reason provided" + ) + return score, reason + except Exception as e: # pylint: disable=W0718 + logger.error( + "GEval turn-level evaluation failed: %s: %s", type(e).__name__, str(e) + ) + logger.debug( + "Test case input: %s...", + test_case.input[:100] if test_case.input else "None", + ) + logger.debug( + "Test case output: %s...", + test_case.actual_output[:100] if test_case.actual_output else "None", + ) + return None, f"GEval evaluation error: {str(e)}" + + def _evaluate_conversation( # pylint: disable=R0913,R0917,R0914 + self, + conv_data: Any, + criteria: str, + evaluation_params: list[str], + evaluation_steps: list[str] | None, + threshold: float, + ) -> tuple[float | None, str]: + """Evaluate a conversation using GEval. + + This method aggregates all conversation turns into a single LLMTestCase + and evaluates the conversation against the provided criteria. + + Args: + conv_data (Any): + Conversation data object containing all turns. + criteria (str): + Description of the overall evaluation goal. + evaluation_params (list[str]): + List of field names to include (same semantics as turn-level). + evaluation_steps (list[str] | None): + Optional instructions guiding how the evaluation should proceed. + threshold (float): + Minimum acceptable score before the metric is considered failed. + + Returns: + tuple[float | None, str]: + Tuple containing (score, reason). Returns None on error. 
+ """ + # Convert evaluation_params to enum values if valid, otherwise use defaults + converted_params = self._convert_evaluation_params(evaluation_params) + + # Configure the GEval metric for conversation-level evaluation + metric_kwargs: dict[str, Any] = { + "name": "GEval Conversation Metric", + "criteria": criteria, + "evaluation_params": converted_params, + "model": self.deepeval_llm_manager.get_llm(), + "threshold": threshold, + "top_logprobs": 5, # Vertex/Gemini throws an error if over 20. + } + + if converted_params is None: + if not evaluation_params: + metric_kwargs["evaluation_params"] = [ + LLMTestCaseParams.INPUT, + LLMTestCaseParams.ACTUAL_OUTPUT, + ] + else: + metric_kwargs["evaluation_params"] = converted_params + + # Add evaluation steps if provided + if evaluation_steps: + metric_kwargs["evaluation_steps"] = evaluation_steps + + # Instantiate the GEval metric object + metric = GEval(**metric_kwargs) + + # GEval only accepts LLMTestCase, not ConversationalTestCase + # Aggregate conversation turns into a single test case + conversation_input = [] + conversation_output = [] + + for i, turn in enumerate(conv_data.turns, 1): + conversation_input.append(f"Turn {i} - User: {turn.query}") + conversation_output.append(f"Turn {i} - Assistant: {turn.response or ''}") + + # Create aggregated test case for conversation evaluation + test_case = LLMTestCase( + input="\n".join(conversation_input), + actual_output="\n".join(conversation_output), + ) + + # Evaluate + try: + metric.measure(test_case) + score = metric.score if metric.score is not None else 0.0 + reason = ( + str(metric.reason) + if hasattr(metric, "reason") and metric.reason + else "No reason provided" + ) + return score, reason + except Exception as e: # pylint: disable=W0718 + logger.error( + "GEval conversation-level evaluation failed: %s: %s", + type(e).__name__, + str(e), + ) + logger.debug("Conversation turns: %d", len(conv_data.turns)) + logger.debug( + "Test case input preview: %s...", + test_case.input[:200] if test_case.input else "None", + ) + return None, f"GEval evaluation error: {str(e)}" + + def _get_geval_config( + self, + metric_name: str, + conv_data: Any, + turn_data: Any | None, + is_conversation: bool, + ) -> dict[str, Any] | None: + """Extract GEval configuration from metadata or registry. + + The method checks multiple sources in priority order: + 1. Turn-level metadata (runtime override) + 2. Conversation-level metadata (runtime override) + 3. Metric registry (shared, persistent YAML definitions) + + Args: + metric_name (str): + Name of the metric to retrieve (e.g., "completeness"). + conv_data (Any): + The full conversation data object, which may contain + conversation-level metadata. + turn_data (Any | None): + Optional turn-level data object, for per-turn metrics. + is_conversation (bool): + True if evaluating a conversation-level metric, False for turn-level. + + Returns: + dict[str, Any] | None: + The GEval configuration dictionary if found, otherwise None. 
+ """ + metric_key = f"geval:{metric_name}" + + # Turn level metadata override + # Used when individual turns define custom GEval settings + if ( + not is_conversation + and turn_data + and hasattr(turn_data, "turn_metrics_metadata") + and turn_data.turn_metrics_metadata + and metric_key in turn_data.turn_metrics_metadata + ): + logger.debug("Using runtime metadata for metric '%s'", metric_name) + return turn_data.turn_metrics_metadata[metric_key] + + # Conversation-level metadata override + # Used when the conversation defines shared GEval settings + if ( + hasattr(conv_data, "conversation_metrics_metadata") + and conv_data.conversation_metrics_metadata + and metric_key in conv_data.conversation_metrics_metadata + ): + logger.debug("Using runtime metadata for metric '%s'", metric_name) + return conv_data.conversation_metrics_metadata[metric_key] + + # Registry definition + # Fallback to shared YAML registry if no runtime metadata is found + if ( + GEvalHandler._registry + and metric_name in GEvalHandler._registry # pylint: disable=E1135 + ): # pylint: disable=E1135 + logger.debug("Using registry definition for metric '%s'", metric_name) + return GEvalHandler._registry[metric_name] # pylint: disable=E1136 + + # Config not found anywhere + available_metrics = ( + list(GEvalHandler._registry.keys()) # pylint: disable=E1136 + if GEvalHandler._registry + else [] + ) + logger.warning( + "Metric '%s' not found in runtime metadata or registry. " + "Available registry metrics: %s", + metric_name, + available_metrics, + ) + return None
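
The same fields read from the registry (criteria, evaluation_params, evaluation_steps,
threshold) can also be supplied at runtime, in which case they take precedence over the
registry entry. A minimal sketch of a per-turn override via turn_metrics_metadata follows;
the metric name, criteria text, and the surrounding data-file structure are assumptions for
illustration, and only the "geval:<name>" key and the field names are dictated by
_get_geval_config:

    # Hypothetical per-turn override (illustrative only)
    turn_metrics_metadata:
      "geval:technical_accuracy":
        criteria: >
          Judge only whether the suggested commands are valid for the product
          version referenced in the query.
        evaluation_params: [input, actual_output]
        threshold: 0.8

Conversation-level overrides use the same shape under conversation_metrics_metadata.
If neither form of runtime metadata is present, GEvalHandler falls back to the registry
entry keyed by the bare metric name (without the "geval:" prefix).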