Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion it/test_all_tracks_and_challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@


class TestTrackRepository:
skip_tracks = ["elastic/logs", "elastic/security", "k8s_metrics", "sql", "elser-ingest-speedtest", "msmarco-v2-vector", "openai_vector"]
skip_tracks = [
"elastic/logs",
"elastic/security",
"k8s_metrics",
"sql",
"elser-ingest-speedtest",
"msmarco-v2-vector",
"openai_vector",
"random_vector",
]
disable_assertions = {
"http_logs": ["append-no-conflicts", "runtime-fields"],
"nyc_taxis": ["update-aggs-only"],
Expand Down
29 changes: 23 additions & 6 deletions random_vector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,25 @@ of brute force search over vectors filtered by a partition ID.

## Indexing

To begin indexing, the track initiates `index_clients` clients, each executing `index_iterations` bulk operations of size `index_bulk_size`.
Consequently, the total number of documents indexed by the track is calculated as follows: `index_clients` * `index_iterations` * `index_bulk_size`.
Indexing runs in one of two modes, depending on whether `index_target_throughput` is specified.
The track launches `index_clients` parallel clients. Each client sends `index_iterations` bulk requests, with each request containing `index_bulk_size` documents.

Each document in the bulk is assigned a random vector of dimensions `dims` and a random partition ID.
The resulting index is sorted on the partition id. This helps make sure vectors are close together when we do filtered searches.
The total number of documents indexed is:
`index_clients` × `index_iterations` × `index_bulk_size`

* If `index_target_throughput` is set, it is the combined target across all clients, so each client sends bulk requests at a rate of:
`index_target_throughput` ÷ `index_clients` bulk requests per second.
* If `index_target_throughput` is not set, each client will send bulk operations as fast as possible.

### Document content and index layout

Each document indexed includes:

* A random vector with `dims` dimensions.
* A randomly assigned partition ID.

The index is sorted by partition ID.
This ensures that vectors from the same partition are stored close together, improving the efficiency of filtered searches.

## Search Operations

Expand All @@ -22,13 +36,16 @@ These operations are executed against the index using various DSL flavors, inclu

This track accepts the following parameters with Rally 0.8.0+ using `--track-params`:

- use_synthetic_source (default: true)
- number_of_shards (default: 1)
- number_of_replicas (default: 0)
- vector_index_type (default: flat)
- vector_index_type (default: bbq_flat)
- index_target_throughput (default: undefined)
- index_clients (default: 1)
- index_iterations (default: 1000)
- index_bulk_size (default: 1000)
- search_iterations (default: 10000)
- search_clients (default: 8)
- dims (default: 128)
- partitions (default: 1000)
- partitions (default: 1000)
- rescore_oversample (default: 0)
55 changes: 24 additions & 31 deletions random_vector/challenges/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,25 @@
"default": true,
"schedule": [
{
"name": "delete-index",
"operation": "delete-index"
"name": "delete-data-stream",
"operation": {
"operation-type": "delete-data-stream",
"data-stream": [
"vectors-benchmark-random"
]
}
},
{
"name": "delete-templates",
"operation": {
"operation-type": "delete-composable-template"
}
},
{
"name": "create-index",
"operation": "create-index"
"name": "create-templates",
"operation": {
"operation-type": "create-composable-template"
}
},
{
"name": "check-cluster-health",
Expand All @@ -18,9 +31,11 @@
{
"name": "random-indexing",
"operation": "random-bulk-indexing",
{%- if index_target_throughput is defined %}
"target-throughput": {{ index_target_throughput | int }},
{%- endif %}
"clients": {{ index_clients | default(1) | int }},
"iterations": {{ index_iterations | default(1000) | int }},
"bulk-size": {{ index_bulk_size | default(1000)}}
"iterations": {{ index_iterations | default(1000) | int }}
},
{
"name": "refresh-after-index",
Expand All @@ -31,33 +46,11 @@
}
},
{
"name": "script-score-filtered-search-single-client",
"operation": "brute-force-filtered-search",
"script": true,
"warmup-iterations": 100,
"iterations": {{ search_iterations | default(1000) | int }}
},
{
"name": "script-score-filtered-search-multiple-client",
"operation": "brute-force-filtered-search",
"script": true,
"warmup-iterations": 100,
"iterations": {{ search_iterations | default(1000) | int }},
"clients": {{ search_clients | default(8) | int }}
},
{
"name": "knn-filtered-search-single-client",
"operation": "brute-force-filtered-search",
"script": false,
"warmup-iterations": 100,
"iterations": {{ search_iterations | default(10000) | int }}
},
{
"name": "knn-filtered-search-multiple-client",
"name": "brute-force-filtered-search",
"operation": "brute-force-filtered-search",
"script": false,
"warmup-iterations": 100,
"iterations": {{ search_iterations | default(1000) | int }},
"warmup-iterations": 1000,
"iterations": {{ search_iterations | default(10000) | int }},
"clients": {{ search_clients | default(8) | int }}
}
]
Expand Down
32 changes: 0 additions & 32 deletions random_vector/index-mapping.json

This file was deleted.

38 changes: 38 additions & 0 deletions random_vector/index-template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"index_patterns": ["vectors-benchmark-*"],
"priority": 500,
"data_stream": {},
"template": {
"settings": {
{# non-serverless-index-settings-marker-start #}{%- if build_flavor != "serverless" or serverless_operator == true -%}
"number_of_shards": {{number_of_shards | default(1)}},
"number_of_replicas": {{number_of_replicas | default(0)}},
{%- endif -%}{# non-serverless-index-settings-marker-end #}
{%- if use_synthetic_source | default(true) -%}
"mapping.source.mode": "synthetic",
{%- endif -%}
"sort": {
"field": "partition_id"
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"partition_id": {
"type": "keyword"
},
"emb": {
"type": "dense_vector",
"dims": {{ dims | default(128) | tojson }},
"index": true,
"similarity": "cosine",
"index_options": {
"type": {{ vector_index_type | default("bbq_flat") | tojson }}
}
}
}
}
}
}
20 changes: 6 additions & 14 deletions random_vector/operations/default.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
{
"name": "delete-index",
"operation-type": "delete-index",
"include-in-reporting": false
},
{
"name": "create-index",
"operation-type": "create-index",
"include-in-reporting": false
},
{
"name": "check-cluster-health",
"operation-type": "cluster-health",
Expand All @@ -21,14 +11,16 @@
"name": "random-bulk-indexing",
"operation-type": "bulk",
"param-source": "random-bulk-param-source",
"dims": {{dims | default(128)}},
"partitions": {{partitions | default(1000)}}
"dims": {{ dims | default(128) | int }},
"partitions": {{ partitions | default(1000) | int }},
"bulk-size": {{ index_bulk_size | default(1000)}}
},
{
"name": "brute-force-filtered-search",
"operation-type": "search",
"param-source": "knn-param-source",
"dims": {{dims | default(128)}},
"partitions": {{partitions | default(1000)}}
"dims": {{ dims | default(128) | int }},
"partitions": {{ partitions | default(1000) | int }},
"rescore-oversample": {{ rescore_oversample | default(0) | int }}
}

10 changes: 7 additions & 3 deletions random_vector/track.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
{
"version": 2,
"description": "Benchmarking filtered search on random vectors",
"indices": [
"composable-templates": [
{
"name": "index",
"body": "index-mapping.json"
"name": "vector-index-template",
"index-pattern": "vectors-benchmark-*",
"template": "index-template.json"
}
],
"data-streams": [
{"name": "vectors-benchmark-random"}
],
"operations": [
{{ rally.collect(parts="operations/*.json") }}
],
Expand Down
44 changes: 14 additions & 30 deletions random_vector/track.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import random
import time

from esrally.track.params import ParamSource

Expand All @@ -7,19 +8,21 @@ class RandomBulkParamSource(ParamSource):
def __init__(self, track, params, **kwargs):
super().__init__(track, params, **kwargs)
self._bulk_size = params.get("bulk-size", 1000)
self._index_name = params.get("index", track.indices[0].name)
self._index_name = track.data_streams[0].name
self._dims = params.get("dims", 128)
self._partitions = params.get("partitions", 1000)

def params(self):
import numpy as np

timestamp = int(time.time()) * 1000
bulk_data = []
for _ in range(self._bulk_size):
vec = np.random.rand(self._dims)
partition_id = random.randint(0, self._partitions)
bulk_data.append({"index": {"_index": self._index_name, "routing": partition_id}})
bulk_data.append({"partition_id": partition_id, "emb": vec.tolist()})
metadata = {"_index": self._index_name}
bulk_data.append({"create": metadata})
bulk_data.append({"@timestamp": timestamp, "partition_id": partition_id, "emb": vec.tolist()})

return {
"body": bulk_data,
Expand All @@ -31,44 +34,28 @@ def params(self):
}


def generate_knn_query(query_vector, partition_id, k):
def generate_knn_query(query_vector, partition_id, k, rescore_oversample):
return {
"_source": {"exclude_vectors": True},
"knn": {
"field": "emb",
"query_vector": query_vector,
"k": k,
"num_candidates": k,
"filter": {"term": {"partition_id": partition_id}},
}
}


def generate_script_query(query_vector, partition_id):
return {
"query": {
"script_score": {
"query": {"term": {"partition_id": partition_id}},
"script": {"source": "cosineSimilarity(params.query_vector, 'emb') + 1.0", "params": {"query_vector": query_vector}},
}
}
"rescore_vector": {"oversample": rescore_oversample},
},
}


class RandomSearchParamSource:
def __init__(self, track, params, **kwargs):
# choose a suitable index: if there is only one defined for this track
# choose that one, but let the user always override index
if len(track.indices) == 1:
default_index = track.indices[0].name
else:
default_index = "_all"

self._index_name = params.get("index", default_index)
self._index_name = track.data_streams[0].name
self._cache = params.get("cache", False)
self._partitions = params.get("partitions", 1000)
self._dims = params.get("dims", 128)
self._top_k = params.get("k", 10)
self._script = params.get("script", True)
self._rescore_oversample = params.get("rescore-oversample", 0)
self.infinite = True

def partition(self, partition_index, total_partitions):
Expand All @@ -79,11 +66,8 @@ def params(self):

partition_id = random.randint(0, self._partitions)
query_vec = np.random.rand(self._dims).tolist()
if self._script:
query = generate_script_query(query_vec, partition_id)
else:
query = generate_knn_query(query_vec, partition_id, self._topk)
return {"index": self._index_name, "cache": self._cache, "size": self._top_k, "_source_excludes": ["emb"], "body": query}
query = generate_knn_query(query_vec, partition_id, self._top_k, self._rescore_oversample)
return {"index": self._index_name, "cache": self._cache, "size": self._top_k, "body": query}


def register(registry):
Expand Down
Loading