Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions so_vector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ This track accepts the following parameters with Rally 0.8.0+ using `--track-par
* `bulk_size` (default: 500)
* `bulk_indexing_clients` (default: 1)
* `esql_enabled` (default: false) : Controls if the ESQL-specific benchmarks are enabled.
* `esql_profiling_enabled` (default: true) : True to add separate profiling runs for ESQL operations.
* `ingest_percentage` (default: 100)
* `max_num_segments` (default: 1)
* `force_merge_timeout` (default: 7200) : How long force merge should be allowed to run before aborting.
Expand All @@ -74,6 +75,7 @@ This track accepts the following parameters with Rally 0.8.0+ using `--track-par
* `corpora` (default: "so_vector_float"): The dataset to use. The default data set represents vectors as float arrays. Use "so_vector_base64" for the same dataset with vectors encoded as base64 strings.
* `warmup_iterations` (default: 100) - Number of iterations that each client should execute to warmup the benchmark candidate.
* `iterations` (default: 100) - Number of measurement iterations that each client executes.
* `profile_iterations` (default: 100) - Number of profiling iterations that each client executes.
* `search_clients` (default: 8) - Number of clients that issue search requests in the multi-client search challenges.

### License
Expand Down
27 changes: 27 additions & 0 deletions so_vector/challenges/default.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{% set is_esql_enabled = (esql_enabled | default(false)) %}
{% set is_esql_profiling_enabled = (esql_profiling_enabled | default(true)) %}

