diff --git a/so_vector/README.md b/so_vector/README.md index 3125b416d..c7510486a 100644 --- a/so_vector/README.md +++ b/so_vector/README.md @@ -65,6 +65,7 @@ This track accepts the following parameters with Rally 0.8.0+ using `--track-par * `bulk_size` (default: 500) * `bulk_indexing_clients` (default: 1) * `esql_enabled` (default: false) : Controls if the ESQL-specific benchmarks are enabled. +* `esql_profiling_enabled` (default: true) : True to add separate profiling runs for ESQL operations. * `ingest_percentage` (default: 100) * `max_num_segments` (default: 1) * `force_merge_timeout` (default: 7200) : How long force merge should be allowed to run before aborting. @@ -74,6 +75,7 @@ This track accepts the following parameters with Rally 0.8.0+ using `--track-par * `corpora` (default: "so_vector_float"): The dataset to use. The default data set represents vectors as float arrays. Use "so_vector_base64" for the same dataset with vectors encoded as base64 strings. * `warmup_iterations` (default: 100) - Number of iterations that each client should execute to warmup the benchmark candidate. * `iterations` (default: 100) - Number of measurement iterations that each client executes. +* `profile_iterations` (default: 100) - Number of profiling iterations that each client executes. * `search_clients` (default: 8) - Number of clients that issue search requests in the multi-client search challenges. ### License diff --git a/so_vector/challenges/default.json b/so_vector/challenges/default.json index 1df076440..05afbd5c1 100644 --- a/so_vector/challenges/default.json +++ b/so_vector/challenges/default.json @@ -1,4 +1,5 @@ {% set is_esql_enabled = (esql_enabled | default(false)) %} +{% set is_esql_profiling_enabled = (esql_profiling_enabled | default(true)) %} { "name": "index-and-search", @@ -519,6 +520,32 @@ "iterations": {{ iterations | default(100) }}, "clients": 1 } + {% if is_esql_profiling_enabled %} + ,{ + "name": "esql-profile-knn-search-default-match-all", + "tags": ["profile"], + "operation": "esql-profile-knn-search-default-match-all", + "warmup-iterations": {{ warmup_iterations | default(100) }}, + "iterations": {{ profile_iterations | default(100) }}, + "clients": 1 + }, + { + "name": "esql-profile-knn-search-100-300-match-all", + "tags": ["profile"], + "operation": "esql-profile-knn-search-100-300-match-all", + "warmup-iterations": {{ warmup_iterations | default(100) }}, + "iterations": {{ profile_iterations | default(100) }}, + "clients": 1 + }, + { + "name": "esql-profile-script-score-query-match-all", + "tags": ["profile"], + "operation": "esql-profile-script-score-query-match-all", + "warmup-iterations": {{ warmup_iterations | default(100) }}, + "iterations": {{ profile_iterations | default(100) }}, + "clients": 1 + } + {% endif %} {% endif %} {# non-serverless-after-force-merge-marker-start #}{%- if build_flavor != "serverless" or serverless_operator == true -%} {% if p_include_force_merge %} diff --git a/so_vector/operations/default.json b/so_vector/operations/default.json index 2a1d08042..5f04fcffb 100644 --- a/so_vector/operations/default.json +++ b/so_vector/operations/default.json @@ -308,6 +308,11 @@ "operation-type": "esql", "param-source": "esql-knn-param-source" }, +{ + "name": "esql-profile-knn-search-default-match-all", + "operation-type": "esql-profile", + "param-source": "esql-knn-param-source" +}, { "name": "knn-recall-default-match-all", "operation-type": "knn-recall", @@ -384,6 +389,13 @@ "k": 100, "num_candidates": 300 }, +{ + "name": "esql-profile-knn-search-100-300-match-all", + "operation-type": "esql-profile", + "param-source": "esql-knn-param-source", + "k": 100, + "num_candidates": 300 +}, { "name": "knn-recall-100-300-match-all", "operation-type": "knn-recall", @@ -537,6 +549,13 @@ "exact": true, "k": 10 }, +{ + "name": "esql-profile-script-score-query-match-all", + "operation-type": "esql-profile", + "param-source": "esql-knn-param-source", + "exact": true, + "k": 10 +}, { "name": "script-score-query-10-50-random-10-percent", "operation-type": "search", diff --git a/so_vector/track.py b/so_vector/track.py index 788fe64b2..4149da26e 100644 --- a/so_vector/track.py +++ b/so_vector/track.py @@ -4,6 +4,8 @@ import os from typing import Any, Final, List, Optional +from esrally.driver import runner + logger = logging.getLogger(__name__) QUERIES_FILENAME: str = "queries.json.bz2" TRUE_KNN_FILENAME: str = "queries-recall.json.bz2" @@ -285,8 +287,109 @@ def __repr__(self, *args, **kwargs): return "knn-recall" +class EsqlProfileRunner(runner.Runner): + """ + Runs an ES|QL query using profile: true, and adds the profile information to the result: + + - meta.query.took_ms: Total query time took + - meta.planning.took_ms: Planning time before query execution, includes parsing, preanalysis, analysis + - meta.parsing.took_ms: Time it took to parse the ESQL query + - meta.preanalysis.took_ms: Preanalysis, including field_caps, enrich policies, lookup indices + - meta.analysis.took_ms: Analysis time before optimizations + - meta..cpu_ms: Total plan CPU time + - meta..took_ms: Total plan took time + - meta..logical_optimization.took_ms: Plan logical optimization took time + - meta..physical_optimization.took_ms: Plan physical optimization took time + - meta..reduction.took_ms: : Node reduction plan generation took time + - meta...process_ms: Processing time for each operator in the plan + """ + + async def __call__(self, es, params): + import time + + # Extract transport-level parameters (timeouts, headers, etc.) + params, request_params, transport_params, headers = self._transport_request_params(params) + es = es.options(**transport_params) + + # Get the ESQL query and params (mandatory parameters) + query = runner.mandatory(params, "query", self) + + # Build the request body with the query and profile enabled + body = params.get("body", {}) + body["query"] = query + body["profile"] = True + + # Add optional filter if provided + query_filter = params.get("filter") + if query_filter: + body["filter"] = query_filter + + # Set headers if not provided (preserves prior behavior) + if not bool(headers): + headers = None + + # Execute the ESQL query with profiling + response = await es.perform_request(method="POST", path="/_query", headers=headers, body=body, params=request_params) + profile = response["profile"] + + # Build took_ms entries for each profiled phase + result = {} + if profile: + for phase_name in ["query", "planning", "parsing", "preanalysis", "dependency_resolution", "analysis"]: + if phase_name in profile: + took_nanos = profile.get(phase_name, []).get("took_nanos", 0) + if took_nanos > 0: + result[f"{phase_name}.took_ms"] = took_nanos / 1_000_000 # Convert to milliseconds + + # Extract driver-level metrics + drivers = profile.get("drivers", []) + for driver in drivers: + driver_name = driver.get("description", "unknown") + took_nanos = driver.get("took_nanos", 0) + cpu_nanos = driver.get("cpu_nanos", 0) + + # Add driver-level timing metrics + result[f"{driver_name}.took_ms"] = took_nanos / 1_000_000 # Convert to milliseconds + result[f"{driver_name}.cpu_ms"] = cpu_nanos / 1_000_000 + + # Extract operator-level metrics + operators = driver.get("operators", []) + for idx, operator in enumerate(operators): + operator_name = operator.get("operator", f"operator_{idx}") + # Sanitize operator name for use as a metric key (remove brackets) + safe_operator_name = operator_name.split("[")[0] if "[" in operator_name else operator_name + + # Get process_nanos and cpu_nanos from operator status + status = operator.get("status", {}) + + process_nanos = status.get("process_nanos", 0) + if process_nanos > 0: + metric_key = f"{driver_name}.{safe_operator_name}.process_ms" + result[metric_key] = result.get(metric_key, 0) + process_nanos / 1_000_000 # Convert to milliseconds + + # Extract plan-level metrics + plans = profile.get("plans", []) + for plan in plans: + plan_name = plan.get("description", "unknown") + + # Extract optimization level metrics + for optimization in ["logical_optimization_nanos", "physical_optimization_nanos", "reduction_nanos"]: + optimization_nanos = plan.get(optimization, 0) + if optimization_nanos > 0: + # Remove "_nanos" suffix from the metric name + metric_name = optimization.replace("_nanos", "") + metric_key = f"{plan_name}.{metric_name}.took_ms" + result[metric_key] = result.get(metric_key, 0) + optimization_nanos / 1_000_000 # Convert to milliseconds + + return result + + def __repr__(self, *args, **kwargs): + return "esql-profile" + + def register(registry): registry.register_param_source("knn-param-source", KnnParamSource) registry.register_param_source("esql-knn-param-source", ESQLKnnParamSource) registry.register_param_source("knn-recall-param-source", KnnRecallParamSource) registry.register_runner("knn-recall", KnnRecallRunner(), async_runner=True) + registry.register_runner("esql-profile", EsqlProfileRunner(), async_runner=True)