diff --git a/aiperf/common/config/user_config.py b/aiperf/common/config/user_config.py index 091625e96..eb5b82b26 100644 --- a/aiperf/common/config/user_config.py +++ b/aiperf/common/config/user_config.py @@ -227,9 +227,13 @@ def _count_dataset_entries(self) -> int: @model_validator(mode="after") def _compute_config(self) -> Self: - """Compute additional configuration. - - This method is automatically called after the model is validated to compute additional configuration. + """ + Compute derived configuration fields and populate any missing artifact directory. + + If `output.artifact_directory` was not set by the user, computes and assigns it from `_compute_artifact_directory()`. + + Returns: + self: The same UserConfig instance with computed fields applied. """ if "artifact_directory" not in self.output.model_fields_set: diff --git a/aiperf/common/enums/metric_enums.py b/aiperf/common/enums/metric_enums.py index 6be3e60d2..efe327417 100644 --- a/aiperf/common/enums/metric_enums.py +++ b/aiperf/common/enums/metric_enums.py @@ -205,7 +205,18 @@ class PowerMetricUnitInfo(BaseMetricUnitInfo): watts: float def convert_to(self, other_unit: "MetricUnitT", value: int | float) -> float: - """Convert a value from this unit to another unit.""" + """ + Convert a power quantity from this unit to another unit. + + If the target unit is not a power unit, defers to the base unit conversion behavior. + + Parameters: + other_unit (MetricUnitT): Target unit to convert to. + value (int | float): Power value expressed in this unit. + + Returns: + float: The converted power value expressed in the target unit. + """ if not isinstance(other_unit, PowerMetricUnit | PowerMetricUnitInfo): return super().convert_to(other_unit, value) @@ -228,17 +239,32 @@ class PowerMetricUnit(BaseMetricUnit): @cached_property def info(self) -> PowerMetricUnitInfo: - """Get the info for the power unit.""" + """ + Return the metadata object associated with this power unit. 
+ + Returns: + PowerMetricUnitInfo: Unit metadata containing attributes such as `long_name` and `watts`. + """ return self._info # type: ignore @cached_property def watts(self) -> float: - """The number of watts in the power unit.""" + """ + Power of this unit expressed in watts. + + Returns: + float: Number of watts represented by this power unit. + """ return self.info.watts @cached_property def long_name(self) -> str: - """The long name of the power unit.""" + """ + Human-readable long name for the power unit. + + Returns: + long_name (str): The long descriptive name of the unit. + """ return self.info.long_name @@ -249,7 +275,16 @@ class EnergyMetricUnitInfo(BaseMetricUnitInfo): joules: float def convert_to(self, other_unit: "MetricUnitT", value: int | float) -> float: - """Convert a value from this unit to another unit.""" + """ + Convert a numeric energy value from this unit into the specified target energy unit. + + Parameters: + other_unit (MetricUnitT): Target unit; expected to be an EnergyMetricUnit or EnergyMetricUnitInfo. + value (int | float): Energy value in this unit to convert. + + Returns: + float: The input `value` expressed in `other_unit`. + """ if not isinstance(other_unit, EnergyMetricUnit | EnergyMetricUnitInfo): return super().convert_to(other_unit, value) @@ -277,7 +312,12 @@ class EnergyMetricUnit(BaseMetricUnit): @cached_property def info(self) -> EnergyMetricUnitInfo: - """Get the info for the energy unit.""" + """ + Get the EnergyMetricUnitInfo associated with this energy unit. + + Returns: + EnergyMetricUnitInfo: The info object for this energy unit. + """ return self._info # type: ignore @cached_property @@ -287,7 +327,12 @@ def joules(self) -> float: @cached_property def long_name(self) -> str: - """The long name of the energy unit.""" + """ + Human-readable long name of the energy unit. + + Returns: + long_name (str): The long, human-readable name of the energy unit. 
+ """ return self.info.long_name @@ -463,7 +508,15 @@ def dtype(self) -> Any: @classmethod def from_python_type(cls, type: type[MetricValueTypeT]) -> "MetricValueType": - """Get the MetricValueType for a given type.""" + """ + Map a Python type to the corresponding MetricValueType. + + Parameters: + type (type[MetricValueTypeT]): The Python type to map (e.g., float, int, list[int], or the type variable `MetricValueTypeVarT`). + + Returns: + MetricValueType: The MetricValueType that corresponds to the provided Python type. If `MetricValueTypeVarT` is provided, returns the float-backed MetricValueType. + """ # If the type is a simple type like float or int, we have to use __name__. # This is because using str() on float or int will return or , etc. type_name = type.__name__ @@ -482,7 +535,18 @@ class FrequencyMetricUnitInfo(BaseMetricUnitInfo): hertz: float def convert_to(self, other_unit: "MetricUnitT", value: int | float) -> float: - """Convert a value from this unit to another unit.""" + """ + Convert a numeric value from this frequency unit to another frequency unit. + + If `other_unit` is not a frequency unit, delegation is performed to the base implementation. + + Parameters: + other_unit (MetricUnitT): Target unit (typically a FrequencyMetricUnit or FrequencyMetricUnitInfo). + value (int | float): Numeric value in this unit to convert. + + Returns: + float: The input value expressed in `other_unit`. + """ if not isinstance(other_unit, FrequencyMetricUnit | FrequencyMetricUnitInfo): return super().convert_to(other_unit, value) @@ -510,7 +574,12 @@ class FrequencyMetricUnit(BaseMetricUnit): @cached_property def info(self) -> FrequencyMetricUnitInfo: - """Get the info for the frequency unit.""" + """ + Access the FrequencyMetricUnit's associated FrequencyMetricUnitInfo. + + Returns: + FrequencyMetricUnitInfo: The unit's info object containing metadata such as `hertz` and `long_name`. 
+ """ return self._info # type: ignore @cached_property @@ -520,7 +589,12 @@ def hertz(self) -> float: @cached_property def long_name(self) -> str: - """The long name of the frequency unit.""" + """ + Return the long human-readable name of the frequency unit. + + Returns: + long_name (str): The unit's descriptive long name. + """ return self.info.long_name @@ -532,7 +606,16 @@ class TemperatureMetricUnitInfo(BaseMetricUnitInfo): offset: float = 0.0 def convert_to(self, other_unit: "MetricUnitT", value: int | float) -> float: - """Convert a value from this unit to another unit.""" + """ + Convert a temperature value from this temperature unit to another temperature unit. + + Parameters: + other_unit (MetricUnitT): Target unit; must be a TemperatureMetricUnit or TemperatureMetricUnitInfo — otherwise conversion is delegated to the base implementation. + value (int | float): Temperature value expressed in this unit. + + Returns: + float: The temperature converted to the target unit. + """ if not isinstance( other_unit, TemperatureMetricUnit | TemperatureMetricUnitInfo ): @@ -567,22 +650,42 @@ class TemperatureMetricUnit(BaseMetricUnit): @cached_property def info(self) -> TemperatureMetricUnitInfo: - """Get the info for the temperature unit.""" + """ + Return the TemperatureMetricUnitInfo associated with this unit. + + Returns: + info (TemperatureMetricUnitInfo): Metadata and conversion rules for this temperature unit. + """ return self._info # type: ignore @cached_property def celsius(self) -> float: - """The celsius conversion factor.""" + """ + Conversion factor to convert a value in this temperature unit to degrees Celsius. + + Returns: + celsius (float): Multiplier to convert a value from this unit into degrees Celsius. + """ return self.info.celsius @cached_property def offset(self) -> float: - """The offset for temperature conversion.""" + """ + Offset added to temperature values during conversion. + + Returns: + float: The offset value for this temperature unit. 
+ """ return self.info.offset @cached_property def long_name(self) -> str: - """The long name of the temperature unit.""" + """ + Human-readable long name of the temperature unit. + + Returns: + long_name (str): The long-form name of the unit. + """ return self.info.long_name diff --git a/aiperf/common/messages/telemetry_messages.py b/aiperf/common/messages/telemetry_messages.py index 910b536ac..2e2fc7662 100644 --- a/aiperf/common/messages/telemetry_messages.py +++ b/aiperf/common/messages/telemetry_messages.py @@ -31,7 +31,12 @@ class TelemetryRecordsMessage(BaseServiceMessage): @property def valid(self) -> bool: - """Whether the telemetry collection was valid.""" + """ + Indicates whether collected telemetry records are present and no error occurred. + + Returns: + `True` if `error` is None and there is at least one record in `records`, `False` otherwise. + """ return self.error is None and len(self.records) > 0 diff --git a/aiperf/common/models/telemetry_models.py b/aiperf/common/models/telemetry_models.py index e401b32c4..81c126f5b 100644 --- a/aiperf/common/models/telemetry_models.py +++ b/aiperf/common/models/telemetry_models.py @@ -115,11 +115,12 @@ class GpuMetricTimeSeries(AIPerfBaseModel): ) def append_snapshot(self, metrics: dict[str, float], timestamp_ns: int) -> None: - """Add new snapshot with all metrics at once. - - Args: - metrics: Dictionary of metric_name -> value for this timestamp - timestamp_ns: Timestamp when measurements were taken + """ + Append a timestamped snapshot containing the provided metric values to the time series. + + Parameters: + metrics (dict[str, float]): Mapping of metric names to values; entries with `None` values are omitted. + timestamp_ns (int): Timestamp for the snapshot in nanoseconds.
""" snapshot = GpuTelemetrySnapshot( timestamp_ns=timestamp_ns, @@ -145,19 +146,21 @@ def get_metric_values(self, metric_name: str) -> list[tuple[float, int]]: def to_metric_result( self, metric_name: str, tag: str, header: str, unit: str ) -> MetricResult: - """Convert metric time series to MetricResult with statistical summary. - - Args: - metric_name: Name of the metric to analyze - tag: Unique identifier for this metric (used by dashboard, exports, API) - header: Human-readable name for display - unit: Unit of measurement (e.g., "W" for Watts, "%" for percentage) - + """ + Create a MetricResult summarizing the time-series values for a given metric. + + Parameters: + metric_name (str): Metric key to extract from the time series. + tag (str): Identifier used by dashboards, exports, and APIs for this metric. + header (str): Human-readable display name for the metric. + unit (str): Measurement unit (for example, "W" for watts or "%" for percent). + Returns: - MetricResult with min/max/avg/percentiles computed from time series - + MetricResult: Aggregated statistics for the metric, including min, max, average, + standard deviation, count, and selected percentiles (1, 5, 25, 50, 75, 90, 95, 99). + Raises: - NoMetricValue: If no data points are available for the specified metric + NoMetricValue: If no data points exist for the specified metric. """ data_points = self.get_metric_values(metric_name) @@ -205,12 +208,14 @@ class GpuTelemetryData(AIPerfBaseModel): ) def add_record(self, record: TelemetryRecord) -> None: - """Add telemetry record as a grouped snapshot. - - Args: - record: New telemetry data point from DCGM collector - - Note: Groups all metric values from the record into a single snapshot + """ + Append a grouped snapshot of present metric values from a TelemetryRecord to the GPU time series. + + Parameters: + record (TelemetryRecord): Telemetry data point from the DCGM collector. 
+ + Description: + Creates a snapshot containing only metrics with non-`None` values from `record` and appends it to the underlying time series if at least one metric is present. """ metric_mapping = { "gpu_power_usage": record.gpu_power_usage, @@ -268,14 +273,11 @@ class TelemetryHierarchy(AIPerfBaseModel): ) def add_record(self, record: TelemetryRecord) -> None: - """Add telemetry record to hierarchical storage. - - Args: - record: New telemetry data from GPU monitoring - - Note: Automatically creates hierarchy levels as needed: - - New DCGM endpoints get empty GPU dict - - New GPUs get initialized with metadata and empty metrics + """ + Store a TelemetryRecord in the hierarchy organized by DCGM endpoint URL and GPU UUID, initializing missing endpoint entries and per-GPU metadata. + + Parameters: + record (TelemetryRecord): Telemetry data point to store; may initialize a new GPU entry with its static metadata if the GPU is not yet present. """ if record.dcgm_url not in self.dcgm_endpoints: diff --git a/aiperf/common/protocols.py b/aiperf/common/protocols.py index ac0b8533b..f7b28d312 100644 --- a/aiperf/common/protocols.py +++ b/aiperf/common/protocols.py @@ -504,7 +504,15 @@ async def process_result( self, result: dict[MetricTagT, "MetricValueTypeT"] ) -> None: ... - async def summarize(self) -> list["MetricResult"]: ... + async def summarize(self) -> list["MetricResult"]: + """Produce summarized telemetry metrics as a list of MetricResult objects. + + Aggregate processed telemetry records into hierarchical, tagged MetricResult objects that preserve metadata grouping and support downstream telemetry consumers (for example, dashboard filtering). + + Returns: + list[MetricResult]: Aggregated telemetry metrics where each MetricResult includes hierarchical tags and metadata that maintain grouping context. + """ + ...
@runtime_checkable @@ -517,19 +525,26 @@ class TelemetryResultsProcessorProtocol(Protocol): """ async def process_telemetry_record(self, record: TelemetryRecord) -> None: - """Process individual telemetry record with rich metadata. - - Args: - record: TelemetryRecord containing GPU metrics and hierarchical metadata + """ + Process a single telemetry record and incorporate its metrics into the processor's internal aggregation. + + Implementations should ingest or aggregate the provided TelemetryRecord — which includes GPU metrics, timestamp, and hierarchical metadata (e.g., service/component tags) — so that those metrics are available for later summarize() calls. + + Parameters: + record (TelemetryRecord): Telemetry entry with GPU metrics and associated hierarchical metadata used for grouping and aggregation. """ ... async def summarize(self) -> list["MetricResult"]: - """Generate MetricResult list with hierarchical tags for telemetry data. - + """ + Summarize processed telemetry into hierarchical MetricResult objects. + + Each MetricResult preserves telemetry metadata grouping (for example, dcgm_url then gpu_uuid) + by encoding tags hierarchically to allow downstream dashboards and filters to maintain grouping. + Returns: - List of MetricResult objects with hierarchical tags that preserve - dcgm_url -> gpu_uuid grouping structure for dashboard filtering. + list[MetricResult]: A list of MetricResult objects whose tags encode hierarchical + telemetry grouping (e.g., `dcgm_url -> gpu_uuid`). """ ... 
diff --git a/aiperf/controller/system_controller.py b/aiperf/controller/system_controller.py index 2497d03d0..70d93fedc 100644 --- a/aiperf/controller/system_controller.py +++ b/aiperf/controller/system_controller.py @@ -80,6 +80,21 @@ def __init__( service_config: ServiceConfig, service_id: str | None = None, ) -> None: + """ + Initialize the SystemController and configure its managers, UI, required service layout, and internal state used for profiling, telemetry coordination, and shutdown. + + Parameters: + user_config (UserConfig): Runtime user-provided configuration for the controller and services. + service_config (ServiceConfig): Service-level configuration that controls service run types, counts, UI type, and telemetry behavior. + service_id (str | None): Optional explicit service identifier; if omitted a default is used. + + Notes: + - Sets up required_services mapping (counts per ServiceType) and scaling behavior for record processors. + - Creates and stores ProxyManager, ServiceManagerProtocol, and AIPerfUIProtocol instances and attaches the UI as a child lifecycle. + - Initializes internal tracking fields used across the controller lifecycle: + - _was_cancelled, _stop_tasks, _profile_results, _telemetry_results, _profile_results_received, + _should_wait_for_telemetry, _shutdown_triggered, _endpoints_tested, _endpoints_reachable, _exit_errors. + """ super().__init__( service_config=service_config, user_config=user_config, @@ -136,7 +151,9 @@ def __init__( self.debug("System Controller created") async def request_realtime_metrics(self) -> None: - """Request real-time metrics from the RecordsManager.""" + """ + Request real-time metrics from the RecordsManager and wait for its command response. 
+ """ await self.send_command_and_wait_for_response( RealtimeMetricsCommand( service_id=self.service_id, @@ -167,12 +184,10 @@ async def _initialize_system_controller(self) -> None: @on_start async def _start_services(self) -> None: - """Bootstrap the system services. - - This method will: - - Initialize all required services - - Wait for all required services to be registered - - Start all required services + """ + Bootstrap and start the system services, configure them for profiling, and begin profiling. + + Starts the Service Manager, launches optional services required for configuration (for example TelemetryManager), waits for all services to register, issues profile configuration to registered services, and then starts profiling across the system. """ self.debug("System Controller is bootstrapping services") @@ -196,10 +211,10 @@ async def _start_services(self) -> None: self.info("AIPerf System is PROFILING") async def _profile_configure_all_services(self) -> None: - """Configure all services to start profiling. - - This is a blocking call that will wait for all registered services to be configured before returning. - Timeout ensures we don't wait forever for optional services that may shut down. + """ + Send a PROFILE_CONFIGURE command to all registered services and wait for their responses. + + Sends profiling configuration (from the controller's user_config) to every known service, waits up to DEFAULT_PROFILE_CONFIGURE_TIMEOUT for replies, records any service-side errors, and raises a lifecycle error if any services report failures. Logs the total time taken to configure all services. """ self.info("Configuring all services to start profiling") begin = time.perf_counter() @@ -330,13 +345,13 @@ async def _process_credits_complete_message( @on_message(MessageType.STATUS) async def _process_status_message(self, message: StatusMessage) -> None: - """Process a generic service lifecycle status message. 
- - Updates the service registry with lifecycle state changes (initializing, - running, stopping, etc.). - - Args: - message: The status message to process + """ + Update a registered service's lifecycle state based on an incoming StatusMessage. + + If the message refers to an unknown service, it is ignored; otherwise the service's stored state is set to the message's state. + + Parameters: + message (StatusMessage): Message containing `service_id`, `service_type`, and the lifecycle `state` to apply. """ service_id = message.service_id service_type = message.service_type @@ -365,9 +380,13 @@ async def _process_status_message(self, message: StatusMessage) -> None: async def _on_telemetry_status_message( self, message: TelemetryStatusMessage ) -> None: - """Handle telemetry status from TelemetryManager. - - TelemetryStatusMessage informs SystemController if telemetry results will be available. + """ + Update the controller's telemetry endpoint state and whether shutdown should wait for telemetry results. + + Parameters: + message (TelemetryStatusMessage): Message containing `endpoints_tested` (list of endpoint identifiers tested), + `endpoints_reachable` (list of endpoints confirmed reachable), and `enabled` (True if telemetry results + should be awaited before shutdown). """ self._endpoints_tested = message.endpoints_tested @@ -376,7 +395,12 @@ async def _on_telemetry_status_message( @on_message(MessageType.COMMAND_RESPONSE) async def _process_command_response_message(self, message: CommandResponse) -> None: - """Process a command response message.""" + """ + Handle an incoming command response message and log its result. + + Logs a debug message for success, acknowledged, and unhandled statuses. For failure + responses, logs an error containing the command, service id, and error details. 
+ """ self.debug(lambda: f"Received command response message: {message}") if message.status == CommandResponseStatus.SUCCESS: self.debug(f"Command {message.command} succeeded from {message.service_id}") @@ -420,7 +444,14 @@ async def _handle_shutdown_workers_command( async def _on_process_records_result_message( self, message: ProcessRecordsResultMessage ) -> None: - """Handle a profile results message.""" + """ + Process and store profile results received from a records-processing service and initiate shutdown coordination. + + This records the provided ProcessRecordsResultMessage.results into the controller's profile results state, logs any reported errors or an absence of records, marks that profile results have been received, and invokes shutdown coordination to determine whether the controller should stop. + + Parameters: + message (ProcessRecordsResultMessage): Message containing profiling results and any associated errors. + """ self.debug(lambda: f"Received profile results message: {message}") if message.results.errors: self.error( @@ -444,7 +475,14 @@ async def _on_process_records_result_message( async def _on_process_telemetry_result_message( self, message: ProcessTelemetryResultMessage ) -> None: - """Handle a telemetry results message.""" + """ + Handle an incoming ProcessTelemetryResultMessage and store its telemetry results. + + Processes the telemetry result payload, logs any reported errors or missing records, attaches endpoint test metadata, saves the resulting TelemetryResults into the controller state, and triggers shutdown coordination logic as appropriate. + + Parameters: + message (ProcessTelemetryResultMessage): Message containing the processed telemetry results and any associated errors. 
+ """ self.debug(lambda: f"Received telemetry results message: {message}") if message.telemetry_result.errors: @@ -472,12 +510,10 @@ async def _on_process_telemetry_result_message( await self._check_and_trigger_shutdown() async def _check_and_trigger_shutdown(self) -> None: - """Check if all required results are received and trigger unified export + shutdown. - - Coordination logic: - 1. Always wait for profile results (ProcessRecordsResultMessage) - 2. If telemetry disabled OR telemetry results received → proceed with shutdown - 3. Otherwise → wait (telemetry results arrive nearly simultaneously and will call this method again) + """ + Coordinate readiness of profiling and telemetry results and initiate unified export and shutdown when both are ready. + + This method verifies that profile results have been received and that either telemetry is disabled or telemetry results are available. When readiness conditions are met it marks shutdown as triggered and initiates the controller shutdown sequence; otherwise it returns without side effects. """ if self._shutdown_triggered: return @@ -497,10 +533,11 @@ async def _check_and_trigger_shutdown(self) -> None: self.debug("Waiting for telemetry results...") async def _handle_signal(self, sig: int) -> None: - """Handle received signals by triggering graceful shutdown. - - Args: - sig: The signal number received + """ + Handle an OS signal by initiating a graceful shutdown or forcefully killing services if shutdown is already in progress. + + Parameters: + sig (int): OS signal number received. """ if self.stop_requested: # If we are already in a stopping state, we need to kill the process to be safe. @@ -561,7 +598,11 @@ def _print_exit_errors_and_log_file(self) -> None: console.file.flush() async def _print_post_benchmark_info_and_metrics(self) -> None: - """Print post benchmark info and metrics to the console.""" + """ + Display and export post-benchmark metrics and related information to the console. 
+ + If profile results are available, exports full dataset (including telemetry when present) to data files (e.g., CSV/JSON) and a console export, then prints the CLI command used, benchmark duration, exported file paths, and log file information. If the profiling run was cancelled early, prints a warning that results may be incomplete. + """ if not self._profile_results or not self._profile_results.results.records: self.warning("No profile results to export") return diff --git a/aiperf/exporters/console_metrics_exporter.py b/aiperf/exporters/console_metrics_exporter.py index 54c06318f..ccd2384f2 100644 --- a/aiperf/exporters/console_metrics_exporter.py +++ b/aiperf/exporters/console_metrics_exporter.py @@ -32,6 +32,14 @@ def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: self._endpoint_type = exporter_config.user_config.endpoint.type async def export(self, console: Console) -> None: + """ + Render and print stored metric records to the provided Rich Console. + + If there are no records to export this function does nothing. + + Parameters: + console (Console): Rich Console instance used to render and print the metrics table. + """ if not self._results.records: self.debug("No records to export") return diff --git a/aiperf/exporters/csv_exporter.py b/aiperf/exporters/csv_exporter.py index a2cebcddd..68b8b5a13 100644 --- a/aiperf/exporters/csv_exporter.py +++ b/aiperf/exporters/csv_exporter.py @@ -36,6 +36,17 @@ class CsvExporter(AIPerfLoggerMixin): """Exports records to a CSV file in a legacy, two-section format.""" def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: + """ + Initialize the CsvExporter with configuration and prepare internal export state. + + Sets exporter results, optional telemetry results, output directory, metric registry, + default CSV file path, and percentile keys used when formatting exported CSV content. 
+ + Parameters: + exporter_config (ExporterConfig): Exporter configuration containing `results` (metrics to export), + `user_config.output.artifact_directory` (destination directory), and optionally + `telemetry_results` (GPU telemetry data). Additional keyword arguments are passed to the superclass. + """ super().__init__(**kwargs) self.debug(lambda: f"Initializing CsvExporter with config: {exporter_config}") self._results = exporter_config.results @@ -54,6 +65,11 @@ def get_export_info(self) -> FileExportInfo: ) async def export(self) -> None: + """ + Write collected metrics and optional telemetry results to the configured CSV file. + + This method ensures the output directory exists, converts stored metric records to display units when present, generates CSV content (including an optional GPU telemetry section), and writes the content to the exporter file path. Any exception raised during generation or file I/O is logged and re-raised. + """ self._output_directory.mkdir(parents=True, exist_ok=True) self.debug(lambda: f"Exporting data to CSV file: {self._file_path}") @@ -79,6 +95,16 @@ async def export(self) -> None: def _generate_csv_content( self, records: Mapping[str, MetricResult], telemetry_results=None ) -> str: + """ + Generate the CSV file content for the provided metric records and optional telemetry results. + + Parameters: + records (Mapping[str, MetricResult]): Mapping of metric tag to MetricResult to include in the CSV. + telemetry_results (optional): Telemetry results object; when provided, a GPU telemetry section is appended to the CSV. + + Returns: + csv_content (str): Complete CSV-formatted content including request metrics, system metrics, and an optional telemetry section. 
+ """ buf = io.StringIO() writer = csv.writer(buf) @@ -101,7 +127,15 @@ def _generate_csv_content( def _split_metrics( self, records: Mapping[str, MetricResult] ) -> tuple[dict[str, MetricResult], dict[str, MetricResult]]: - """Split metrics into request metrics (with percentiles) and system metrics (single values).""" + """ + Partition metric records into request-style metrics that include percentiles and system-style metrics that do not. + + Parameters: + records (Mapping[str, MetricResult]): Mapping of metric tag to MetricResult objects to classify. + + Returns: + tuple[dict[str, MetricResult], dict[str, MetricResult]]: A tuple (request_metrics, system_metrics) where `request_metrics` maps tags to metrics that have percentile values and `system_metrics` maps tags to metrics that do not. + """ request_metrics: dict[str, MetricResult] = {} system_metrics: dict[str, MetricResult] = {} @@ -122,6 +156,15 @@ def _write_request_metrics( writer: csv.writer, records: Mapping[str, MetricResult], ) -> None: + """ + Write the request-style metrics section to the CSV writer using STAT_KEYS as the column headers. + + Writes a header row of "Metric" followed by STAT_KEYS, then emits one row per metric (sorted by metric tag) that passes the exporter filter. Each row starts with the formatted metric name and contains the metric's STAT_KEYS values formatted for display. + + Parameters: + writer (csv.writer): CSV writer to receive header and metric rows. + records (Mapping[str, MetricResult]): Mapping of metric tag to MetricResult for request metrics. + """ header = ["Metric"] + list(STAT_KEYS) writer.writerow(header) @@ -148,6 +191,15 @@ def _write_system_metrics( writer: csv.writer, records: Mapping[str, MetricResult], ) -> None: + """ + Write the system-level metrics section to the CSV output. + + Writes a header row ["Metric", "Value"] and then, for each metric in `records` sorted by tag, writes a row containing the formatted metric name and its formatted average value. 
Metrics that are not eligible for export are skipped. + + Parameters: + writer (csv.writer): CSV writer used to emit rows. + records (Mapping[str, MetricResult]): Mapping of metric tag to MetricResult objects to export. + """ writer.writerow(["Metric", "Value"]) for _, metric in sorted(records.items(), key=lambda kv: kv[0]): if not self._should_export(metric): @@ -164,7 +216,15 @@ def _format_metric_name(self, metric: MetricResult) -> str: return name def _format_number(self, value) -> str: - """Format a number for CSV output.""" + """ + Convert a value into a CSV-friendly string representation. + + Parameters: + value: The value to format; may be None, bool, Integral, Real, Decimal, or any other type. + + Returns: + A string suitable for CSV output: an empty string for `None`, `"True"`/`"False"` for booleans, an integer string for integral values, a floating representation with two decimal places for real numbers and `Decimal`, and `str(value)` for all other types. + """ if value is None: return "" # Handle bools explicitly (bool is a subclass of int) @@ -180,7 +240,11 @@ def _format_number(self, value) -> str: return str(value) def _write_telemetry_section(self, writer, telemetry_results) -> None: - """Write GPU telemetry data section to CSV with statistical aggregation.""" + """ + Write GPU telemetry sections to the CSV for each telemetry endpoint that contains data. + + For each endpoint in telemetry_results.telemetry_data.dcgm_endpoints, emits a labeled section that lists selected GPU metrics (power, energy, utilization, memory, temperature, SM and memory clocks) when data for that metric exists across any GPU. Each metric section contains a header row and rows per GPU with the following columns: "GPU Index", "GPU Name", "GPU UUID", "Avg", "Min", "Max", "P99", "P90", "P75", "Std". Endpoints with no GPUs or metrics with no data are skipped. Individual GPU entries that fail to produce a metric result are omitted. 
+ """ writer.writerow([]) writer.writerow([]) @@ -257,7 +321,16 @@ def _write_telemetry_section(self, writer, telemetry_results) -> None: writer.writerow([]) def _gpu_has_metric(self, gpu_data, metric_key: str) -> bool: - """Check if GPU has data for the specified metric.""" + """ + Determine whether the given GPU data contains a result for the specified metric key. + + Parameters: + gpu_data: An object exposing get_metric_result(metric_key, ...) used to probe for the metric. + metric_key (str): The metric identifier to check for on the GPU. + + Returns: + bool: `True` if a MetricResult can be retrieved for `metric_key`, `False` otherwise. + """ try: gpu_data.get_metric_result(metric_key, metric_key, "test", "test") return True diff --git a/aiperf/exporters/exporter_manager.py b/aiperf/exporters/exporter_manager.py index 382b0637f..6c6b3ca94 100644 --- a/aiperf/exporters/exporter_manager.py +++ b/aiperf/exporters/exporter_manager.py @@ -26,6 +26,16 @@ def __init__( telemetry_results: TelemetryResults | None = None, **kwargs, ) -> None: + """ + Initialize the ExporterManager with profiling results and configuration used to construct exporter instances. + + Parameters: + results (ProfileResults): Collected profiling results to be exported. + input_config (UserConfig): User-provided configuration that influences exporter behaviour and output. + service_config (ServiceConfig): Service-level configuration required by exporters. + telemetry_results (TelemetryResults | None): Optional telemetry data to include in exporter configuration. + **kwargs: Additional keyword arguments forwarded to the superclass initializer. + """ super().__init__(**kwargs) self._results = results self._input_config = input_config @@ -39,6 +49,13 @@ def __init__( ) def _task_done_callback(self, task: asyncio.Task) -> None: + """ + Handle completion of an asyncio Task created by ExporterManager. 
+ + This callback logs the task result or any exception raised, and removes the task from the manager's internal tracking set. + Parameters: + task (asyncio.Task): The completed task whose outcome will be logged and which will be removed from self._tasks. + """ self.debug(lambda: f"Task done: {task}") if task.exception(): self.error(f"Error exporting records: {task.exception()}") diff --git a/aiperf/exporters/gpu_telemetry_console_exporter.py b/aiperf/exporters/gpu_telemetry_console_exporter.py index d66345309..bcdc4c295 100644 --- a/aiperf/exporters/gpu_telemetry_console_exporter.py +++ b/aiperf/exporters/gpu_telemetry_console_exporter.py @@ -24,6 +24,12 @@ class GPUTelemetryConsoleExporter(AIPerfLoggerMixin): """ def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: + """ + Initialize the exporter with the given ExporterConfig and capture runtime data needed for rendering telemetry. + + Parameters: + exporter_config (ExporterConfig): Configuration object for the exporter; used to obtain benchmarking results, service configuration, and optional telemetry results. + """ super().__init__(**kwargs) self._results = exporter_config.results self._service_config = exporter_config.service_config @@ -31,7 +37,14 @@ def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: self._telemetry_results = getattr(exporter_config, "telemetry_results", None) async def export(self, console: Console) -> None: - """Export telemetry data to console if verbose mode is enabled.""" + """ + Export GPU telemetry to the provided console when verbose mode is enabled and telemetry data is available. + + If verbose mode is disabled on the service configuration or no telemetry results are present, the method returns without producing output. When data is available, builds a renderable representation and prints it to the console. + + Parameters: + console (Console): Rich Console instance used to render the telemetry output. 
+ """ if not self._service_config.verbose: return @@ -44,7 +57,16 @@ async def export(self, console: Console) -> None: ) def _print_renderable(self, console: Console, renderable: RenderableType) -> None: - """Print the renderable to the console.""" + """ + Print the given renderable to the provided Rich console and flush the console output. + + Parameters: + console (Console): Rich console to receive the printed renderable. + renderable (RenderableType): The renderable object (table, text, group, etc.) to print. + + Side effects: + Writes a leading newline, prints the renderable to the console, and flushes the console's output buffer. + """ console.print("\n") console.print(renderable) console.file.flush() @@ -52,7 +74,18 @@ def _print_renderable(self, console: Console, renderable: RenderableType) -> Non def get_renderable( self, telemetry_results: TelemetryResults, console: Console ) -> RenderableType: - """Create Rich tables showing GPU telemetry metrics with consolidated single-table format.""" + """ + Build a Rich renderable that summarizes GPU telemetry by DCGM endpoint and GPU. + + Constructs one or more Rich Tables (grouped into a Group) with per-GPU metrics for each reachable DCGM endpoint; if no metric tables can be produced, returns a dim italic Text explaining that no telemetry was collected and listing unreachable endpoints and any error summary. + + Parameters: + telemetry_results (TelemetryResults): Telemetry data and metadata including telemetry_data.dcgm_endpoints, endpoints_tested, endpoints_successful, and optional error_summary. + console (Console): Rich Console used for rendering context (passed through for consistent styling/width). + + Returns: + RenderableType: A Group containing per-GPU Tables when telemetry is available, or a Text object with diagnostic information when no tables can be rendered. 
+ """ renderables = [] diff --git a/aiperf/exporters/json_exporter.py b/aiperf/exporters/json_exporter.py index 493ec32e2..c4d2773a9 100644 --- a/aiperf/exporters/json_exporter.py +++ b/aiperf/exporters/json_exporter.py @@ -41,6 +41,14 @@ class JsonExporter(AIPerfLoggerMixin): """ def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: + """ + Initialize the JSON exporter and configure its internal state from the given ExporterConfig. + + The initializer stores references to the exporter results and optional telemetry results, captures the user's input configuration and output artifact directory, assigns the metric registry, and computes the default JSON output file path. + + Parameters: + exporter_config (ExporterConfig): Configuration object containing export results, optional telemetry_results, and user_config used to derive output paths and exporter inputs. + """ super().__init__(**kwargs) self.debug(lambda: f"Initializing JsonExporter with config: {exporter_config}") self._results = exporter_config.results @@ -68,6 +76,11 @@ def _should_export(self, metric: MetricResult) -> bool: return res async def export(self) -> None: + """ + Export the collected results and optional telemetry summary to the configured JSON file on disk. + + Builds an export data object containing converted, exportable metric records, run metadata (start/end times, cancellation and error summaries), and, when telemetry results are available, a telemetry summary and per-endpoint statistics; ensures the output directory exists and writes the serialized JSON to the exporter's configured file path. 
+ """ self._output_directory.mkdir(parents=True, exist_ok=True) start_time = ( diff --git a/aiperf/gpu_telemetry/telemetry_data_collector.py b/aiperf/gpu_telemetry/telemetry_data_collector.py index f8c19936e..e07a65731 100644 --- a/aiperf/gpu_telemetry/telemetry_data_collector.py +++ b/aiperf/gpu_telemetry/telemetry_data_collector.py @@ -42,6 +42,16 @@ def __init__( error_callback: Callable[[Exception], None] | None = None, collector_id: str = "telemetry_collector", ) -> None: + """ + Initialize the TelemetryDataCollector with its DCGM endpoint, collection interval, and optional callbacks. + + Parameters: + dcgm_url (str): HTTP URL of the DCGM Prometheus-formatted metrics endpoint to poll. + collection_interval (float): Seconds between periodic collections. + record_callback (Callable[[list[TelemetryRecord]], None] | None): Optional callable (sync or async) invoked with a list of TelemetryRecord objects when metrics are successfully collected. + error_callback (Callable[[Exception], None] | None): Optional callable (sync or async) invoked with an Exception or error details when collection or processing fails. + collector_id (str): Identifier used for the collector instance. + """ self._dcgm_url = dcgm_url self._collection_interval = collection_interval self._record_callback = record_callback @@ -53,16 +63,20 @@ def __init__( @on_init async def _initialize_http_client(self) -> None: - """Initialize the aiohttp client session.""" + """ + Create and store an aiohttp ClientSession configured with the module's URL reachability timeout. + + This initializes the collector's internal HTTP session so it can perform requests to the configured DCGM endpoint. + """ timeout = aiohttp.ClientTimeout(total=URL_REACHABILITY_TIMEOUT) self._session = aiohttp.ClientSession(timeout=timeout) @on_stop async def _cleanup_http_client(self) -> None: - """Clean up the aiohttp client session. 
- - Race conditions with background tasks are handled by checking - self.stop_requested in the background task itself. + """ + Close and clear the internal aiohttp ClientSession if one exists. + + If a session is present, it is closed and the internal reference is set to None. Background tasks are expected to coordinate using the collector's stop request mechanism to avoid races. """ if self._session: await self._session.close() @@ -96,11 +110,10 @@ async def is_url_reachable(self) -> bool: @background_task(immediate=True, interval=lambda self: self._collection_interval) async def _collect_telemetry_task(self) -> None: - """Background task for collecting telemetry data at regular intervals. - - This uses the @background_task decorator which automatically handles - lifecycle management and stopping when the collector is stopped. - The interval is set to the collection_interval so this runs periodically. + """ + Run one collection-and-processing iteration and handle any errors. + + Attempts to collect and process telemetry; if a CancelledError occurs it is re-raised. On other exceptions, invokes the configured error callback with an ErrorDetails and collector id if provided, logging a warning if the callback itself fails; otherwise logs the error. """ try: await self._collect_and_process_metrics() @@ -116,9 +129,10 @@ async def _collect_telemetry_task(self) -> None: self.error(f"Telemetry collection error: {e}") async def _collect_and_process_metrics(self) -> None: - """Collect metrics from DCGM endpoint and process them into TelemetryRecord objects. - - This method fetches metrics, parses them, and sends them via callback. + """ + Collect metrics from the DCGM endpoint, convert them to TelemetryRecord objects, and deliver them to the configured record callback. + + If one or more records are produced and a record callback is configured, the callback is invoked with (records, self.id). 
The callback may be a coroutine or a regular callable; any exceptions raised by the callback are logged and not propagated by this method. Exceptions raised while fetching or parsing metrics are logged and re-raised. """ try: metrics_data = await self._fetch_metrics() @@ -138,14 +152,16 @@ async def _collect_and_process_metrics(self) -> None: raise async def _fetch_metrics(self) -> str: - """Fetch raw metrics data from DCGM endpoint using aiohttp. - + """ + Retrieve raw Prometheus-formatted metrics from the configured DCGM endpoint. + Returns: - Raw metrics text in Prometheus format - + Raw metrics text in Prometheus format. + Raises: - aiohttp.ClientError: If HTTP request fails - asyncio.CancelledError: If collector is being stopped + aiohttp.ClientError: If the HTTP request fails or the response status is not successful. + asyncio.CancelledError: If collection is being stopped or the HTTP session is closed during shutdown. + RuntimeError: If the HTTP session has not been initialized. """ if self.stop_requested: raise asyncio.CancelledError("Telemetry collector is being stopped") @@ -162,13 +178,14 @@ async def _fetch_metrics(self) -> str: return text def _parse_metrics_to_records(self, metrics_data: str) -> list[TelemetryRecord]: - """Parse DCGM metrics text into TelemetryRecord objects using prometheus_client. - - Args: - metrics_data: Raw metrics text from DCGM exporter - + """ + Convert Prometheus-formatted DCGM metrics text into TelemetryRecord objects grouped by GPU. + + Parameters: + metrics_data (str): Raw Prometheus-formatted metrics text from the DCGM exporter. + Returns: - List of TelemetryRecord objects, one per GPU with valid data + list[TelemetryRecord]: List of TelemetryRecord objects, one for each GPU that has parsed metrics. 
""" if not metrics_data.strip(): self.warning("Response from DCGM metrics endpoint is empty") @@ -237,13 +254,14 @@ def _parse_metrics_to_records(self, metrics_data: str) -> list[TelemetryRecord]: return records def _apply_scaling_factors(self, metrics: dict) -> dict: - """Apply scaling factors to convert raw DCGM units to display units. - - Args: - metrics: Dict of metric_name -> raw_value - + """ + Apply configured scaling factors to metric values. + + Parameters: + metrics (dict): Mapping from metric names to raw numeric values. + Returns: - Dict with scaled values for display + dict: Copy of `metrics` where any value for a metric present in the collector's scaling factors has been multiplied by that factor; other entries are unchanged. """ scaled_metrics = metrics.copy() for metric, factor in self._scaling_factors.items(): diff --git a/aiperf/gpu_telemetry/telemetry_manager.py b/aiperf/gpu_telemetry/telemetry_manager.py index f81173b25..80b72f8ff 100644 --- a/aiperf/gpu_telemetry/telemetry_manager.py +++ b/aiperf/gpu_telemetry/telemetry_manager.py @@ -53,6 +53,20 @@ def __init__( user_config: UserConfig, service_id: str | None = None, ) -> None: + """ + Initialize the TelemetryManager, register service metadata, and prepare internal collector state. + + Parameters: + service_config (ServiceConfig): Configuration for the service runtime and framework integration. + user_config (UserConfig): User-provided configuration; `server_metrics_url` is used to derive DCGM endpoints. + service_id (str | None): Optional explicit service identifier; if omitted the base service will assign one. + + Side effects: + - Calls the base service initializer. + - Creates an internal mapping for telemetry collectors. + - Builds the DCGM endpoints list, ensuring the default endpoint is present at the front. + - Sets the telemetry collection interval to the module default. 
+ """ super().__init__( service_config=service_config, user_config=user_config, @@ -72,14 +86,22 @@ def __init__( @on_init async def _initialize(self) -> None: - """Initialize telemetry manager.""" + """ + Perform any required initialization for the telemetry manager. + + This coroutine is a placeholder that currently performs no actions but is reserved for asynchronous initialization tasks needed by the service. + """ pass @on_command(CommandType.PROFILE_CONFIGURE) async def _profile_configure_command( self, message: ProfileConfigureCommand ) -> None: - """Configure the telemetry collectors but don't start them yet.""" + """ + Configure telemetry collectors for the configured DCGM endpoints without starting them. + + Creates a TelemetryDataCollector for each configured endpoint, tests reachability, registers only reachable collectors, and publishes a TelemetryStatusMessage that lists tested and reachable endpoints. If no endpoints are reachable, schedules the service to stop. This method does not start any collectors. + """ reachable_count = 0 for _i, dcgm_url in enumerate(self._dcgm_endpoints): @@ -122,7 +144,14 @@ async def _profile_configure_command( @on_command(CommandType.PROFILE_START) async def _on_start_profiling(self, message) -> None: - """Start all telemetry collectors.""" + """ + Start configured telemetry collectors that are reachable. + + Attempts to start each configured collector after verifying its endpoint is reachable. Logs an error for any collector that fails to start and logs a warning if no collectors were started. + + Parameters: + message: The profile start message that triggered this action (not used by this handler). 
+ """ if not self._collectors: return @@ -144,21 +173,39 @@ async def _on_start_profiling(self, message) -> None: async def _handle_profile_cancel_command( self, message: ProfileCancelCommand ) -> None: - """Stop all telemetry collectors when profiling is cancelled.""" + """ + Stop all active telemetry collectors in response to a profiling cancellation. + + Parameters: + message (ProfileCancelCommand): The incoming cancel command (unused); triggers stopping all collectors. + """ await self._stop_all_collectors() @on_command(CommandType.SHUTDOWN) async def _handle_shutdown_command(self, message) -> None: - """Handle shutdown command from SystemController.""" + """ + Stop all telemetry collectors in response to a shutdown command. + + Parameters: + message: The shutdown command message received from the SystemController (unused). + """ await self._stop_all_collectors() @on_stop async def _telemetry_manager_stop(self) -> None: - """Stop all telemetry collectors during service shutdown.""" + """ + Stop all managed telemetry collectors and wait for each to finish shutting down. + + This is invoked during service shutdown to ensure every TelemetryDataCollector is cleanly stopped. + """ await self._stop_all_collectors() async def _stop_all_collectors(self) -> None: - """Stop all telemetry collectors.""" + """ + Stop all managed telemetry collectors and await their shutdown. + + If no collectors are configured this returns immediately. Failures to stop individual collectors are logged and suppressed; the method does not raise for per-collector stop errors. + """ if not self._collectors: return @@ -172,9 +219,13 @@ async def _stop_all_collectors(self) -> None: async def _on_telemetry_records( self, records: list[TelemetryRecord], collector_id: str ) -> None: - """Async callback for receiving telemetry records from collectors. - - Sends TelemetryRecordsMessage to RecordsManager via message system. 
+ """ + Handle telemetry records emitted by a collector and publish them as a TelemetryRecordsMessage. + + If `records` is empty the call is ignored. Otherwise constructs a TelemetryRecordsMessage containing this service's `service_id`, the provided `collector_id`, the `records`, and `error=None`, then publishes it. Any exceptions raised while publishing are logged. + Parameters: + records (list[TelemetryRecord]): Telemetry records produced by a collector. + collector_id (str): Identifier of the collector that produced the records. """ if not records: @@ -194,9 +245,14 @@ async def _on_telemetry_records( self.error(f"Failed to send telemetry records: {e}") async def _on_telemetry_error(self, error: ErrorDetails, collector_id: str) -> None: - """Async callback for receiving telemetry errors from collectors. - - Sends error TelemetryRecordsMessage to RecordsManager via message system. + """ + Handle a telemetry error reported by a collector and publish it as a TelemetryRecordsMessage. + + Constructs a TelemetryRecordsMessage containing an empty records list and the provided error, then publishes it to the messaging system. + + Parameters: + error (ErrorDetails): Details of the telemetry error to forward. + collector_id (str): Identifier of the collector that reported the error. """ try: @@ -219,7 +275,15 @@ async def _send_telemetry_status( endpoints_tested: list[str] | None = None, endpoints_reachable: list[str] | None = None, ) -> None: - """Send telemetry status message directly to SystemController.""" + """ + Publish a TelemetryStatusMessage describing telemetry availability and tested endpoints to the SystemController. + + Parameters: + enabled (bool): Whether telemetry collection is currently enabled. + reason (str | None): Optional human-readable explanation when telemetry is disabled or its status is noteworthy. + endpoints_tested (list[str] | None): List of DCGM endpoint URLs that were tested for reachability (defaults to an empty list). 
+ endpoints_reachable (list[str] | None): List of DCGM endpoint URLs that were determined reachable (defaults to an empty list). + """ try: status_message = TelemetryStatusMessage( service_id=self.service_id, diff --git a/aiperf/metrics/base_metric.py b/aiperf/metrics/base_metric.py index 37c9ced34..80926d8f1 100644 --- a/aiperf/metrics/base_metric.py +++ b/aiperf/metrics/base_metric.py @@ -91,8 +91,11 @@ def __init_subclass__(cls, **kwargs): @classmethod def _verify_base_class(cls) -> None: - """Verify that the class is a subclass of BaseRecordMetric, BaseAggregateMetric, or BaseDerivedMetric. - This is done to ensure that the class is a valid metric type. + """ + Ensure the class inherits from one of the concrete metric base classes. + + Raises: + TypeError: If `cls` is not a subclass of BaseRecordMetric, BaseAggregateMetric, BaseDerivedMetric, or BaseTelemetryMetric. """ # Note: this is valid because the below imports are abstract, so they will not get here from aiperf.metrics import ( diff --git a/aiperf/metrics/base_telemetry_metric.py b/aiperf/metrics/base_telemetry_metric.py index c3f867c05..12db69656 100644 --- a/aiperf/metrics/base_telemetry_metric.py +++ b/aiperf/metrics/base_telemetry_metric.py @@ -31,7 +31,17 @@ def _extract_value(self, record: TelemetryRecord) -> float | None: def process_telemetry_batch( self, telemetry_records: list[TelemetryRecord] ) -> dict[int, list[MetricValueTypeVarT]]: - """Process batch of telemetry records, returning values grouped by GPU index.""" + """ + Group extracted metric values from telemetry records by GPU index. + + Processes the provided telemetry records, extracts a metric value from each record via _extract_value, and collects non-missing values into lists keyed by the record's GPU index. Records for which no value can be extracted are ignored. + + Parameters: + telemetry_records (list[TelemetryRecord]): Telemetry records to process. 
+ + Returns: + dict[int, list[MetricValueTypeVarT]]: Mapping from GPU index to a list of extracted metric values. + """ gpu_values = {} for record in telemetry_records: value = self._extract_value(record) diff --git a/aiperf/metrics/gpu_telemetry_types/energy_consumption_metric.py b/aiperf/metrics/gpu_telemetry_types/energy_consumption_metric.py index 20afd978d..135c2644e 100644 --- a/aiperf/metrics/gpu_telemetry_types/energy_consumption_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/energy_consumption_metric.py @@ -15,4 +15,13 @@ class EnergyConsumptionMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Extracts the energy consumption value from a telemetry record. + + Parameters: + record (TelemetryRecord): Telemetry record to read energy consumption from. + + Returns: + (float | None): Energy consumption in millijoules, or None if not available. + """ return record.energy_consumption diff --git a/aiperf/metrics/gpu_telemetry_types/gpu_memory_used_metric.py b/aiperf/metrics/gpu_telemetry_types/gpu_memory_used_metric.py index b0d45facf..8db87cad1 100644 --- a/aiperf/metrics/gpu_telemetry_types/gpu_memory_used_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/gpu_memory_used_metric.py @@ -15,4 +15,13 @@ class GpuMemoryUsedMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Extracts the GPU memory used value from a telemetry record. + + Parameters: + record (TelemetryRecord): Telemetry record containing GPU metrics. + + Returns: + float | None: GPU memory used in gigabytes, or `None` if the value is not present. 
+ """ return record.gpu_memory_used diff --git a/aiperf/metrics/gpu_telemetry_types/gpu_power_usage_metric.py b/aiperf/metrics/gpu_telemetry_types/gpu_power_usage_metric.py index 3205ed10f..83ad1dff4 100644 --- a/aiperf/metrics/gpu_telemetry_types/gpu_power_usage_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/gpu_power_usage_metric.py @@ -15,4 +15,13 @@ class GpuPowerUsageMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Extracts GPU power usage from a TelemetryRecord. + + Parameters: + record (TelemetryRecord): Telemetry record containing GPU telemetry fields. + + Returns: + float | None: GPU power usage in watts, or None if the value is not present. + """ return record.gpu_power_usage diff --git a/aiperf/metrics/gpu_telemetry_types/gpu_temperature_metric.py b/aiperf/metrics/gpu_telemetry_types/gpu_temperature_metric.py index 39c0695f9..6b802f89e 100644 --- a/aiperf/metrics/gpu_telemetry_types/gpu_temperature_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/gpu_temperature_metric.py @@ -20,4 +20,13 @@ class GpuTemperatureMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Extracts the GPU temperature from a TelemetryRecord. + + Parameters: + record (TelemetryRecord): Telemetry record containing GPU telemetry fields. + + Returns: + float | None: The GPU temperature in degrees Celsius, or `None` if the value is unavailable. 
+ """ return record.gpu_temperature diff --git a/aiperf/metrics/gpu_telemetry_types/gpu_utilization_metric.py b/aiperf/metrics/gpu_telemetry_types/gpu_utilization_metric.py index fedd77884..94fc3338d 100644 --- a/aiperf/metrics/gpu_telemetry_types/gpu_utilization_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/gpu_utilization_metric.py @@ -15,4 +15,13 @@ class GpuUtilizationMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Extracts the GPU utilization percentage from a telemetry record. + + Parameters: + record (TelemetryRecord): Telemetry record containing GPU telemetry fields. + + Returns: + float | None: GPU utilization percentage from the record, or `None` if not available. + """ return record.gpu_utilization diff --git a/aiperf/metrics/gpu_telemetry_types/memory_clock_frequency_metric.py b/aiperf/metrics/gpu_telemetry_types/memory_clock_frequency_metric.py index 488561d10..ab43b89a3 100644 --- a/aiperf/metrics/gpu_telemetry_types/memory_clock_frequency_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/memory_clock_frequency_metric.py @@ -20,4 +20,13 @@ class MemoryClockFrequencyMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Extract the memory clock frequency value from a telemetry record. + + Parameters: + record (TelemetryRecord): Telemetry record containing GPU metrics. + + Returns: + float | None: Memory clock frequency in megahertz, or `None` if the value is unavailable. 
+ """ return record.memory_clock_frequency diff --git a/aiperf/metrics/gpu_telemetry_types/memory_temperature_metric.py b/aiperf/metrics/gpu_telemetry_types/memory_temperature_metric.py index 1c5868c46..942b56633 100644 --- a/aiperf/metrics/gpu_telemetry_types/memory_temperature_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/memory_temperature_metric.py @@ -20,4 +20,13 @@ class MemoryTemperatureMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Retrieve the memory temperature from a TelemetryRecord. + + Parameters: + record (TelemetryRecord): Telemetry record to extract the memory temperature from. + + Returns: + float | None: Memory temperature in degrees Celsius, or `None` if not present. + """ return record.memory_temperature diff --git a/aiperf/metrics/gpu_telemetry_types/sm_clock_frequency_metric.py b/aiperf/metrics/gpu_telemetry_types/sm_clock_frequency_metric.py index de0560a2d..924b8a333 100644 --- a/aiperf/metrics/gpu_telemetry_types/sm_clock_frequency_metric.py +++ b/aiperf/metrics/gpu_telemetry_types/sm_clock_frequency_metric.py @@ -20,4 +20,13 @@ class SmClockFrequencyMetric(BaseTelemetryMetric[float]): flags = MetricFlags.GPU_TELEMETRY def _extract_value(self, record: TelemetryRecord) -> float | None: + """ + Return the SM (streaming multiprocessor) clock frequency from a telemetry record in megahertz. + + Parameters: + record (TelemetryRecord): Telemetry record containing GPU telemetry fields. + + Returns: + float | None: SM clock frequency in MHz if present, otherwise None. 
+ """ return record.sm_clock_frequency diff --git a/aiperf/post_processors/telemetry_results_processor.py b/aiperf/post_processors/telemetry_results_processor.py index 7a311bd91..c527c2413 100644 --- a/aiperf/post_processors/telemetry_results_processor.py +++ b/aiperf/post_processors/telemetry_results_processor.py @@ -21,6 +21,16 @@ class TelemetryResultsProcessor(BaseMetricsProcessor): """Process individual TelemetryRecord objects into hierarchical storage.""" def __init__(self, user_config: UserConfig, **kwargs: Any): + """ + Initialize the TelemetryResultsProcessor, preparing internal storage and metric unit mappings. + + Parameters: + user_config (UserConfig): Configuration for the current user/session used by the processor; passed to the base class initializer. + + Detailed behavior: + - Creates an empty TelemetryHierarchy to store incoming telemetry records in a hierarchical structure. + - Initializes a mapping of telemetry metric names to their display units (e.g., "gpu_power_usage" -> "W", "energy_consumption" -> "MJ"). + """ super().__init__(user_config=user_config, **kwargs) self._telemetry_hierarchy = TelemetryHierarchy() @@ -37,10 +47,11 @@ def __init__(self, user_config: UserConfig, **kwargs: Any): } async def process_telemetry_record(self, record: TelemetryRecord) -> None: - """Process individual telemetry record into hierarchical storage. - - Args: - record: TelemetryRecord containing GPU metrics and hierarchical metadata + """ + Add a telemetry record to the processor's hierarchical storage. + + Parameters: + record (TelemetryRecord): Telemetry record containing GPU metrics and hierarchical metadata (for example `dcgm_url`, `gpu_uuid`, and metric samples). """ if self.is_trace_enabled: @@ -51,16 +62,13 @@ async def process_telemetry_record(self, record: TelemetryRecord) -> None: self._telemetry_hierarchy.add_record(record) async def summarize(self) -> list[MetricResult]: - """Generate MetricResult list for real-time display and final export. 
- - This method is called by RecordsManager for: - 1. Final results generation when profiling completes - 2. [AIP-355] TODO: @ilana-n [FUTURE] real-time dashboard updates (every DEFAULT_REALTIME_METRICS_INTERVAL) - when user-set flag is enabled - + """ + Produce MetricResult objects for each tracked metric of every GPU for display and export. + + Constructs per-metric tags and headers (including a sanitized DCGM endpoint identifier and GPU index/UUID prefix) and retrieves a MetricResult from the stored telemetry for each combination. + Returns: - List of MetricResult objects, one per GPU per metric type. - Tags follow hierarchical naming pattern for dashboard filtering. + list[MetricResult]: MetricResult objects, one per GPU per tracked metric. Tags are formatted for dashboard filtering using the sanitized DCGM URL and GPU identifiers. """ results = [] diff --git a/aiperf/records/records_manager.py b/aiperf/records/records_manager.py index b8279cfd3..45a0676c4 100644 --- a/aiperf/records/records_manager.py +++ b/aiperf/records/records_manager.py @@ -84,6 +84,11 @@ def __init__( user_config: UserConfig, service_id: str | None = None, ) -> None: + """ + Initialize the RecordsManager service and configure its internal processing and telemetry state. + + Sets up the base service (pull client bound to RECORDS) and initializes synchronization primitives, processing timers and counters, per-worker and global processing statistics, error-summary stores, telemetry hierarchy and telemetry-specific error tracking, and result processor instances (partitioned into metric and telemetry processor lists). 
+ """ super().__init__( service_config=service_config, user_config=user_config, @@ -149,7 +154,14 @@ def __init__( @on_pull_message(MessageType.METRIC_RECORDS) async def _on_metric_records(self, message: MetricRecordsMessage) -> None: - """Handle a metric records message.""" + """ + Process incoming metric records for the profiling phase, forward applicable results to metric processors, and update processing statistics and error summaries. + + Only handles messages from the PROFILING credit phase; messages from other phases are ignored. Determines whether the results fall within the expected profiling duration and, if so and the message is marked valid, forwards results to metric result processors. Updates per-worker and global processed or error counters accordingly; if the message contains an error, increments the aggregated error summary. Always triggers a completion check after handling the message. + + Parameters: + message (MetricRecordsMessage): The incoming metric records message containing worker id, validity, results, credit phase, and optional error information. + """ if self.is_trace_enabled: self.trace(f"Received metric records: {message}") @@ -199,12 +211,13 @@ async def _on_metric_records(self, message: MetricRecordsMessage) -> None: @on_message(MessageType.TELEMETRY_RECORDS) async def _on_telemetry_records(self, message: TelemetryRecordsMessage) -> None: - """Handle telemetry records message from Telemetry Manager. - The RecordsManager acts as the central hub for all record processing, - whether inference metrics or GPU telemetry. - - Args: - message: Batch of telemetry records from a DCGM collector + """ + Handle an incoming TelemetryRecordsMessage by dispatching valid telemetry records to telemetry processors and updating internal telemetry state. + + If the message is valid, forwards its records to telemetry result processors and adds each record to the telemetry hierarchy under the telemetry lock. 
If the message is invalid and contains an error, increments the telemetry error counter for that error under the error-counts lock. + + Parameters: + message (TelemetryRecordsMessage): Batch of telemetry records from the Telemetry Manager. """ if message.valid: @@ -223,13 +236,14 @@ async def _on_telemetry_records(self, message: TelemetryRecordsMessage) -> None: def _should_include_request_by_duration( self, results: list[dict[MetricTagT, MetricValueTypeT]] ) -> bool: - """Determine if the request should be included based on benchmark duration. - - Args: - results: List of metric results for a single request - + """ + Decide whether a request's metric results fall within the configured benchmark duration (including the configured grace period). + + Parameters: + results (list[dict[MetricTagT, MetricValueTypeT]]): Metric results for a single request where entries may include `MinRequestTimestampMetric.tag` and `RequestLatencyMetric.tag`. + Returns: - True if the request should be included, else False + True if every result's final response timestamp is at or before the benchmark end time (start_time_ns + expected_duration_sec + grace period), `False` if any response was received after that end time. """ if not self.expected_duration_sec: return True @@ -310,7 +324,12 @@ async def _check_if_all_records_received(self) -> None: async def _send_results_to_results_processors( self, results: list[dict[MetricTagT, MetricValueTypeT]] ) -> None: - """Send the results to inference metric results processors only.""" + """ + Dispatch each metric result to all configured metric results processors. + + Parameters: + results (list[dict[MetricTagT, MetricValueTypeT]]): A list of metric result mappings where each mapping's keys are metric tags and values are metric values; each mapping will be processed by every metric results processor. 
+ """ await asyncio.gather( *[ results_processor.process_result(result) @@ -322,10 +341,11 @@ async def _send_results_to_results_processors( async def _send_telemetry_to_results_processors( self, telemetry_records: list[TelemetryRecord] ) -> None: - """Send individual telemetry records to telemetry results processors only. - - Args: - telemetry_records: Batch of records from single collection cycle + """ + Forward telemetry records to each registered telemetry results processor, invoking per-record processing. + + Parameters: + telemetry_records (list[TelemetryRecord]): Telemetry records collected in a single cycle to be processed individually by telemetry processors. """ await asyncio.gather( *[ @@ -339,7 +359,12 @@ async def _send_telemetry_to_results_processors( async def _on_credit_phase_start( self, phase_start_msg: CreditPhaseStartMessage ) -> None: - """Handle a credit phase start message in order to track the total number of expected requests.""" + """ + Record profiling phase start metadata (start time, expected duration, and expected request count). + + Parameters: + phase_start_msg (CreditPhaseStartMessage): Message describing the credit phase start. If the message's phase is PROFILING, the manager will store its start timestamp, expected duration, and total expected requests; otherwise no action is taken. + """ if phase_start_msg.phase != CreditPhase.PROFILING: return async with self.processing_status_lock: @@ -466,7 +491,12 @@ async def _report_realtime_metrics(self) -> None: ) async def _generate_realtime_metrics(self) -> list[MetricResult]: - """Generate the real-time metrics for the profile run.""" + """ + Collect current realtime metric summaries from all metric results processors. + + Returns: + metrics (list[MetricResult]): Flattened list of MetricResult instances returned by the metric results processors; excludes non-MetricResult values and processor results that failed. 
+ """ results = await asyncio.gather( *[ results_processor.summarize() @@ -483,7 +513,17 @@ async def _generate_realtime_metrics(self) -> list[MetricResult]: ] async def _process_results(self, cancelled: bool) -> ProcessRecordsResult: - """Process the results.""" + """ + Finalize and publish aggregated metric results and trigger independent telemetry publishing. + + Gathers summaries from metric-only (inference) results processors, aggregates metric records and any processing errors, builds a ProfileResults object containing records, counts, timing, and an error summary, and publishes a ProcessRecordsResultMessage. After publishing the combined metric results, initiates separate telemetry result publishing. The final aggregated ProcessRecordsResult is returned. + + Parameters: + cancelled (bool): Whether profiling was cancelled before completion. + + Returns: + ProcessRecordsResult: The aggregated processing outcome including profile results and any errors. + """ self.debug(lambda: f"Processing records (cancelled: {cancelled})") self.info("Processing records results...") @@ -536,13 +576,11 @@ async def _process_results(self, cancelled: bool) -> ProcessRecordsResult: return result async def export_telemetry_independently(self) -> TelemetryResults | None: - """Export telemetry data independently from inference results. - - This method provides a separate export path for telemetry data that doesn't - interfere with the inference results pipeline. - + """ + Provide telemetry results separately from the inference/results processing pipeline. + Returns: - TelemetryResults if telemetry data was collected, None otherwise + TelemetryResults: Collected telemetry data with timing, endpoints, and error summary, or `None` if no telemetry endpoints were present. 
""" async with self._telemetry_hierarchy_lock: if not self._telemetry_hierarchy.dcgm_endpoints: @@ -562,7 +600,14 @@ async def export_telemetry_independently(self) -> TelemetryResults | None: return telemetry_results async def _publish_telemetry_results(self, cancelled: bool) -> None: - """Publish telemetry results independently - mirrors inference results pattern.""" + """ + Publish collected telemetry results as a ProcessTelemetryResultMessage. + + If telemetry data is available, include the exported TelemetryResults and the list of unique telemetry errors; otherwise publish an empty TelemetryResults wrapper with no errors. + + Parameters: + cancelled (bool): Indicates whether processing was cancelled; accepted for parity with result-publishing APIs. + """ try: telemetry_results = await self.export_telemetry_independently() if telemetry_results: @@ -598,7 +643,12 @@ async def _publish_telemetry_results(self, cancelled: bool) -> None: self.error(f"Failed to publish telemetry results: {e}") async def get_error_summary(self) -> list[ErrorDetailsCount]: - """Generate a summary of the error records.""" + """ + Builds a list summarizing recorded errors and their observed counts. + + Returns: + list[ErrorDetailsCount]: Each element contains `error_details` and the corresponding `count` observed. + """ async with self.error_summary_lock: return [ ErrorDetailsCount(error_details=error_details, count=count) @@ -606,7 +656,12 @@ async def get_error_summary(self) -> list[ErrorDetailsCount]: ] async def get_telemetry_error_summary(self) -> list[ErrorDetailsCount]: - """Generate a summary of the telemetry error records.""" + """ + Return counts of telemetry error details accumulated during processing. + + Returns: + list[ErrorDetailsCount]: A list of ErrorDetailsCount objects pairing each telemetry error details entry with its occurrence count. 
+ """ async with self._telemetry_error_counts_lock: return [ ErrorDetailsCount(error_details=error_details, count=count) diff --git a/tests/gpu_telemetry/conftest.py b/tests/gpu_telemetry/conftest.py index ec78d5284..6998d4668 100644 --- a/tests/gpu_telemetry/conftest.py +++ b/tests/gpu_telemetry/conftest.py @@ -11,7 +11,16 @@ @pytest.fixture def sample_dcgm_data(): - """Sample DCGM metrics data in Prometheus format (single GPU).""" + """ + Return a sample Prometheus-formatted DCGM metrics payload for a single GPU. + + The string contains metric HELP/TYPE headers and sample values for one GPU, including: + SM and memory clock frequencies (MHz), power usage (W), total energy consumption since boot (mJ), + GPU utilization (%), and framebuffer memory used (MiB). + + Returns: + str: Prometheus-formatted metrics representing a single GPU's DCGM output. + """ return """# HELP DCGM_FI_DEV_SM_CLOCK SM clock frequency (in MHz) # TYPE DCGM_FI_DEV_SM_CLOCK gauge @@ -36,7 +45,19 @@ def sample_dcgm_data(): @pytest.fixture def multi_gpu_dcgm_data(): - """Sample DCGM metrics data with multiple GPUs.""" + """ + Provide a Prometheus-formatted sample of DCGM metrics for three GPUs. + + The returned text includes HELP/TYPE headers and metric lines for: + - DCGM_FI_DEV_POWER_USAGE (power draw in W) + - DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION (total energy since boot in mJ, counter) + - DCGM_FI_DEV_GPU_UTIL (GPU utilization in %) + - DCGM_FI_DEV_FB_USED (framebuffer memory used in MiB) + + Each metric line contains per-GPU labels: gpu, UUID, pci_bus_id, device, modelName, and Hostname. + Returns: + A multi-line string with Prometheus-format DCGM metrics for three GPUs. 
+ """ return """# HELP DCGM_FI_DEV_POWER_USAGE Power draw (in W) # TYPE DCGM_FI_DEV_POWER_USAGE gauge @@ -63,7 +84,14 @@ def multi_gpu_dcgm_data(): @pytest.fixture def sample_telemetry_records(): - """Sample TelemetryRecord objects for testing.""" + """ + Provide a single example TelemetryRecord representing one GPU telemetry snapshot. + + The record includes identifying fields (timestamp_ns, dcgm_url, gpu_index, gpu_model_name, gpu_uuid, pci_bus_id, device, hostname) and measured metrics: gpu_power_usage (watts), energy_consumption (millijoules), gpu_utilization (percent), and gpu_memory_used (gigabytes). + + Returns: + list[TelemetryRecord]: A list containing one populated TelemetryRecord instance. + """ return [ TelemetryRecord( diff --git a/tests/gpu_telemetry/test_telemetry_data_collector.py b/tests/gpu_telemetry/test_telemetry_data_collector.py index 6d193b846..080d8676b 100644 --- a/tests/gpu_telemetry/test_telemetry_data_collector.py +++ b/tests/gpu_telemetry/test_telemetry_data_collector.py @@ -31,9 +31,23 @@ def setup_method(self): self.errors_received = [] def record_callback(records, collector_id): + """ + Append received telemetry records to the test collector's received-records list. + + Parameters: + records (Iterable[TelemetryRecord]): Telemetry records produced by the collector. + collector_id (str): Identifier of the collector that produced the records. + """ self.records_received.extend(records) def error_callback(error, collector_id): + """ + Record an error reported by a telemetry collector. + + Parameters: + error: The exception or error object produced during collection. + collector_id (str): Identifier of the collector that reported the error. 
+ """ self.errors_received.append(error) self.record_callback = record_callback @@ -234,6 +248,13 @@ async def test_successful_collection_loop(self, sample_dcgm_data): records_received = [] def record_callback(records, collector_id): + """ + Append received telemetry records to the shared `records_received` list. + + Parameters: + records (iterable): Telemetry records delivered by the collector. + collector_id (str): Identifier of the collector that produced the records. + """ records_received.extend(records) collector = TelemetryDataCollector( @@ -325,6 +346,16 @@ async def test_callback_exception_resilience(self, sample_dcgm_data): call_count = 0 def failing_callback(records, collector_id): + """ + Simulates a failing record callback by incrementing a shared invocation counter and then raising an exception. + + Parameters: + records: The telemetry records passed to the callback. + collector_id: Identifier of the collector invoking the callback. + + Raises: + ValueError: Always raised to simulate a callback failure. + """ nonlocal call_count call_count += 1 raise ValueError("Callback failed")