QL: retry SQL and EQL requests in a mixed-node (rolling upgrade) cluster#68602
Conversation
Add mixed-node tests to SQL and EQL
Pinging @elastic/es-ql (Team:QL)
costin
left a comment
Left a comment regarding a base class between the two redirect listeners - looks good otherwise.
Thanks for the extensive tests!
@@ -0,0 +1,66 @@
apply plugin: 'elasticsearch.testclusters'

private List<String> getSequencesBulkEntries() {
How about externalizing this and reading each entry line by line?
 * the resulting ES document as a field.
 */
public class QlSourceBuilder {
    public static final Version FIELDS_API_INTRODUCTION_VERSION = Version.V_7_10_0;
The name is not very clear - how about USE_FIELD_API_VERSION or FIELD_API_USAGE_VERSION?
The name is not very clear.
It'd be great if we could "standardise" on a format, given that these version-introduction constants will only multiply (nanos, fields API, unsigned long, arrays, plus more to come).
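For context, such a constant is typically used as a version gate when building the request. A minimal sketch of the kind of check involved (the method and variable names are illustrative, not the actual QlSourceBuilder code):

```java
import org.elasticsearch.Version;

// Illustrative version gate: only emit the "fields" section of the _search request when
// the relevant node version is at least the version that introduced the fields API;
// otherwise fall back to the older source/docvalue extraction. Method name is made up.
boolean useFieldsApi(Version nodeVersion) {
    return nodeVersion.onOrAfter(QlSourceBuilder.FIELDS_API_INTRODUCTION_VERSION);
}
```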
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), e -> {
    // the search request will likely run on nodes with different versions of ES
    // we will retry on a node with an older version that should generate a backwards compatible _search request
    if (e instanceof SearchPhaseExecutionException
        && ((SearchPhaseExecutionException) e).getCause() instanceof VersionMismatchException) {

        SearchPhaseExecutionException spee = (SearchPhaseExecutionException) e;
        if (log.isTraceEnabled()) {
            log.trace("Caught exception type [{}] with cause [{}].", e.getClass().getName(), e.getCause());
        }
        DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
        DiscoveryNode candidateNode = null;
        for (DiscoveryNode node : clusterService.state().nodes()) {
            // find the first node that's older than the current node
            if (node != localNode && node.getVersion().before(localNode.getVersion())) {
                candidateNode = node;
                break;
            }
        }
        if (candidateNode != null) {
This class and the one in EQL can be simplified by moving the common code into a base class in QL.
The exception check, the DiscoveryNode selection, the logging and the retry are all the same.
The only differences that I can see are the transportService call and the planExecutor call, which can be passed in as a Runnable or Function in case a property needs to be passed in.
I've moved the common code to a separate method in QL.
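For reference, the shared helper could look something along the lines of the sketch below. The method name matches the executeRequestWithRetryAttempt call visible later in the diff, but the signature, parameters and the VersionMismatchException import path are assumptions, and the "no candidate left, re-run the original request" branch is omitted.

```java
import java.util.function.Consumer;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.VersionMismatchException; // package assumed for this sketch
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;

// Sketch only: illustrates where the duplicated SQL/EQL retry logic could live,
// not the actual QL utility.
final class RetryUtils {

    static void executeRequestWithRetryAttempt(ClusterService clusterService, Logger log,
            Consumer<Exception> onFailure,              // fail the original listener
            Consumer<Consumer<Exception>> queryRunner,  // runs the plan, reporting errors to the passed-in consumer
            Consumer<DiscoveryNode> retryOnOlderNode) { // re-sends the original request over transport

        queryRunner.accept(e -> {
            if (e instanceof SearchPhaseExecutionException && e.getCause() instanceof VersionMismatchException) {
                if (log.isTraceEnabled()) {
                    log.trace("Caught exception type [{}] with cause [{}].", e.getClass().getName(), e.getCause());
                }
                DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
                DiscoveryNode candidateNode = null;
                for (DiscoveryNode node : clusterService.state().nodes()) {
                    // pick the first node that is older than the local (coordinating) node
                    if (node.equals(localNode) == false && node.getVersion().before(localNode.getVersion())) {
                        candidateNode = node;
                        break;
                    }
                }
                if (candidateNode != null) {
                    retryOnOlderNode.accept(candidateNode);
                    return;
                }
            }
            onFailure.accept(e);
        });
    }
}
```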
bpintea
left a comment
Nice. Only left some small comments and a question on the retrying logic.
        SearchSourceBuilder source,
        boolean includeFrozen,
        String... indices) {
    return client.prepareSearch(indices)
The client function argument can now be removed.
Nicely spotted. Removed.
    listener::onFailure));
Holder<Boolean> retrySecondTime = new Holder<Boolean>(false);
planExecutor.eql(cfg, request.query(), params, wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), e -> {
    // the search request will likely run on nodes with different versions of ES
Is this actually true? It is possible, but "likely"?
I think I put "likely" there because the actual search might run on a subset of the shards targeted at the start of the search process. There is an initial, very quick phase in the search process called canMatch that evaluates whether a request should actually be executed on a shard or not, based on a timestamp range query. If I am not mistaken, this canMatch phase is also involved in this version-mismatch scenario.
canMatch can quickly "discard" some shards, so the actual request truly runs on a subset of them. If you want to read more about this, I think #25658 is the initial PR adding it, and there are some other good resources here: https://stackoverflow.com/a/64693782/3498062
Thanks. I think what threw me off is the future tense, in a failure handler :-).
I guess "the search request likely ran on nodes with different versions of ES" might be clearer. Anyways, not super relevant.
    // find the first node that's older than the current node
    if (Objects.equals(node, localNode) == false && node.getVersion().before(localNode.getVersion())) {
        candidateNode = node;
        break;
    }
I guess normally the difference should be the same for all nodes (if not null), but would it not make sense to pick the oldest node, to potentially prevent another redirection?
There's a longer discussion to be had about the improvements here... I would keep the algorithm as is for the moment and, maybe, improve it in future versions if we hit bumps along the way. I'm not sure we support rolling upgrades across multiple versions (more than two). Our documentation says to upgrade nodes one by one until all are upgraded, but this assumes upgrading from version X to Y; there is no Z in the story.
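For what it's worth, selecting the oldest node instead of the first older one would be a small change; a sketch (not part of this PR, method name made up):

```java
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;

// Sketch only: pick the *oldest* node that is older than the local node,
// instead of the first older one encountered.
static DiscoveryNode oldestOlderNode(ClusterService clusterService) {
    DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
    DiscoveryNode candidate = null;
    for (DiscoveryNode node : clusterService.state().nodes()) {
        if (node.equals(localNode) == false
                && node.getVersion().before(localNode.getVersion())
                && (candidate == null || node.getVersion().before(candidate.getVersion()))) {
            candidate = node; // keep the lowest version seen so far
        }
    }
    return candidate; // null when no older node is present
}
```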
}));
if (retrySecondTime.get()) {
    if (log.isTraceEnabled()) {
        log.trace("No candidate node found, likely all were upgraded in the meantime. Re-trying the original request.");
I was wondering about the choice of trace logging here (and above) vs., for instance, debug.
(Pico-nit: the repetitive part of the message could maybe simply be extracted as a comment; as a stand-alone message - i.e. without knowing what's going on, which is supposedly the reason for adding the explanation - it is not quite clear what the "candidate" is.)
I based my decision to use trace on the likelihood of this logging actually being used - hopefully rarely to never. Willing to change it for a good reason.
Regarding the trace messages, I think the first one (where retrySecondTime is set) is not necessary.
        listener.onFailure(e);
    }
}));
if (retrySecondTime.get()) {
If there is a higher number of nodes in the cluster, the "old" ones are the most likely to be taken offline. So I guess this means there's a higher chance the request will fail when sent to one "old" node (if there's a race between querying for the versions and sending the request).
Would it make sense to reattempt the transport redirect for as long as there are old nodes in the cluster?
For the moment, I'd like to keep it as is and try to improve the algorithm in a future PR. There are, for sure, things that can be improved.
final List<String> bulkEntries = getSequencesBulkEntries();
StringBuilder builder = new StringBuilder();
for (int i = 1; i < 16; i++) {
I've refactored this to read the bulk entries from a file as per @costin's suggestion.
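A minimal sketch of what reading the bulk entries from a test resource could look like (class name, resource name and helper are hypothetical, not the PR's exact code):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical test helper: reads each bulk entry (one JSON line per entry)
// from a classpath resource instead of building the entries in code.
public class BulkEntriesReader {
    public static List<String> readBulkEntries(String resourceName) throws IOException {
        List<String> entries = new ArrayList<>();
        try (InputStream in = BulkEntriesReader.class.getResourceAsStream(resourceName);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                entries.add(line);
            }
        }
        return entries;
    }
}
```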
@elasticmachine run elasticsearch-ci/default-distro
matriv
left a comment
LGTM, nice stuff!
Left some very minor comments.
TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().query("foo where blah"), "", "", "node_id",
    new ActionListener<>() {
TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().query("foo where blah"), "",
    mock(TransportService.class), mockClusterService, new ActionListener<>() {
minor: since mock(TransportService.class) is reused, you could also assign it to a var.
// we will retry on a node with an older version that should generate a backwards compatible _search request
if (e instanceof SearchPhaseExecutionException
    && ((SearchPhaseExecutionException) e).getCause() instanceof VersionMismatchException) {
    if (log.isTraceEnabled()) {
Not familiar with the strategy here, just double-checking whether it should maybe be debug instead?
@@ -0,0 +1,59 @@
          "properties": {
minor: why are these entries indented like this? (many whitespaces from the beginning of the line)
@@ -0,0 +1,35 @@
          "properties": {
Same here, why the leading whitespaces?
…stic/elasticsearch into eql_sql_request_retry
 * the resulting ES document as a field.
 */
public class QlSourceBuilder {
    public static final Version FIELDS_API_USAGE_VERSION = Version.V_7_10_0;
I personally find this naming less evocative ("is the fields api only used in that version?"), but not a biggie.
The upside is that these are internal constants that we can rename in the future if we find better names.
Any suggestions for better naming?
FIELDS_API_MIGRATION_VERSION, MIGRATE_TO_FIELD_API_VERSION, SWITCH_OVER/TO_FIELDS_API_VERSION ?
@@ -0,0 +1,30 @@
{"index":{"_id":1}}

}
executeRequestWithRetryAttempt(clusterService, listener::onFailure,
    onFailure -> planExecutor.eql(cfg, request.query(), params,
        wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), onFailure)),
The upside is that these are internal constants that we can rename in the future if we find better names.
Any suggestions for better naming?
FIELDS_API_MIGRATION_VERSION, MIGRATE_TO_FIELD_API_VERSION, SWITCH_OVER/TO_FIELDS_API_VERSION ?
True. I like this one myself. But yes, it can be done later; it's hair splitting.
* Integrate "fields" API into QL (elastic#68467)
* QL: retry SQL and EQL requests in a mixed-node (rolling upgrade) cluster (elastic#68602)
* Adapt nested fields extraction from "fields" API output to the new un-flattened structure (elastic#68745)

(cherry picked from commit ee5cc54)
These changes make use of previous work added in PR #65896 (which adds a minimum compatibility version to search requests in ES): a minimum compatibility version is set when creating a search request against ES, and the request is retried if the search turns out to be executed on at least one incompatible shard.
The retry happens on a node with an older version, the original SQL/EQL request being sent through the transport layer.
The node receiving the retried request re-parses it and generates another query DSL to send to ES.
At the moment this PR was created, the introduction of the "fields" API in QL is exactly such a change that needs this feature.
Testing happens in two new, similar qa projects, one for SQL and one for EQL.
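For illustration, the forwarding step itself, once an older candidate node has been picked, might look roughly like the sketch below. The handler wiring, the response reader and the method name are assumptions based on the standard transport API, not the exact code of this PR.

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.eql.action.EqlSearchAction;
import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;

// Sketch: forward the original EQL request received by this node to the older candidate
// node over the transport layer. That node re-parses the request and builds a _search
// request compatible with its own version.
void retryOnOlderNode(TransportService transportService, DiscoveryNode candidateNode,
                      EqlSearchRequest request, ActionListener<EqlSearchResponse> listener) {
    transportService.sendRequest(candidateNode, EqlSearchAction.NAME, request,
        new ActionListenerResponseHandler<>(listener, EqlSearchResponse::new)); // response reader assumed
}
```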