Implement adaptive replica selection for coordinating nodes performing queries #24915

Closed
5 tasks done
dakrone opened this issue May 26, 2017 · 1 comment
Labels
>enhancement :Search/Search Search-related issues that do not fall into other categories v6.1.0

Comments

@dakrone
Member

dakrone commented May 26, 2017

In #23884 (and #3890) we added the fixed_auto_queue_size threadpool which could automatically raise or lower the queue size of the search threadpool depending on the arrival rate of operations and target response rate.

We'd like to take the next step and implement adaptive replica selection. This is a partial application of the C3 algorithm, used on the coordinating node to select the appropriate replica instead of our current round-robin behavior. Note that we cannot currently implement the rate control and backpressure from the paper, since we cannot treat each request as having identical cost. However, the automatic queue sizing that is already implemented gives us a good way to provide backpressure on the execution nodes themselves.

The formula for replica ranking, Ψ(s), is given on page 6 of the linked paper (EWMA = Exponentially Weighted Moving Average):

Ψ(s) = R(s) - 1/µ̄(s) + (q̂(s))^b / µ̄(s)

Where q̂(s) is:

q̂(s) = 1 + (os(s) * n) + q(s)

Here (os(s) * n) is the "concurrency compensation", where os(s) is the number of outstanding requests to a node and n is the number of clients in the system. R(s), q(s), and µ̄(s) are EWMAs of the response time (as seen from the coordinating node), queue-size, and service time received from the execution node.
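As a rough illustration of the two formulas above, here is a minimal sketch in Python. The function and parameter names are hypothetical, the input values are made up, and `mu_bar` is plugged into the formula exactly as written above, so the units depend on what the execution node reports. A lower Ψ(s) means a more attractive replica.

```python
def adjusted_queue_size(outstanding: int, clients: int, queue_ewma: float) -> float:
    """q_hat(s) = 1 + (os(s) * n) + q(s)."""
    return 1 + outstanding * clients + queue_ewma


def rank(response_ewma: float, mu_bar: float, outstanding: int,
         clients: int, queue_ewma: float, b: int = 3) -> float:
    """Psi(s) = R(s) - 1/mu_bar(s) + q_hat(s)^b / mu_bar(s); lower is better."""
    q_hat = adjusted_queue_size(outstanding, clients, queue_ewma)
    return response_ewma - 1.0 / mu_bar + (q_hat ** b) / mu_bar


# Two nodes with identical response and service stats (illustrative values);
# the second has one outstanding request and a non-empty queue.
idle = rank(response_ewma=4.0, mu_bar=2.0, outstanding=0, clients=1, queue_ewma=0.0)
busy = rank(response_ewma=4.0, mu_bar=2.0, outstanding=1, clients=1, queue_ewma=1.0)
print(idle, busy)  # the busy node gets the larger (worse) rank
```

Because q̂(s) is raised to the power b, a small difference in queue length dominates the rank quickly, which is the intended penalty for long queues.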

This will require a number of steps in order to be implemented:

There is a little flexibility here, since we could possibly use some of our existing metrics (like "took" time) instead of adding new measurements. This is only a rough overview.

Additionally, we will need to decide on a value for b to correctly penalize long queues (the paper uses 3) as well as a good α value for the EWMA calculations. We could also make these configurable if desired.
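To make the role of α concrete, here is a minimal EWMA sketch in Python. The class name and the sample values are hypothetical, and the α used is chosen only to keep the arithmetic easy to follow; it is not a recommendation.

```python
class Ewma:
    """Exponentially weighted moving average: new = alpha * sample + (1 - alpha) * old."""

    def __init__(self, alpha: float, initial: float) -> None:
        if not 0.0 < alpha <= 1.0:
            raise ValueError("alpha must be in (0, 1]")
        self.alpha = alpha
        self.value = initial

    def add(self, sample: float) -> float:
        # A larger alpha weights recent samples more heavily.
        self.value = self.alpha * sample + (1.0 - self.alpha) * self.value
        return self.value


avg = Ewma(alpha=0.5, initial=10.0)
first = avg.add(20.0)   # halfway between 10 and 20
second = avg.add(20.0)  # halfway between the previous average and 20
print(first, second)
```

A small α smooths out spikes but reacts slowly to a node that has genuinely become slow; a large α does the opposite.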

@dakrone dakrone self-assigned this May 26, 2017
@clintongormley clintongormley added the :Search/Search Search-related issues that do not fall into other categories label May 29, 2017
dakrone added a commit to dakrone/elasticsearch that referenced this issue Jun 5, 2017
This is the first step towards adaptive replica selection (elastic#24915). This PR
tracks the execution time, also known as the "service time" of a task in the
threadpool. The `QueueResizingEsThreadPoolExecutor` then stores a moving average
of these task times which can be retrieved from the executor.

There is no functionality using the EWMA yet (other than tests); this
is only a bite-sized building block so that it's easier to review.

[1]: EWMA = Exponentially Weighted Moving Average
dakrone added a commit to dakrone/elasticsearch that referenced this issue Jun 27, 2017
This is part of elastic#24915, where we now calculate the EWMA of service time for
tasks in the search threadpool, and send that as well as the current queue size
back to the coordinating node. The coordinating node now tracks this information
for each node in the cluster.

This information will be used in the future for determining the best replica a
search request should be routed to. This change has no user-visible difference.
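The per-node bookkeeping described above could look roughly like the following Python sketch. Every name here is hypothetical and this is not the actual Elasticsearch code. It assumes the service time arrives already averaged by the data node, while the coordinating node smooths the reported queue size and its own measured response time.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class NodeStats:
    queue_size: float     # EWMA of the queue size reported by the data node
    service_time: float   # service time EWMA, as reported by the data node
    response_time: float  # EWMA of the response time seen by the coordinator


class NodeStatsCollector:
    """Tracks the latest stats for each node, keyed by node id."""

    def __init__(self, alpha: float = 0.3) -> None:
        self._alpha = alpha  # assumed smoothing factor, not a value from the source
        self._stats: Dict[str, NodeStats] = {}

    def record(self, node_id: str, queue_size: float,
               service_time: float, response_time: float) -> None:
        prev = self._stats.get(node_id)
        if prev is None:
            self._stats[node_id] = NodeStats(queue_size, service_time, response_time)
            return
        a = self._alpha
        prev.queue_size = a * queue_size + (1 - a) * prev.queue_size
        prev.service_time = service_time  # already an EWMA, keep the newest value
        prev.response_time = a * response_time + (1 - a) * prev.response_time

    def get(self, node_id: str) -> Optional[NodeStats]:
        return self._stats.get(node_id)

    def remove_node(self, node_id: str) -> None:
        # Drop stats for a node that has left the cluster.
        self._stats.pop(node_id, None)


collector = NodeStatsCollector(alpha=0.5)
collector.record("node-1", queue_size=4, service_time=2.0, response_time=10.0)
collector.record("node-1", queue_size=8, service_time=3.0, response_time=20.0)
print(collector.get("node-1"))
```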
dakrone added a commit that referenced this issue Jul 17, 2017
…25430)

* Register data node stats from info carried back in search responses

* Move response time timing into ResponseListenerWrapper

* Move ResponseListenerWrapper to ActionListener instead of SearchActionListener

Also removes the logger

* Move `requestIndex` back to private

* De-guice-ify ResponseCollectorService \o/

* Undo all changes to SearchQueryThenFetchAsyncAction

* Remove unneeded response collector from TransportSearchAction

* Undo all changes to SearchDfsQueryThenFetchAsyncAction

* Completely rewrite the inside of ResponseCollectorService's record keeping

* Documentation and cleanups for ResponseCollectorService

* Add unit test for collection of queue size and service time

* Fix Guice construction error

* Add basic unit tests for ResponseCollectorService

* Fix version constant for the master merge

* Fix test compilation after master merge

* Add a test for node removal on cluster changed event

* Remove integration test as there are now unit tests

* Rename ResponseListenerWrapper -> SearchExecutionStatsCollector

* Fix line-length

* Make classes private and final where appropriate

* Pass nodeId into SearchExecutionStatsCollector and use only ActionListener

* Get nodeId from connection so searchShardTarget can be private

* Remove threadpool from SearchContext, get it from IndexShard instead

* Add missing import

* Use BiFunction for responseWrapper rather than passing in collector service
dakrone added a commit to dakrone/elasticsearch that referenced this issue Aug 9, 2017
This implements the selection algorithm described in the C3 paper for
determining which copy of the data a query should be routed to.

We calculate the score of a node from the service time EWMA, response time
EWMA, and queue size EWMA. These metrics are collected by piggybacking them
with each search request.

Since Elasticsearch lacks the "broadcast to every copy" behavior that Cassandra
has (as mentioned in the C3 paper) to update metrics after a node has been
highly weighted, this implementation adjusts a node's response stats using the
average of its own and the "best" node's metrics. This is so that a long GC
or other activity that may cause a node's rank to increase dramatically does not
permanently keep a node from having requests routed to it. Instead, its score
will eventually drop back to the realm where it is a potential candidate
for new queries.
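A minimal Python sketch of that adjustment, assuming a plain arithmetic mean applied to a single metric each time the node loses out to the best one. The function name and the numbers are hypothetical.

```python
def adjust_toward_best(own: float, best: float) -> float:
    """Move a node's stat halfway toward the best node's stat."""
    return (own + best) / 2.0


# A node whose response time spiked to 100 (for example after a long GC),
# while the best node sits at 10. Each adjustment halves the gap.
value = 100.0
history = []
for _ in range(3):
    value = adjust_toward_best(value, best=10.0)
    history.append(value)
print(history)
```

Since the gap to the best node halves on every adjustment, the penalized node returns to contention after a handful of rounds even if it never receives a request that would refresh its real stats.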

This feature is off by default and can be turned on with the dynamic setting
`cluster.routing.use_adaptive_replica_selection`.
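Dynamic cluster settings are normally changed through the cluster update settings API (`PUT /_cluster/settings`). As a sketch, the request body could be built like this in Python. The helper name is hypothetical, and using `persistent` rather than `transient` is an arbitrary choice for the example.

```python
import json


def adaptive_replica_selection_body(enabled: bool) -> str:
    """Build a JSON body for PUT /_cluster/settings that toggles the feature."""
    body = {
        "persistent": {
            "cluster.routing.use_adaptive_replica_selection": enabled,
        }
    }
    return json.dumps(body)


payload = adaptive_replica_selection_body(True)
print(payload)
```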

Relates to elastic#24915. However, instead of `b=3` I used `b=4` (after benchmarking).
dakrone added a commit that referenced this issue Aug 31, 2017
* Implement adaptive replica selection

* Randomly use adaptive replica selection for internal test cluster

* Use an action name *prefix* for retrieving pending requests

* Add unit test for replica selection

* don't use adaptive replica selection in SearchPreferenceIT

* Track client connections in a SearchTransportService instead of TransportService

* Bind `entry` pieces in local variables

* Add javadoc link to C3 paper and javadocs for stat adjustments

* Bind entry's key and value to local variables

* Remove unneeded actionNamePrefix parameter

* Use conns.longValue() instead of cached Long

* Add comments about removing entries from the map

* Pull out bindings for `entry` in IndexShardRoutingTable

* Use .compareTo instead of manually comparing

* add assert for connections not being null and gte to 1

* Copy map for pending search connections instead of "live" map

* Increase the number of pending search requests used for calculating rank when chosen

When a node gets chosen, this increases the number of search counts for the
winning node so that it will not be as likely to be chosen again for
non-concurrent search requests.

* Remove unused HashMap import

* Rename rank -> rankShardsAndUpdateStats

* Rename rankedActiveInitializingShardsIt -> activeInitializingShardsRankedIt

* Instead of precalculating winning node, use "winning" shard from ranked list

* Sort null ranked nodes before nodes that have a rank
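Several of the bullets above (sorting unranked nodes first, taking the winner from the ranked list, and bumping the winner's pending count) can be sketched together in Python. All names are hypothetical and this is not the Elasticsearch implementation; it assumes a lower rank is better and that a node with no stats yet has a rank of `None`.

```python
from typing import Dict, List, Optional


def order_by_rank(ranks: Dict[str, Optional[float]]) -> List[str]:
    """Order node ids so unranked (None) nodes come first, then by ascending rank."""
    def key(node_id: str):
        r = ranks[node_id]
        # False sorts before True, so nodes without a rank lead the list.
        return (r is not None, r if r is not None else 0.0)
    return sorted(ranks, key=key)


def rank_and_count(ranks: Dict[str, Optional[float]],
                   pending: Dict[str, int]) -> List[str]:
    """Return the ranked list and count one more pending search for the winner."""
    ordered = order_by_rank(ranks)
    if ordered:
        winner = ordered[0]
        # Raising the pending count makes the winner less likely to be
        # chosen again by the next non-concurrent request.
        pending[winner] = pending.get(winner, 0) + 1
    return ordered


pending_searches: Dict[str, int] = {}
ordered = rank_and_count({"a": 5.0, "b": None, "c": 2.0}, pending_searches)
print(ordered, pending_searches)
```

Putting unranked nodes first presumably lets a node with no recorded stats receive a request, so the coordinating node can start collecting metrics for it.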
@dakrone
Member Author

dakrone commented Aug 31, 2017

Closing this as all parts have now been implemented!

@dakrone dakrone added v6.1.0 and removed v6.1.0 labels Aug 31, 2017
dakrone added a commit that referenced this issue Aug 31, 2017
dakrone added a commit to dakrone/elasticsearch that referenced this issue Sep 1, 2017
This adds a blurb for adaptive replica selection since it was previously
undocumented.

Relates to elastic#24915
dakrone added a commit that referenced this issue Sep 1, 2017
dakrone added a commit to dakrone/elasticsearch that referenced this issue Sep 6, 2017
dakrone added a commit that referenced this issue Sep 7, 2017
@dakrone dakrone closed this as completed Sep 7, 2017