Determine remote cluster version #142494
This is step 4.1 of https://github.com/elastic/elasticsearch-team/issues/2088. The current reindexing flow is:
To support PIT we need to:
Since one PIT is shared between all slices, the PIT must be opened before we slice (i.e. as a step 1.5 in the flow above). Before we open a PIT we need to decide whether we should. To decide, we check whether we're reindexing from a remote cluster. If we are, the remote version must be high enough to support PIT; otherwise we default to scroll. This change adds the logic to determine the remote cluster version. It does not USE the remote version. A couple of things to note:
Next Steps:
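The PIT-versus-scroll decision outlined in the description can be sketched roughly as follows. This is only an illustration, not code from this PR: `RemoteVersion`, `PaginationChoice`, `choose` and the `7.10` threshold are hypothetical names and values chosen for the example, and the real check (which this PR deliberately does not add yet) may differ.

```java
/** Hypothetical stand-in for the real Version class; only major and minor are modelled. */
record RemoteVersion(int major, int minor) {
    boolean onOrAfter(RemoteVersion other) {
        return major > other.major || (major == other.major && minor >= other.minor);
    }
}

final class PaginationChoice {
    /** Assumed minimum remote version that supports PIT; confirm against the real implementation. */
    static final RemoteVersion ASSUMED_MIN_PIT_VERSION = new RemoteVersion(7, 10);

    enum Mode { PIT, SCROLL }

    /**
     * @param pitFlagEnabled whether the PIT feature flag is on
     * @param isRemote       whether the reindex source is a remote cluster
     * @param remoteVersion  version of the remote cluster, or null if it was never looked up
     */
    static Mode choose(boolean pitFlagEnabled, boolean isRemote, RemoteVersion remoteVersion) {
        if (!pitFlagEnabled) {
            return Mode.SCROLL; // feature not enabled: keep the existing behaviour
        }
        if (!isRemote) {
            return Mode.PIT; // assumption: a local source can always use PIT
        }
        if (remoteVersion == null) {
            return Mode.SCROLL; // unknown remote version: take the safe default
        }
        return remoteVersion.onOrAfter(ASSUMED_MIN_PIT_VERSION) ? Mode.PIT : Mode.SCROLL;
    }
}
```

Because the choice has to be made once, before slicing, a helper like this would run at the proposed "step 1.5" and its result would be shared by every slice.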
| * while it is under development. | ||
| */ | ||
| // TODO - DELETE. Only needed for local development | ||
| static boolean REINDEX_PIT_SEARCH_ENABLED = new FeatureFlag("reindex_pit_search_enabled").isEnabled(); |
Added this as a placeholder until #142035 is merged
| * Verifies that lookupRemoteVersion correctly parses historical and | ||
| * forward-compatible main action responses. | ||
| */ | ||
| public void testLookupRemoteVersion() throws Exception { |
This, testLookupRemoteVersionFailsWithoutContentType and testWrapExceptionToPreserveStatus were modified from RemoteScrollableHitSourceTests. The rest I added to plug testing gaps
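For readers unfamiliar with what the version lookup has to parse: the root endpoint of a cluster returns a JSON body containing a `version.number` string. The sketch below pulls that number out with a regular expression. It is not the `MAIN_ACTION_PARSER` from this PR, which uses a proper parser; `MainResponseVersionSketch` and `parse` are hypothetical names, and a regex is used only to keep the example dependency-free.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class MainResponseVersionSketch {
    // Matches "number" : "x.y.z" and tolerates a suffix such as -SNAPSHOT.
    // Assumption: the first "number" key in the body is the version number.
    private static final Pattern NUMBER =
        Pattern.compile("\"number\"\\s*:\\s*\"(\\d+)\\.(\\d+)\\.(\\d+)[^\"]*\"");

    /** Returns {major, minor, revision}, or empty if no version number is found. */
    static Optional<int[]> parse(String body) {
        Matcher m = NUMBER.matcher(body);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new int[] {
            Integer.parseInt(m.group(1)),
            Integer.parseInt(m.group(2)),
            Integer.parseInt(m.group(3)) });
    }
}
```

A real parser should also reject responses without a content type, which is the case `testLookupRemoteVersionFailsWithoutContentType` covers; this sketch does not model that.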
PeteGillinElastic left a comment
Thanks, this looks like a good start. I have one comment about a potential change of approach. It may not work, but it might be cleaner if it does. (I have a few tiny nits as well, sorry...)
I haven't looked at the tests, I figured it would be worth converging on the overall approach first.
| public class RemoteReindexingUtils { | ||
| public static void lookupRemoteVersion(RejectAwareActionListener<Version> listener, ThreadPool threadPool, RestClient client) { | ||
| execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener, threadPool, client); |
Micro-nit: I would consider "/" marginally more readable than "". Obviously they're equivalent, but (a) I think that / is the canonical form which the empty path would be normalized into, and (b) I think it's more obvious to the reader what the parameter is if we use "/". (I realize you're just moving this code, but no harm in improving it!)
| RestClient client | ||
| ) { | ||
| // Preserve the thread context so headers survive after the call | ||
| java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true); |
Nit: We can import j.u.f.Supplier here. (We couldn't in the place you're moving it from because that's importing a different Supplier.)
| private final ScriptService scriptService; | ||
| private final ReindexSslConfig reindexSslConfig; | ||
| private final ReindexMetrics reindexMetrics; | ||
| Version remoteVersion; |
I would prefer not to add this mutable state to Reindexer, if we can avoid it. It makes it a bit harder to reason about, because whenever we're looking at any code we'll have to think carefully to figure out whether this thing will have been initialized yet or not.
Can we instead make lookupRemoteVersion take an ActionListener<Version> instead of ActionListener<Void>, and plumb the version through as an additional method parameter to initTask and then BulkByScrollParallelizationHelper.initTaskState, and then through to the LeaderBulkByScrollTaskState or WorkerBulkByScrollTaskState?
I haven't tried that, so I could be wrong, but it feels like it should be possible.
If you end up having to have this mutable state, please can we make it private, and add a getter if needed, so that at least it can only be set from within this class.
Actually, come to think of it: Is there a reason why you trigger the lookupRemoteVersion call in the transport action, as a wrapper around initTask, rather than triggering it inside the implementation of initTask? Again, I haven't tried it — but, if you can do the latter, you're reducing the amount of the internals of Reindexer that you're exposing to the transport action, right?
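The suggestion in this thread (pass the version through a typed listener and trigger the lookup inside `initTask`, rather than storing it in a mutable field) could look roughly like the sketch below. All names are hypothetical stand-ins: `Listener` plays the role of `ActionListener`, a `Supplier<String>` fakes the remote cluster, and the version is a plain string. The real lookup is asynchronous and is not reproduced here.

```java
import java.util.function.Supplier;

final class VersionPlumbingSketch {
    /** Hypothetical stand-in for ActionListener<T>. */
    interface Listener<T> {
        void onResponse(T value);

        void onFailure(Exception e);
    }

    /** Looks up the version and hands it to the listener instead of storing it in a field. */
    static void lookupRemoteVersion(Supplier<String> fakeRemote, Listener<String> listener) {
        String version;
        try {
            version = fakeRemote.get();
        } catch (RuntimeException e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(version);
    }

    /**
     * The lookup is triggered here, so callers such as a transport action never see it.
     * A null fakeRemote models a local reindex, for which the version is simply null.
     */
    static void initTask(Supplier<String> fakeRemote, Listener<String> initTaskState) {
        if (fakeRemote == null) {
            initTaskState.onResponse(null);
            return;
        }
        lookupRemoteVersion(fakeRemote, initTaskState);
    }
}
```

With this shape the version only exists as a parameter of the callback, so there is no window in which a field could be read before it has been initialized.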
…hua-adams-1/elasticsearch into reindexing-plumb-pittable-hit-source
| protected void doStart(RejectAwareActionListener<Response> searchListener) { | ||
| lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> { | ||
| remoteVersion = version; | ||
| if (remoteVersion != null) { |
When the feature flag is turned on, the remote version will be non-null, and we'll save the cost of the second lookupRemoteVersion call.
| client, | ||
| listener.delegateFailure((l, v) -> executeSlicedAction(task, request, action, l, client, node, workerAction)) | ||
| listener.delegateFailure( | ||
| (l, v) -> executeSlicedAction(task, request, action, l, client, node, null, version -> workerAction.run()) |
This is called exclusively by update-by-query and delete-by-query, which have no concept of remote versions since they're executed on the local node. Therefore, setting the remote version parameter to null has no effect. Subsequent changes to introduce PIT will be behind the REINDEX_PIT_SEARCH_ENABLED feature flag anyway, but in the future, once that flag is lifted, this will fail the 'use pit' check and default to scroll.
| reindexSslConfig, | ||
| request, | ||
| wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null), | ||
| remoteVersion |
This is unchanged, except I pass the remoteVersion downstream
| sslConfig | ||
| ); | ||
| this.destinationIndexIdMapper = destinationIndexMode(state).idFieldMapperWithoutFieldData(); | ||
| this.remoteVersion = remoteVersion; |
I store it as a state variable so it can be referenced by buildScrollableResultSource below.
| remoteInfo, | ||
| searchRequest | ||
| searchRequest, | ||
| remoteVersion |
In the case where we've enabled the PIT feature flag but are reindexing from a remote node whose version predates the introduction of PIT, we need to fall back to using scroll. However, since we've already done the remote version lookup, we may as well pass this value in.
If this is a scrollable workflow, then remoteVersion is null.
| * Creates a RemoteScrollablePaginatedHitSource with a pre-resolved initial remote version so that doStart skips the version lookup. | ||
| * The mock client serves only the given response paths (one request = one path when using initial version). | ||
| */ | ||
| private RemoteScrollablePaginatedHitSource sourceWithInitialRemoteVersion(Version initialRemoteVersion, String... paths) |
This is only called by testDoStartSkipsVersionLookupWhenInitialRemoteVersionSet
Pinging @elastic/es-distributed (Team:Distributed)
| RemoteInfo remoteInfo = request.getRemoteInfo(); | ||
| assert reindexSslConfig != null : "Reindex ssl config must be set"; | ||
| RestClient restClient = buildRestClient(remoteInfo, reindexSslConfig, task.getId(), synchronizedList(new ArrayList<>())); | ||
| RejectAwareActionListener<Version> rejectAwareListener = new RejectAwareActionListener<>() { |
Currently the remote version lookup has retries as part of starting the search. It'd be nice to add retries here too?
Good catch - will add now
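As a rough illustration of the retry behaviour discussed in this thread, the sketch below retries a lookup with exponentially growing delays. It is a simplified, synchronous stand-in: `RetrySketch`, `getWithBackoff` and the injected `sleeper` are hypothetical, and the sketch retries on any `RuntimeException`, whereas the real code is asynchronous and is expected to retry only on rejections, using the request's configured backoff.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

final class RetrySketch {
    /**
     * Calls lookup until it succeeds or maxRetries retries have been used up.
     * The delay doubles after each failed attempt; sleeper is injected so that
     * callers and tests control how the waiting actually happens.
     */
    static <T> T getWithBackoff(Supplier<T> lookup, int maxRetries, long initialDelayMillis, LongConsumer sleeper) {
        long delay = initialDelayMillis;
        for (int attempt = 0;; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                sleeper.accept(delay);
                delay *= 2;
            }
        }
    }
}
```

Keeping the waiting behind an injected `sleeper` also makes it easy to count version-lookup retries separately from search retries, should that distinction be wanted.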
modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteReindexingUtils.java
| try { | ||
| client.performRequestAsync(request, new ResponseListener() { | ||
| @Override | ||
| public void onSuccess(org.elasticsearch.client.Response response) { |
Fully qualified name seems unnecessary, just import the class? It's easier to read. Same for other places.
| exponentialBackoff(request.getRetryBackoffInitialTime(), request.getMaxRetries()), | ||
| threadPool, | ||
| restClient, | ||
| task.getWorkerState()::countSearchRetry, |
I think task.getWorkerState() would throw if the task is not a worker. Leader task would also call lookupRemoteVersionAndExecute, right?
I guess it's fine as long as slicing is not enabled for remote source, but it does seem a bit trappy.
Actually, I think we could just remove this runnable. countSearchRetry increments the searchRetries counter, which is eventually reported in the task response. It seems inaccurate to count the retries for fetching the remote version towards search retries. Even though the old behaviour does that, it feels unintentional.
samxbr left a comment
LGTM, just have some non-blocking minor comments.
Extends the reindexing flow to determine the remote cluster version (if applicable), before slicing. Relates: elastic/elasticsearch-team#2088