diff --git a/docs/track.rst b/docs/track.rst index 49e631093..a47b3f3bf 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -281,7 +281,7 @@ Each composable template in this list consists of the following properties: * ``name`` (mandatory): Composable template name. * ``index-pattern`` (mandatory): Index pattern that matches the composable template. This must match the definition in the template file. -* ``delete-matching-indices`` (optional, defaults to ``true``): Delete all indices that match the provided index pattern if the template is deleted. +* ``delete-matching-indices`` (optional, defaults to ``true``): Delete all indices that match the provided index pattern if the template is deleted. This setting is ignored in `Stateless Elasticsearch `_ - please use data streams and the ``delete-data-stream`` operation instead. * ``template`` (mandatory): Composable template file name. * ``template-path`` (optional): JSON field inside the file content that contains the template. @@ -985,7 +985,7 @@ Properties * ``index`` (optional): An `index pattern `_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``. * ``type`` (optional): Defines the type within the specified index for this query. By default, no ``type`` will be used and the query will be performed across all types in the provided index. Also, types have been removed in Elasticsearch 7.0.0 so you must not specify this property if you want to benchmark Elasticsearch 7.0.0 or later. -* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. 
+* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. When Rally is used against `Stateless Elasticsearch `_ the default is ``false``. * ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs `_. .. note:: @@ -1045,7 +1045,7 @@ Properties """""""""" * ``index`` (optional): An `index pattern `_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``. -* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. +* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. When Rally is used against `Stateless Elasticsearch `_ the default is ``false``. * ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs `_. .. note:: @@ -1105,7 +1105,7 @@ Properties * ``index`` (optional): An `index pattern `_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. 
If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``. * ``type`` (optional): Defines the type within the specified index for this query. By default, no ``type`` will be used and the query will be performed across all types in the provided index. Also, types have been removed in Elasticsearch 7.0.0 so you must not specify this property if you want to benchmark Elasticsearch 7.0.0 or later. -* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. +* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. When Rally is used against `Stateless Elasticsearch `_ the default is ``false``. * ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs `_. .. note:: @@ -1156,7 +1156,7 @@ Properties """""""""" * ``index`` (optional): An `index pattern `_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``. -* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. +* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. 
When Rally is used against `Stateless Elasticsearch `_ the default is ``false``. * ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs `_. .. note:: diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 7dd073985..439c1e859 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1247,7 +1247,7 @@ def receiveMsg_StartWorker(self, msg, sender): # we need to wake up more often in test mode if self.config.opts("track", "test.mode.enabled"): self.wakeup_interval = 0.5 - runner.register_default_runners() + runner.register_default_runners(self.config) if self.track.has_plugins: track.load_track_plugins(self.config, self.track.name, runner.register_runner, scheduler.register_scheduler) self.drive() diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 6222e038a..457b6acc3 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -16,7 +16,6 @@ # under the License. 
import asyncio -import contextlib import contextvars import json import logging @@ -43,15 +42,15 @@ __RUNNERS = {} -def register_default_runners(): +def register_default_runners(config=None): register_runner(track.OperationType.Bulk, BulkIndex(), async_runner=True) register_runner(track.OperationType.ForceMerge, ForceMerge(), async_runner=True) register_runner(track.OperationType.IndexStats, Retry(IndicesStats()), async_runner=True) register_runner(track.OperationType.NodeStats, NodeStats(), async_runner=True) - register_runner(track.OperationType.Search, Query(), async_runner=True) - register_runner(track.OperationType.PaginatedSearch, Query(), async_runner=True) - register_runner(track.OperationType.CompositeAgg, Query(), async_runner=True) - register_runner(track.OperationType.ScrollSearch, Query(), async_runner=True) + register_runner(track.OperationType.Search, Query(config=config), async_runner=True) + register_runner(track.OperationType.PaginatedSearch, Query(config=config), async_runner=True) + register_runner(track.OperationType.CompositeAgg, Query(config=config), async_runner=True) + register_runner(track.OperationType.ScrollSearch, Query(config=config), async_runner=True) register_runner(track.OperationType.RawRequest, RawRequest(), async_runner=True) register_runner(track.OperationType.Composite, Composite(), async_runner=True) register_runner(track.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True) @@ -73,11 +72,11 @@ def register_default_runners(): register_runner(track.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True) register_runner(track.OperationType.Refresh, Retry(Refresh()), async_runner=True) register_runner(track.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True) - register_runner(track.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True) + register_runner(track.OperationType.DeleteIndex, Retry(DeleteIndex(config=config)), async_runner=True) 
register_runner(track.OperationType.CreateComponentTemplate, Retry(CreateComponentTemplate()), async_runner=True) register_runner(track.OperationType.DeleteComponentTemplate, Retry(DeleteComponentTemplate()), async_runner=True) register_runner(track.OperationType.CreateComposableTemplate, Retry(CreateComposableTemplate()), async_runner=True) - register_runner(track.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate()), async_runner=True) + register_runner(track.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate(config=config)), async_runner=True) register_runner(track.OperationType.CreateDataStream, Retry(CreateDataStream()), async_runner=True) register_runner(track.OperationType.DeleteDataStream, Retry(DeleteDataStream()), async_runner=True) register_runner(track.OperationType.CreateIndexTemplate, Retry(CreateIndexTemplate()), async_runner=True) @@ -169,9 +168,14 @@ class Runner: Base class for all operations against Elasticsearch. """ - def __init__(self, *args, **kwargs): + def __init__(self, *args, config=None, **kwargs): super().__init__(*args, **kwargs) self.logger = logging.getLogger(__name__) + self.serverless_mode = False + self.serverless_operator = False + if config: + self.serverless_mode = config.opts("driver", "serverless.mode", mandatory=False, default_value=False) + self.serverless_operator = config.opts("driver", "serverless.operator", mandatory=False, default_value=False) async def __aenter__(self): return self @@ -865,8 +869,8 @@ class Query(Runner): * ``pages``: Total number of pages that have been retrieved. 
""" - def __init__(self): - super().__init__() + def __init__(self, config=None): + super().__init__(config=config) self._search_after_extractor = SearchAfterExtractor() self._composite_agg_extractor = CompositeAggExtractor() @@ -890,6 +894,8 @@ async def __call__(self, es, params): cache = params.get("cache") if cache is not None: request_params["request_cache"] = str(cache).lower() + elif self.serverless_mode and not self.serverless_operator: + request_params["request_cache"] = "false" if not bool(headers): # counter-intuitive but preserves prior behavior headers = None @@ -1118,11 +1124,12 @@ async def _scroll_query(es, params): took = props.get("took", 0) all_results_collected = (size is not None and hits < size) or hits == 0 else: + # /_search/scroll does not accept request_cache so not providing params r = await es.perform_request( method="GET", path="/_search/scroll", body={"scroll_id": scroll_id, "scroll": "10s"}, - params=request_params, + params=None, headers=headers, ) props = parse(r, ["timed_out", "took"], ["hits.hits"]) @@ -1410,21 +1417,16 @@ class DeleteIndex(Runner): """ async def __call__(self, es, params): - # pylint: disable=import-outside-toplevel - import elasticsearch - ops = 0 indices = mandatory(params, "indices", self) only_if_exists = params.get("only-if-exists", False) request_params = params.get("request-params", {}) - # bypassing action.destructive_requires_name cluster setting mangling for serverless clusters + # bypass cluster settings access for serverless prior_destructive_setting = None - cluster_settings_available = False - with contextlib.suppress(elasticsearch.exceptions.NotFoundError): + if not self.serverless_mode or self.serverless_operator: prior_destructive_setting = await set_destructive_requires_name(es, False) - cluster_settings_available = True try: for index_name in indices: @@ -1436,7 +1438,7 @@ async def __call__(self, es, params): await es.indices.delete(index=index_name, params=request_params) ops += 1 finally: - 
if cluster_settings_available: + if not self.serverless_mode or self.serverless_operator: await set_destructive_requires_name(es, prior_destructive_setting) return { "weight": ops, @@ -1573,16 +1575,18 @@ async def __call__(self, es, params): self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) await es.indices.delete_index_template(name=template_name, params=request_params) ops_count += 1 - # ensure that we do not provide an empty index pattern by accident - if delete_matching_indices and index_pattern: - # only set if really required - if current_destructive_setting is None: - current_destructive_setting = False - prior_destructive_setting = await set_destructive_requires_name(es, current_destructive_setting) + # 1. Ignore delete matching indices in serverless as wildcard deletes are not supported + # 2. Ensure that we do not provide an empty index pattern by accident + if not self.serverless_mode or self.serverless_operator: + if delete_matching_indices and index_pattern: + # only set if really required + if current_destructive_setting is None: + current_destructive_setting = False + prior_destructive_setting = await set_destructive_requires_name(es, current_destructive_setting) + ops_count += 1 + + await es.indices.delete(index=index_pattern) ops_count += 1 - - await es.indices.delete(index=index_pattern) - ops_count += 1 finally: if current_destructive_setting is not None: await set_destructive_requires_name(es, prior_destructive_setting) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 42833df31..61c082a7d 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -28,7 +28,7 @@ import elasticsearch import pytest -from esrally import client, exceptions +from esrally import client, config, exceptions from esrally.client.asynchronous import RallyAsyncElasticsearch from esrally.driver import runner @@ -2535,6 +2535,160 @@ async def 
test_query_runner_fails_with_unknown_operation_type(self, es): await query_runner(es, params) assert exc.value.args[0] == "No runner available for operation-type: [unknown]" + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_query_no_request_cache_by_default(self, es): + es.options.return_value = es + search_response = { + "timed_out": False, + "took": 5, + "_shards": {"total": 808, "successful": 808, "skipped": 0, "failed": 0}, + "hits": { + "total": { + "value": 0, + "relation": "eq", + }, + "hits": [], + }, + } + es.perform_request = mock.AsyncMock(return_value=io.BytesIO(json.dumps(search_response).encode())) + + cfg = config.Config() + query_runner = runner.Query(config=cfg) + + params = { + "operation-type": "search", + "index": "_all", + "detailed-results": True, + "body": { + "query": { + "match_all": {}, + }, + }, + } + + async with query_runner: + result = await query_runner(es, params) + + assert result == { + "weight": 1, + "unit": "ops", + "success": True, + "hits": 0, + "hits_relation": "eq", + "timed_out": False, + "took": 5, + "shards": {"total": 808, "successful": 808, "skipped": 0, "failed": 0}, + } + + es.perform_request.assert_awaited_once_with(method="GET", path="/_all/_search", params={}, body=params["body"], headers=None) + es.clear_scroll.assert_not_called() + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_query_no_request_cache_in_serverless_with_operator(self, es): + es.options.return_value = es + search_response = { + "timed_out": False, + "took": 5, + "_shards": {"total": 808, "successful": 808, "skipped": 0, "failed": 0}, + "hits": { + "total": { + "value": 0, + "relation": "eq", + }, + "hits": [], + }, + } + es.perform_request = mock.AsyncMock(return_value=io.BytesIO(json.dumps(search_response).encode())) + + cfg = config.Config() + cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) + cfg.add(config.Scope.benchmark, "driver", "serverless.operator", 
True) + query_runner = runner.Query(config=cfg) + + params = { + "operation-type": "search", + "index": "_all", + "detailed-results": True, + "body": { + "query": { + "match_all": {}, + }, + }, + } + + async with query_runner: + result = await query_runner(es, params) + + assert result == { + "weight": 1, + "unit": "ops", + "success": True, + "hits": 0, + "hits_relation": "eq", + "timed_out": False, + "took": 5, + "shards": {"total": 808, "successful": 808, "skipped": 0, "failed": 0}, + } + + es.perform_request.assert_awaited_once_with(method="GET", path="/_all/_search", params={}, body=params["body"], headers=None) + es.clear_scroll.assert_not_called() + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_query_request_cache_false_in_serverless(self, es): + es.options.return_value = es + search_response = { + "timed_out": False, + "took": 5, + "_shards": {"total": 808, "successful": 808, "skipped": 0, "failed": 0}, + "hits": { + "total": { + "value": 0, + "relation": "eq", + }, + "hits": [], + }, + } + es.perform_request = mock.AsyncMock(return_value=io.BytesIO(json.dumps(search_response).encode())) + + cfg = config.Config() + cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) + cfg.add(config.Scope.benchmark, "driver", "serverless.operator", False) + + query_runner = runner.Query(config=cfg) + + params = { + "operation-type": "search", + "index": "_all", + "detailed-results": True, + "body": { + "query": { + "match_all": {}, + }, + }, + } + + async with query_runner: + result = await query_runner(es, params) + + assert result == { + "weight": 1, + "unit": "ops", + "success": True, + "hits": 0, + "hits_relation": "eq", + "timed_out": False, + "took": 5, + "shards": {"total": 808, "successful": 808, "skipped": 0, "failed": 0}, + } + + es.perform_request.assert_awaited_once_with( + method="GET", path="/_all/_search", params={"request_cache": "false"}, body=params["body"], headers=None + ) + 
es.clear_scroll.assert_not_called() + class TestPutPipelineRunner: @mock.patch("elasticsearch.Elasticsearch") @@ -2884,7 +3038,67 @@ async def test_deletes_existing_indices(self, es): es.indices.delete = mock.AsyncMock() es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) es.cluster.put_settings = mock.AsyncMock() - r = runner.DeleteIndex() + + cfg = config.Config() + r = runner.DeleteIndex(config=cfg) + + params = {"indices": ["indexA", "indexB"], "only-if-exists": True} + + result = await r(es, params) + + assert result == { + "weight": 1, + "unit": "ops", + "success": True, + } + + es.cluster.put_settings.assert_has_awaits( + [ + mock.call(body={"transient": {"action.destructive_requires_name": False}}), + mock.call(body={"transient": {"action.destructive_requires_name": True}}), + ] + ) + es.indices.delete.assert_awaited_once_with(index="indexB", params={}) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_deletes_existing_indices_in_serverless(self, es): + es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.delete = mock.AsyncMock() + es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) + es.cluster.put_settings = mock.AsyncMock() + + cfg = config.Config() + cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) + cfg.add(config.Scope.benchmark, "driver", "serverless.operator", False) + r = runner.DeleteIndex(config=cfg) + + params = {"indices": ["indexA", "indexB"], "only-if-exists": True} + + result = await r(es, params) + + assert result == { + "weight": 1, + "unit": "ops", + "success": True, + } + + es.cluster.get_settings.assert_not_awaited() + es.cluster.put_settings.assert_not_awaited() + es.indices.delete.assert_awaited_once_with(index="indexB", params={}) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + 
async def test_deletes_existing_indices_in_serverless_with_operator(self, es): + es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.delete = mock.AsyncMock() + es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) + es.cluster.put_settings = mock.AsyncMock() + + cfg = config.Config() + cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) + cfg.add(config.Scope.benchmark, "driver", "serverless.operator", True) + r = runner.DeleteIndex(config=cfg) params = {"indices": ["indexA", "indexB"], "only-if-exists": True} @@ -2910,7 +3124,9 @@ async def test_deletes_all_indices(self, es): es.indices.delete = mock.AsyncMock() es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {}}) es.cluster.put_settings = mock.AsyncMock() - r = runner.DeleteIndex() + + cfg = config.Config() + r = runner.DeleteIndex(config=cfg) params = { "indices": ["indexA", "indexB"], @@ -3356,7 +3572,100 @@ async def test_deletes_all_index_templates(self, es): es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) es.cluster.put_settings = mock.AsyncMock() - r = runner.DeleteComposableTemplate() + cfg = config.Config() + r = runner.DeleteComposableTemplate(config=cfg) + + params = { + "templates": [ + ("templateA", False, None), + ("templateB", True, "logs-*"), + ("templateC", True, "metrics-*"), + ], + "request-params": {"timeout": 60}, + "only-if-exists": False, + } + result = await r(es, params) + + # 3 times delete index template, 2 times to set/reset transient cluster settings, 2 times delete matching indices + assert result == { + "weight": 7, + "unit": "ops", + "success": True, + } + + es.indices.delete_index_template.assert_has_awaits( + [ + mock.call(name="templateA", params=params["request-params"], ignore=[404]), + mock.call(name="templateB", 
params=params["request-params"], ignore=[404]), + mock.call(name="templateC", params=params["request-params"], ignore=[404]), + ] + ) + es.cluster.put_settings.assert_has_awaits( + [ + mock.call(body={"transient": {"action.destructive_requires_name": False}}), + mock.call(body={"transient": {"action.destructive_requires_name": True}}), + ] + ) + es.indices.delete.assert_has_awaits( + [ + mock.call(index="logs-*"), + mock.call(index="metrics-*"), + ] + ) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_deletes_all_index_templates_in_serverless(self, es): + es.indices.delete_index_template = mock.AsyncMock() + es.indices.delete = mock.AsyncMock() + es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) + es.cluster.put_settings = mock.AsyncMock() + + cfg = config.Config() + cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) + cfg.add(config.Scope.benchmark, "driver", "serverless.operator", False) + r = runner.DeleteComposableTemplate(config=cfg) + + params = { + "templates": [ + ("templateA", False, None), + ("templateB", True, "logs-*"), + ("templateC", True, "metrics-*"), + ], + "request-params": {"timeout": 60}, + "only-if-exists": False, + } + result = await r(es, params) + + # 3 times delete index template + assert result == { + "weight": 3, + "unit": "ops", + "success": True, + } + + es.indices.delete_index_template.assert_has_awaits( + [ + mock.call(name="templateA", params=params["request-params"], ignore=[404]), + mock.call(name="templateB", params=params["request-params"], ignore=[404]), + mock.call(name="templateC", params=params["request-params"], ignore=[404]), + ] + ) + es.cluster.put_settings.assert_not_awaited() + es.indices.delete.assert_not_awaited() + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_deletes_all_index_templates_in_serverless_with_operator(self, es): + 
es.indices.delete_index_template = mock.AsyncMock() + es.indices.delete = mock.AsyncMock() + es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) + es.cluster.put_settings = mock.AsyncMock() + + cfg = config.Config() + cfg.add(config.Scope.benchmark, "driver", "serverless.mode", True) + cfg.add(config.Scope.benchmark, "driver", "serverless.operator", True) + r = runner.DeleteComposableTemplate(config=cfg) params = { "templates": [