diff --git a/it/test_all_tracks_and_challenges.py b/it/test_all_tracks_and_challenges.py index e8bbcd845..3f4267404 100644 --- a/it/test_all_tracks_and_challenges.py +++ b/it/test_all_tracks_and_challenges.py @@ -21,7 +21,16 @@ class TestTrackRepository: - skip_tracks = ["elastic/logs", "elastic/security", "k8s_metrics", "sql", "elser-ingest-speedtest", "msmarco-v2-vector", "openai_vector"] + skip_tracks = [ + "elastic/logs", + "elastic/security", + "k8s_metrics", + "sql", + "elser-ingest-speedtest", + "msmarco-v2-vector", + "openai_vector", + "random_vector", + ] disable_assertions = { "http_logs": ["append-no-conflicts", "runtime-fields"], "nyc_taxis": ["update-aggs-only"], diff --git a/random_vector/README.md b/random_vector/README.md index 94334593c..64714ce86 100644 --- a/random_vector/README.md +++ b/random_vector/README.md @@ -7,11 +7,25 @@ of brute force search over vectors filtered by a partition ID. ## Indexing -To begin indexing, the track initiates `index_clients` clients, each executing `index_iterations` bulk operations of size `index_bulk_size`. -Consequently, the total number of documents indexed by the track is calculated as follows: `index_clients` * `index_iterations` * `index_bulk_size`. +Indexing runs in one of two modes, depending on whether `index_target_throughput` is specified. +The track launches `index_clients` parallel clients. Each client sends `index_iterations` bulk requests, with each request containing `index_bulk_size` documents. -Each document in the bulk is assigned a random vector of dimensions `dims` and a random partition ID. -The resulting index is sorted on the partition id. This helps make sure vectors are close together when we do filtered searches. +The total number of documents indexed is: +`index_clients` × `index_iterations` × `index_bulk_size` + +* If `index_target_throughput` is set, each client will send bulk operations at a rate of: + `index_target_throughput` ÷ `index_clients` bulk requests per second. +* If `index_target_throughput` is not set, each client will send bulk operations as fast as possible. + +### Document content and index layout + +Each document indexed includes: + +* A random vector with `dims` dimensions. +* A randomly assigned partition ID. + +The index is sorted by partition ID. +This ensures that vectors from the same partition are stored close together, improving the efficiency of filtered searches. ## Search Operations @@ -22,13 +36,16 @@ These operations are executed against the index using various DSL flavors, inclu This track accepts the following parameters with Rally 0.8.0+ using `--track-params`: + - use_synthetic_source (default: true) - number_of_shards (default: 1) - number_of_replicas (default: 0) - - vector_index_type (default: flat) + - vector_index_type (default: bbq_flat) + - index_target_throughput (default: undefined) - index_clients (default: 1) - index_iterations (default: 1000) - index_bulk_size (default: 1000) - search_iterations (default: 1000) - search_clients (default: 8) - dims (default: 128) - - partitions (default: 1000) \ No newline at end of file + - partitions (default: 1000) + - rescore_oversample (default: 0) \ No newline at end of file diff --git a/random_vector/challenges/default.json b/random_vector/challenges/default.json index 7cdead899..eb2e02de7 100644 --- a/random_vector/challenges/default.json +++ b/random_vector/challenges/default.json @@ -4,12 +4,25 @@ "default": true, "schedule": [ { - "name": "delete-index", - "operation": "delete-index" + "name": "delete-data-stream", + "operation": { + "operation-type": "delete-data-stream", + "data-stream": [ + "vectors-benchmark-random" + ] + } + }, + { + "name": "delete-templates", + "operation": { + "operation-type": "delete-composable-template" + } }, { - "name": "create-index", - "operation": "create-index" + "name": "create-templates", + "operation": { + "operation-type": "create-composable-template" + } }, { "name": "check-cluster-health", @@ -18,9 +31,11 @@ { "name": "random-indexing", "operation": "random-bulk-indexing", + {%- if index_target_throughput is defined %} + "target-throughput": {{ index_target_throughput | int }}, + {%- endif %} "clients": {{ index_clients | default(1) | int }}, - "iterations": {{ index_iterations | default(1000) | int }}, - "bulk-size": {{ index_bulk_size | default(1000)}} + "iterations": {{ index_iterations | default(1000) | int }} }, { "name": "refresh-after-index", @@ -31,33 +46,11 @@ } }, { - "name": "script-score-filtered-search-single-client", - "operation": "brute-force-filtered-search", - "script": true, - "warmup-iterations": 100, - "iterations": {{ search_iterations | default(1000) | int }} - }, - { - "name": "script-score-filtered-search-multiple-client", - "operation": "brute-force-filtered-search", - "script": true, - "warmup-iterations": 100, - "iterations": {{ search_iterations | default(1000) | int }}, - "clients": {{ search_clients | default(8) | int }} - }, - { - "name": "knn-filtered-search-single-client", - "operation": "brute-force-filtered-search", - "script": false, - "warmup-iterations": 100, - "iterations": {{ search_iterations | default(10000) | int }} - }, - { - "name": "knn-filtered-search-multiple-client", + "name": "brute-force-filtered-search", "operation": "brute-force-filtered-search", "script": false, - "warmup-iterations": 100, - "iterations": {{ search_iterations | default(1000) | int }}, + "warmup-iterations": 1000, + "iterations": {{ search_iterations | default(10000) | int }}, "clients": {{ search_clients | default(8) | int }} } ] diff --git a/random_vector/index-mapping.json b/random_vector/index-mapping.json deleted file mode 100644 index 2dcbf5fb7..000000000 --- a/random_vector/index-mapping.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "settings": { - "index": { - {# non-serverless-index-settings-marker-start #}{%- if build_flavor != "serverless" or serverless_operator == true -%} - "number_of_shards": {{number_of_shards | default(1)}}, - "number_of_replicas": {{number_of_replicas | default(0)}}, - {%- endif -%}{# non-serverless-index-settings-marker-end #} - "sort": { - "field": "partition_id" - }, - "mapping.source.mode": "synthetic" - } - }, - "mappings": { - "_routing": { - "required": true - }, - "properties": { - "partition_id": { - "type": "keyword" - }, - "emb": { - "type": "dense_vector", - "index": true, - "similarity": "cosine", - "index_options": { - "type": {{ vector_index_type | default("flat") | tojson }} - } - } - } - } -} \ No newline at end of file diff --git a/random_vector/index-template.json b/random_vector/index-template.json new file mode 100644 index 000000000..4899ed685 --- /dev/null +++ b/random_vector/index-template.json @@ -0,0 +1,38 @@ +{ + "index_patterns": ["vectors-benchmark-*"], + "priority": 500, + "data_stream": {}, + "template": { + "settings": { + {# non-serverless-index-settings-marker-start #}{%- if build_flavor != "serverless" or serverless_operator == true -%} + "number_of_shards": {{number_of_shards | default(1)}}, + "number_of_replicas": {{number_of_replicas | default(0)}}, + {%- endif -%}{# non-serverless-index-settings-marker-end #} + {%- if use_synthetic_source | default(true) -%} + "mapping.source.mode": "synthetic", + {%- endif -%} + "sort": { + "field": "partition_id" + } + }, + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "partition_id": { + "type": "keyword" + }, + "emb": { + "type": "dense_vector", + "dims": {{ dims | default(128) | tojson }}, + "index": true, + "similarity": "cosine", + "index_options": { + "type": {{ vector_index_type | default("bbq_flat") | tojson }} + } + } + } + } + } +} \ No newline at end of file diff --git a/random_vector/operations/default.json b/random_vector/operations/default.json index 6bc19f69d..916b43e48 100644 --- a/random_vector/operations/default.json +++ b/random_vector/operations/default.json @@ -1,13 +1,3 @@ -{ - "name": "delete-index", - "operation-type": "delete-index", - "include-in-reporting": false -}, -{ - "name": "create-index", - "operation-type": "create-index", - "include-in-reporting": false -}, { "name": "check-cluster-health", "operation-type": "cluster-health", @@ -21,14 +11,16 @@ "name": "random-bulk-indexing", "operation-type": "bulk", "param-source": "random-bulk-param-source", - "dims": {{dims | default(128)}}, - "partitions": {{partitions | default(1000)}} + "dims": {{ dims | default(128) | int }}, + "partitions": {{ partitions | default(1000) | int }}, + "bulk-size": {{ index_bulk_size | default(1000)}} }, { "name": "brute-force-filtered-search", "operation-type": "search", "param-source": "knn-param-source", - "dims": {{dims | default(128)}}, - "partitions": {{partitions | default(1000)}} + "dims": {{ dims | default(128) | int }}, + "partitions": {{ partitions | default(1000) | int }}, + "rescore-oversample": {{ rescore_oversample | default(0) | int }} } diff --git a/random_vector/track.json b/random_vector/track.json index dcfaaf2b1..38e4bc2e6 100644 --- a/random_vector/track.json +++ b/random_vector/track.json @@ -2,12 +2,16 @@ { "version": 2, "description": "Benchmarking filtered search on random vectors", - "indices": [ + "composable-templates": [ { - "name": "index", - "body": "index-mapping.json" + "name": "vector-index-template", + "index-pattern": "vectors-benchmark-*", + "template": "index-template.json" } ], + "data-streams": [ + {"name": "vectors-benchmark-random"} + ], "operations": [ {{ rally.collect(parts="operations/*.json") }} ], diff --git a/random_vector/track.py b/random_vector/track.py index 5a4f223b4..68f599165 100644 --- a/random_vector/track.py +++ b/random_vector/track.py @@ -1,4 +1,5 @@ import random +import time from esrally.track.params import ParamSource @@ -7,19 +8,21 @@ class RandomBulkParamSource(ParamSource): def __init__(self, track, params, **kwargs): super().__init__(track, params, **kwargs) self._bulk_size = params.get("bulk-size", 1000) - self._index_name = params.get("index", track.indices[0].name) + self._index_name = track.data_streams[0].name self._dims = params.get("dims", 128) self._partitions = params.get("partitions", 1000) def params(self): import numpy as np + timestamp = int(time.time()) * 1000 bulk_data = [] for _ in range(self._bulk_size): vec = np.random.rand(self._dims) partition_id = random.randint(0, self._partitions) - bulk_data.append({"index": {"_index": self._index_name, "routing": partition_id}}) - bulk_data.append({"partition_id": partition_id, "emb": vec.tolist()}) + metadata = {"_index": self._index_name} + bulk_data.append({"create": metadata}) + bulk_data.append({"@timestamp": timestamp, "partition_id": partition_id, "emb": vec.tolist()}) return { "body": bulk_data, @@ -31,44 +34,28 @@ def params(self): } -def generate_knn_query(query_vector, partition_id, k): +def generate_knn_query(query_vector, partition_id, k, rescore_oversample): return { + "_source": {"exclude_vectors": True}, "knn": { "field": "emb", "query_vector": query_vector, "k": k, "num_candidates": k, "filter": {"term": {"partition_id": partition_id}}, - } - } - - -def generate_script_query(query_vector, partition_id): - return { - "query": { - "script_score": { - "query": {"term": {"partition_id": partition_id}}, - "script": {"source": "cosineSimilarity(params.query_vector, 'emb') + 1.0", "params": {"query_vector": query_vector}}, - } - } + "rescore_vector": {"oversample": rescore_oversample}, + }, } class RandomSearchParamSource: def __init__(self, track, params, **kwargs): - # choose a suitable index: if there is only one defined for this track - # choose that one, but let the user always override index - if len(track.indices) == 1: - default_index = track.indices[0].name - else: - default_index = "_all" - - self._index_name = params.get("index", default_index) + self._index_name = track.data_streams[0].name self._cache = params.get("cache", False) self._partitions = params.get("partitions", 1000) self._dims = params.get("dims", 128) self._top_k = params.get("k", 10) - self._script = params.get("script", True) + self._rescore_oversample = params.get("rescore-oversample", 0) self.infinite = True def partition(self, partition_index, total_partitions): @@ -79,11 +66,8 @@ def params(self): partition_id = random.randint(0, self._partitions) query_vec = np.random.rand(self._dims).tolist() - if self._script: - query = generate_script_query(query_vec, partition_id) - else: - query = generate_knn_query(query_vec, partition_id, self._topk) - return {"index": self._index_name, "cache": self._cache, "size": self._top_k, "_source_excludes": ["emb"], "body": query} + query = generate_knn_query(query_vec, partition_id, self._top_k, self._rescore_oversample) + return {"index": self._index_name, "cache": self._cache, "size": self._top_k, "body": query} def register(registry):