Skip to content

Commit

Permalink
Add serverless-aware runners (#1789)
Browse files Browse the repository at this point in the history
This change passes configuration context to `Query`, `DeleteIndex` and
`DeleteComposableTemplate` runners to make them serverless-aware and
change their behaviour if Rally targets serverless cluster.

Details:
* `Query`, `DeleteIndex` and `DeleteComposableTemplate` stop getting and
  setting cluster settings through `set_destructive_requires_name()` as
  cluster settings are not available to serverless users.
* The `DeleteComposableTemplate` runner starts to ignore
  `delete_matching_indices` template option. Wildcard deletes are not
  available to serverless users.
* Request cache becomes turned on by default for serverless non-operator
  use in the `Query` runner (`search`, `paginated-search`,
  `composite-agg` and `scroll-search` operations). In non-serverless use
  we typically rely on `"index.requests.cache.enable": false` index
  setting for this purpose but it is not available to serverless users.
  • Loading branch information
gbanasiak authored Oct 16, 2023
1 parent bdd4640 commit aba239a
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 39 deletions.
10 changes: 5 additions & 5 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ Each composable template in this list consists of the following properties:

* ``name`` (mandatory): Composable template name.
* ``index-pattern`` (mandatory): Index pattern that matches the composable template. This must match the definition in the template file.
* ``delete-matching-indices`` (optional, defaults to ``true``): Delete all indices that match the provided index pattern if the template is deleted.
* ``delete-matching-indices`` (optional, defaults to ``true``): Delete all indices that match the provided index pattern if the template is deleted. This setting is ignored in `Stateless Elasticsearch <https://www.elastic.co/blog/stateless-your-new-state-of-find-with-elasticsearch>`_ - please use data streams and ``delete-data-stream`` operation instead.
* ``template`` (mandatory): Composable template file name.
* ``template-path`` (optional): JSON field inside the file content that contains the template.

Expand Down Expand Up @@ -985,7 +985,7 @@ Properties

* ``index`` (optional): An `index pattern <https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html>`_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``.
* ``type`` (optional): Defines the type within the specified index for this query. By default, no ``type`` will be used and the query will be performed across all types in the provided index. Also, types have been removed in Elasticsearch 7.0.0 so you must not specify this property if you want to benchmark Elasticsearch 7.0.0 or later.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. When Rally is used against `Stateless Elasticsearch <https://www.elastic.co/blog/stateless-your-new-state-of-find-with-elasticsearch>`_ the default is ``false``.
* ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html#_parameters_3>`_.

.. note::
Expand Down Expand Up @@ -1045,7 +1045,7 @@ Properties
""""""""""

* ``index`` (optional): An `index pattern <https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html>`_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. When Rally is used against `Stateless Elasticsearch <https://www.elastic.co/blog/stateless-your-new-state-of-find-with-elasticsearch>`_ the default is ``false``.
* ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html#_parameters_3>`_.

.. note::
Expand Down Expand Up @@ -1105,7 +1105,7 @@ Properties

* ``index`` (optional): An `index pattern <https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html>`_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``.
* ``type`` (optional): Defines the type within the specified index for this query. By default, no ``type`` will be used and the query will be performed across all types in the provided index. Also, types have been removed in Elasticsearch 7.0.0 so you must not specify this property if you want to benchmark Elasticsearch 7.0.0 or later.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. When Rally is used against `Stateless Elasticsearch <https://www.elastic.co/blog/stateless-your-new-state-of-find-with-elasticsearch>`_ the default is ``false``.
* ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html#_parameters_3>`_.

.. note::
Expand Down Expand Up @@ -1156,7 +1156,7 @@ Properties
""""""""""

* ``index`` (optional): An `index pattern <https://www.elastic.co/guide/en/elasticsearch/reference/current/multi-index.html>`_ that defines which indices or data streams should be targeted by this query. Only needed if the ``indices`` or ``data-streams`` section contains more than one index or data stream respectively. Otherwise, Rally will automatically derive the index or data stream to use. If you have defined multiple indices or data streams and want to query all of them, just specify ``"index": "_all"``.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version. When Rally is used against `Stateless Elasticsearch <https://www.elastic.co/blog/stateless-your-new-state-of-find-with-elasticsearch>`_ the default is ``false``.
* ``request-params`` (optional): A structure containing arbitrary request parameters. The supported parameters names are documented in the `Search URI Request docs <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html#_parameters_3>`_.

.. note::
Expand Down
2 changes: 1 addition & 1 deletion esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ def receiveMsg_StartWorker(self, msg, sender):
# we need to wake up more often in test mode
if self.config.opts("track", "test.mode.enabled"):
self.wakeup_interval = 0.5
runner.register_default_runners()
runner.register_default_runners(self.config)
if self.track.has_plugins:
track.load_track_plugins(self.config, self.track.name, runner.register_runner, scheduler.register_scheduler)
self.drive()
Expand Down
62 changes: 33 additions & 29 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.

import asyncio
import contextlib
import contextvars
import json
import logging
Expand All @@ -43,15 +42,15 @@
__RUNNERS = {}


def register_default_runners():
def register_default_runners(config=None):
register_runner(track.OperationType.Bulk, BulkIndex(), async_runner=True)
register_runner(track.OperationType.ForceMerge, ForceMerge(), async_runner=True)
register_runner(track.OperationType.IndexStats, Retry(IndicesStats()), async_runner=True)
register_runner(track.OperationType.NodeStats, NodeStats(), async_runner=True)
register_runner(track.OperationType.Search, Query(), async_runner=True)
register_runner(track.OperationType.PaginatedSearch, Query(), async_runner=True)
register_runner(track.OperationType.CompositeAgg, Query(), async_runner=True)
register_runner(track.OperationType.ScrollSearch, Query(), async_runner=True)
register_runner(track.OperationType.Search, Query(config=config), async_runner=True)
register_runner(track.OperationType.PaginatedSearch, Query(config=config), async_runner=True)
register_runner(track.OperationType.CompositeAgg, Query(config=config), async_runner=True)
register_runner(track.OperationType.ScrollSearch, Query(config=config), async_runner=True)
register_runner(track.OperationType.RawRequest, RawRequest(), async_runner=True)
register_runner(track.OperationType.Composite, Composite(), async_runner=True)
register_runner(track.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True)
Expand All @@ -73,11 +72,11 @@ def register_default_runners():
register_runner(track.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
register_runner(track.OperationType.Refresh, Retry(Refresh()), async_runner=True)
register_runner(track.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
register_runner(track.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True)
register_runner(track.OperationType.DeleteIndex, Retry(DeleteIndex(config=config)), async_runner=True)
register_runner(track.OperationType.CreateComponentTemplate, Retry(CreateComponentTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteComponentTemplate, Retry(DeleteComponentTemplate()), async_runner=True)
register_runner(track.OperationType.CreateComposableTemplate, Retry(CreateComposableTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate(config=config)), async_runner=True)
register_runner(track.OperationType.CreateDataStream, Retry(CreateDataStream()), async_runner=True)
register_runner(track.OperationType.DeleteDataStream, Retry(DeleteDataStream()), async_runner=True)
register_runner(track.OperationType.CreateIndexTemplate, Retry(CreateIndexTemplate()), async_runner=True)
Expand Down Expand Up @@ -169,9 +168,14 @@ class Runner:
Base class for all operations against Elasticsearch.
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args, config=None, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getLogger(__name__)
self.serverless_mode = False
self.serverless_operator = False
if config:
self.serverless_mode = config.opts("driver", "serverless.mode", mandatory=False, default_value=False)
self.serverless_operator = config.opts("driver", "serverless.operator", mandatory=False, default_value=False)

async def __aenter__(self):
return self
Expand Down Expand Up @@ -865,8 +869,8 @@ class Query(Runner):
* ``pages``: Total number of pages that have been retrieved.
"""

def __init__(self):
super().__init__()
def __init__(self, config=None):
super().__init__(config=config)
self._search_after_extractor = SearchAfterExtractor()
self._composite_agg_extractor = CompositeAggExtractor()

Expand All @@ -890,6 +894,8 @@ async def __call__(self, es, params):
cache = params.get("cache")
if cache is not None:
request_params["request_cache"] = str(cache).lower()
elif self.serverless_mode and not self.serverless_operator:
request_params["request_cache"] = "false"
if not bool(headers):
# counter-intuitive but preserves prior behavior
headers = None
Expand Down Expand Up @@ -1118,11 +1124,12 @@ async def _scroll_query(es, params):
took = props.get("took", 0)
all_results_collected = (size is not None and hits < size) or hits == 0
else:
# /_search/scroll does not accept request_cache so not providing params
r = await es.perform_request(
method="GET",
path="/_search/scroll",
body={"scroll_id": scroll_id, "scroll": "10s"},
params=request_params,
params=None,
headers=headers,
)
props = parse(r, ["timed_out", "took"], ["hits.hits"])
Expand Down Expand Up @@ -1410,21 +1417,16 @@ class DeleteIndex(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch

ops = 0

indices = mandatory(params, "indices", self)
only_if_exists = params.get("only-if-exists", False)
request_params = params.get("request-params", {})

# bypassing action.destructive_requires_name cluster setting mangling for serverless clusters
# bypass cluster settings access for serverless
prior_destructive_setting = None
cluster_settings_available = False
with contextlib.suppress(elasticsearch.exceptions.NotFoundError):
if not self.serverless_mode or self.serverless_operator:
prior_destructive_setting = await set_destructive_requires_name(es, False)
cluster_settings_available = True

try:
for index_name in indices:
Expand All @@ -1436,7 +1438,7 @@ async def __call__(self, es, params):
await es.indices.delete(index=index_name, params=request_params)
ops += 1
finally:
if cluster_settings_available:
if not self.serverless_mode or self.serverless_operator:
await set_destructive_requires_name(es, prior_destructive_setting)
return {
"weight": ops,
Expand Down Expand Up @@ -1573,16 +1575,18 @@ async def __call__(self, es, params):
self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name)
await es.indices.delete_index_template(name=template_name, params=request_params)
ops_count += 1
# ensure that we do not provide an empty index pattern by accident
if delete_matching_indices and index_pattern:
# only set if really required
if current_destructive_setting is None:
current_destructive_setting = False
prior_destructive_setting = await set_destructive_requires_name(es, current_destructive_setting)
# 1. Ignore delete matching indices in serverless as wildcard deletes are not supported
# 2. Ensure that we do not provide an empty index pattern by accident
if not self.serverless_mode or self.serverless_operator:
if delete_matching_indices and index_pattern:
# only set if really required
if current_destructive_setting is None:
current_destructive_setting = False
prior_destructive_setting = await set_destructive_requires_name(es, current_destructive_setting)
ops_count += 1

await es.indices.delete(index=index_pattern)
ops_count += 1

await es.indices.delete(index=index_pattern)
ops_count += 1
finally:
if current_destructive_setting is not None:
await set_destructive_requires_name(es, prior_destructive_setting)
Expand Down
Loading

0 comments on commit aba239a

Please sign in to comment.