{
"name": "index-and-search",
Expand Down Expand Up @@ -519,6 +520,32 @@
"iterations": {{ iterations | default(100) }},
"clients": 1
}
{% if is_esql_profiling_enabled %}
,{
"name": "esql-profile-knn-search-default-match-all",
"tags": ["profile"],
"operation": "esql-profile-knn-search-default-match-all",
"warmup-iterations": {{ warmup_iterations | default(100) }},
"iterations": {{ profile_iterations | default(100) }},
"clients": 1
},
{
"name": "esql-profile-knn-search-100-300-match-all",
"tags": ["profile"],
"operation": "esql-profile-knn-search-100-300-match-all",
"warmup-iterations": {{ warmup_iterations | default(100) }},
"iterations": {{ profile_iterations | default(100) }},
"clients": 1
},
{
"name": "esql-profile-script-score-query-match-all",
"tags": ["profile"],
"operation": "esql-profile-script-score-query-match-all",
"warmup-iterations": {{ warmup_iterations | default(100) }},
"iterations": {{ profile_iterations | default(100) }},
"clients": 1
}
{% endif %}
{% endif %}
{# non-serverless-after-force-merge-marker-start #}{%- if build_flavor != "serverless" or serverless_operator == true -%}
{% if p_include_force_merge %}
Expand Down
19 changes: 19 additions & 0 deletions so_vector/operations/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@
"operation-type": "esql",
"param-source": "esql-knn-param-source"
},
{
"name": "esql-profile-knn-search-default-match-all",
"operation-type": "esql-profile",
"param-source": "esql-knn-param-source"
},
{
"name": "knn-recall-default-match-all",
"operation-type": "knn-recall",
Expand Down Expand Up @@ -384,6 +389,13 @@
"k": 100,
"num_candidates": 300
},
{
"name": "esql-profile-knn-search-100-300-match-all",
"operation-type": "esql-profile",
"param-source": "esql-knn-param-source",
"k": 100,
"num_candidates": 300
},
{
"name": "knn-recall-100-300-match-all",
"operation-type": "knn-recall",
Expand Down Expand Up @@ -537,6 +549,13 @@
"exact": true,
"k": 10
},
{
"name": "esql-profile-script-score-query-match-all",
"operation-type": "esql-profile",
"param-source": "esql-knn-param-source",
"exact": true,
"k": 10
},
{
"name": "script-score-query-10-50-random-10-percent",
"operation-type": "search",
Expand Down
103 changes: 103 additions & 0 deletions so_vector/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import os
from typing import Any, Final, List, Optional

from esrally.driver import runner

logger = logging.getLogger(__name__)
QUERIES_FILENAME: str = "queries.json.bz2"
TRUE_KNN_FILENAME: str = "queries-recall.json.bz2"
Expand Down Expand Up @@ -285,8 +287,109 @@ def __repr__(self, *args, **kwargs):
return "knn-recall"


class EsqlProfileRunner(runner.Runner):
"""
Runs an ES|QL query using profile: true, and adds the profile information to the result:

- meta.query.took_ms: Total query time took
- meta.planning.took_ms: Planning time before query execution, includes parsing, preanalysis, analysis
- meta.parsing.took_ms: Time it took to parse the ESQL query
- meta.preanalysis.took_ms: Preanalysis, including field_caps, enrich policies, lookup indices
- meta.analysis.took_ms: Analysis time before optimizations
- meta.<plan>.cpu_ms: Total plan CPU time
- meta.<plan>.took_ms: Total plan took time
- meta.<plan>.logical_optimization.took_ms: Plan logical optimization took time
- meta.<plan>.physical_optimization.took_ms: Plan physical optimization took time
- meta.<plan>.reduction.took_ms: : Node reduction plan generation took time
- meta.<plan>.<operator>.process_ms: Processing time for each operator in the plan
"""

async def __call__(self, es, params):
import time

# Extract transport-level parameters (timeouts, headers, etc.)
params, request_params, transport_params, headers = self._transport_request_params(params)
es = es.options(**transport_params)

# Get the ESQL query and params (mandatory parameters)
query = runner.mandatory(params, "query", self)

# Build the request body with the query and profile enabled
body = params.get("body", {})
body["query"] = query
body["profile"] = True

# Add optional filter if provided
query_filter = params.get("filter")
if query_filter:
body["filter"] = query_filter

# Set headers if not provided (preserves prior behavior)
if not bool(headers):
headers = None

# Execute the ESQL query with profiling
response = await es.perform_request(method="POST", path="/_query", headers=headers, body=body, params=request_params)
profile = response["profile"]

# Build took_ms entries for each profiled phase
result = {}
if profile:
for phase_name in ["query", "planning", "parsing", "preanalysis", "dependency_resolution", "analysis"]:
if phase_name in profile:
took_nanos = profile.get(phase_name, []).get("took_nanos", 0)
if took_nanos > 0:
result[f"{phase_name}.took_ms"] = took_nanos / 1_000_000 # Convert to milliseconds

# Extract driver-level metrics
drivers = profile.get("drivers", [])
for driver in drivers:
driver_name = driver.get("description", "unknown")
took_nanos = driver.get("took_nanos", 0)
cpu_nanos = driver.get("cpu_nanos", 0)

# Add driver-level timing metrics
result[f"{driver_name}.took_ms"] = took_nanos / 1_000_000 # Convert to milliseconds
result[f"{driver_name}.cpu_ms"] = cpu_nanos / 1_000_000

# Extract operator-level metrics
operators = driver.get("operators", [])
for idx, operator in enumerate(operators):
operator_name = operator.get("operator", f"operator_{idx}")
# Sanitize operator name for use as a metric key (remove brackets)
safe_operator_name = operator_name.split("[")[0] if "[" in operator_name else operator_name

# Get process_nanos and cpu_nanos from operator status
status = operator.get("status", {})

process_nanos = status.get("process_nanos", 0)
if process_nanos > 0:
metric_key = f"{driver_name}.{safe_operator_name}.process_ms"
result[metric_key] = result.get(metric_key, 0) + process_nanos / 1_000_000 # Convert to milliseconds

# Extract plan-level metrics
plans = profile.get("plans", [])
for plan in plans:
plan_name = plan.get("description", "unknown")

# Extract optimization level metrics
for optimization in ["logical_optimization_nanos", "physical_optimization_nanos", "reduction_nanos"]:
optimization_nanos = plan.get(optimization, 0)
if optimization_nanos > 0:
# Remove "_nanos" suffix from the metric name
metric_name = optimization.replace("_nanos", "")
metric_key = f"{plan_name}.{metric_name}.took_ms"
result[metric_key] = result.get(metric_key, 0) + optimization_nanos / 1_000_000 # Convert to milliseconds

return result

def __repr__(self, *args, **kwargs):
return "esql-profile"


def register(registry):
registry.register_param_source("knn-param-source", KnnParamSource)
registry.register_param_source("esql-knn-param-source", ESQLKnnParamSource)
registry.register_param_source("knn-recall-param-source", KnnRecallParamSource)
registry.register_runner("knn-recall", KnnRecallRunner(), async_runner=True)
registry.register_runner("esql-profile", EsqlProfileRunner(), async_runner=True)
Loading