From c0dcd5f5201531de07c90b3340f5a8ae7eda93ac Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 30 Oct 2024 11:34:23 -0400 Subject: [PATCH 1/5] Initial refactorings. Not the main set yet --- .../xpack/esql/action/EsqlExecutionInfo.java | 12 +++------ .../esql/enrich/EnrichPolicyResolver.java | 27 ++++++++++--------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index f2ab0355304b3..80bb2afe57122 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -229,7 +229,7 @@ public Iterator toXContentChunked(ToXContent.Params params b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED)); b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL)); b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED)); - // each clusterinfo defines its own field object name + // each Cluster object defines its own field object name b.xContentObject("details", clusterInfo.values().iterator()); }); } @@ -352,11 +352,7 @@ public Cluster( this.successfulShards = successfulShards; this.skippedShards = skippedShards; this.failedShards = failedShards; - if (failures == null) { - this.failures = List.of(); - } else { - this.failures = failures; - } + this.failures = failures == null ? Collections.emptyList() : failures; this.took = took; } @@ -373,7 +369,7 @@ public Cluster(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) { this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure)); } else { - this.failures = List.of(); + this.failures = Collections.emptyList(); } } @@ -475,7 +471,7 @@ public Cluster.Builder setTook(TimeValue took) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { String name = clusterAlias; - if (clusterAlias.equals("")) { + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { name = LOCAL_CLUSTER_NAME_REPRESENTATION; } builder.startObject(name); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 77ef5ef597bb5..c8a7a6bcc4e98 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -289,24 +289,17 @@ public void onResponse(Transport.Connection connection) { RESOLVE_ACTION_NAME, new LookupRequest(cluster, remotePolicies), TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> { - if (ExceptionsHelper.isRemoteUnavailableException(e) - && remoteClusterService.isSkipUnavailable(cluster)) { - l.onResponse(new LookupResponse(e)); - } else { - l.onFailure(e); - } - }), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH)) + new ActionListenerResponseHandler<>( + lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)), + LookupResponse::new, + threadPool.executor(ThreadPool.Names.SEARCH) + ) ); } @Override public void onFailure(Exception e) { - if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { - lookupListener.onResponse(new LookupResponse(e)); - } else { - lookupListener.onFailure(e); - } + failIfSkipUnavailableFalse(e, cluster, lookupListener); } }); } @@ -331,6 +324,14 @@ public void onFailure(Exception e) { } } + private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener lookupListener) { + if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { + lookupListener.onResponse(new LookupResponse(e)); + } else { + lookupListener.onFailure(e); + } + } + private static class LookupRequest extends TransportRequest { private final String clusterAlias; private final Collection policyNames; From 272eb380ee2d1af3ccb34f237022f008d49f19e5 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 30 Oct 2024 11:46:42 -0400 Subject: [PATCH 2/5] Moved some logic over to EsqlSessionCCSUtils; not complete --- .../xpack/esql/session/EsqlSession.java | 8 +- .../esql/session/EsqlSessionCCSUtils.java | 170 +++++++ .../session/EsqlSessionCCSUtilsTests.java | 472 ++++++++++++++++++ 3 files changed, 646 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 1e78f454b7531..ebac0a808fce8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -261,7 +261,7 @@ public void executeOptimizedPlan( ActionListener listener ) { LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan); - updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); if (firstPhase == null) { runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener); } else { @@ -347,8 +347,8 @@ private void preAnalyze( // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index // resolution to updateExecutionInfo if (indexResolution.isValid()) { - updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel @@ -422,7 +422,7 @@ private void preAnalyzeIndices( } // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search // based only on available clusters (which could now be an empty list) - String indexExpressionToResolve = createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java new file mode 100644 index 0000000000000..8c91039979014 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -0,0 +1,170 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.session; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.index.IndexResolution; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class EsqlSessionCCSUtils { + + private EsqlSessionCCSUtils() {} + + static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { + StringBuilder sb = new StringBuilder(); + for (String clusterAlias : executionInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); + if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); + } else { + String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); + for (String index : indexExpression.split(",")) { + sb.append(clusterAlias).append(':').append(index).append(','); + } + } + } + } + + if (sb.length() > 0) { + return sb.substring(0, sb.length() - 1); + } else { + return ""; + } + } + +// static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { +// StringBuilder sb = new StringBuilder(); +// for (String clusterAlias : executionInfo.clusterAliases()) { +// EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); +// if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { +// if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { +// sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); +// } else { +// String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); +// for (String index : indexExpression.split(",'")) { +// sb.append(clusterAlias).append(':').append(index).append(','); +// } +// } +// } +// } +// +// if (sb.length() > 0) { +// return sb.substring(0, sb.length() - 1); +// } else { +// return ""; +// } +// } + + static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { + for (Map.Entry entry : unavailable.entrySet()) { + String clusterAlias = entry.getKey(); + boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable(); + RemoteTransportException e = new RemoteTransportException( + Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable), + entry.getValue().getException() + ); + if (skipUnavailable) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .setFailures(List.of(new ShardSearchFailure(e))) + .build() + ); + } else { + throw e; + } + } + } + + static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + Set clustersWithResolvedIndices = new HashSet<>(); + // determine missing clusters + for (String indexName : indexResolution.get().indexNameWithModes().keySet()) { + clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName)); + } + Set clustersRequested = executionInfo.clusterAliases(); + Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); + clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet()); + /* + * These are clusters in the original request that are not present in the field-caps response. They were + * specified with an index or indices that do not exist, so the search on that cluster is done. + * Mark it as SKIPPED with 0 shards searched and took=0. + */ + for (String c : clustersWithNoMatchingIndices) { + // TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if + // they were requested with one or more concrete indices + // for now we never mark the local cluster as SKIPPED + final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c) + ? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL + : EsqlExecutionInfo.Cluster.Status.SKIPPED; + executionInfo.swapCluster( + c, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status) + .setTook(new TimeValue(0)) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + + static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { + // TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible + if (execInfo.isCrossClusterSearch()) { + execInfo.markEndPlanning(); + for (String clusterAlias : execInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); + if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + } + } + + static Map determineUnavailableRemoteClusters(List failures) { + Map unavailableRemotes = new HashMap<>(); + for (FieldCapabilitiesFailure failure : failures) { + if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { + for (String indexExpression : failure.getIndices()) { + if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { + unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); + } + } + } + } + return unavailableRemotes; + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java new file mode 100644 index 0000000000000..9768c2b7425b2 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -0,0 +1,472 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.session; + +import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.common.Strings; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.NoSeedNodeLeftException; +import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.IndexResolution; +import org.elasticsearch.xpack.esql.type.EsFieldTests; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class EsqlSessionCCSUtilsTests extends ESTestCase { + + public void testCreateIndexExpressionFromAvailableClusters() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; + + // no clusters marked as skipped + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + + String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); + assertThat(list.size(), equalTo(5)); + assertThat( + new HashSet<>(list), + equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*")) + ); + } + + // one cluster marked as skipped, so not present in revised index expression + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true)); + executionInfo.swapCluster( + remote2Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote2Alias, + "mylogs1,mylogs2,logs*", + true, + EsqlExecutionInfo.Cluster.Status.SKIPPED + ) + ); + + String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); + assertThat(list.size(), equalTo(3)); + assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); + } + + // two clusters marked as skipped, so only local cluster present in revised index expression + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + ); + executionInfo.swapCluster( + remote2Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote2Alias, + "mylogs1,mylogs2,logs*", + true, + EsqlExecutionInfo.Cluster.Status.SKIPPED + ) + ); + + assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); + } + + // only remotes present and all marked as skipped, so in revised index expression should be empty string + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + ); + executionInfo.swapCluster( + remote2Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote2Alias, + "mylogs1,mylogs2,logs*", + true, + EsqlExecutionInfo.Cluster.Status.SKIPPED + ) + ); + + assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); + } + } + + public void testUpdateExecutionInfoWithUnavailableClusters() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; + + // skip_unavailable=true clusters are unavailable, both marked as SKIPPED + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertNull(executionInfo.overallTook()); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); + } + + // skip_unavailable=false cluster is unavailable, throws Exception + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + RemoteTransportException e = expectThrows( + RemoteTransportException.class, + () -> EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) + ); + assertThat(e.status().getStatus(), equalTo(500)); + assertThat( + e.getDetailedMessage(), + containsString("Remote cluster [remote2] (with setting skip_unavailable=false) is not available") + ); + assertThat(e.getCause().getMessage(), containsString("unable to connect")); + } + + // all clusters available, no Clusters in ExecutionInfo should be modified + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertNull(executionInfo.overallTook()); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + } + } + + public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; + // all clusters present in EsIndex, so no updates to EsqlExecutionInfo should happen + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + EsIndex esIndex = new EsIndex( + "logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", + randomMapping(), + Map.of( + "logs-a", + IndexMode.STANDARD, + "remote1:logs-a", + IndexMode.STANDARD, + "remote2:mylogs1", + IndexMode.STANDARD, + "remote2:mylogs2", + IndexMode.STANDARD, + "remote2:logs-b", + IndexMode.STANDARD + ) + ); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); + + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + } + + // remote1 is missing from EsIndex info, so it should be updated and marked as SKIPPED with 0 total shards, 0 took time, etc. + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + EsIndex esIndex = new EsIndex( + "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", + randomMapping(), + Map.of( + "logs-a", + IndexMode.STANDARD, + "remote2:mylogs1", + IndexMode.STANDARD, + "remote2:mylogs2", + IndexMode.STANDARD, + "remote2:logs-b", + IndexMode.STANDARD + ) + ); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); + + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + } + + // all remotes are missing from EsIndex info, so they should be updated and marked as SKIPPED with 0 total shards, 0 took time, etc. + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + EsIndex esIndex = new EsIndex( + "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", + randomMapping(), + Map.of("logs-a", IndexMode.STANDARD) + ); + // remote1 is unavailable + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); + + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed + // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2Cluster.getTook().millis(), equalTo(0L)); + assertThat(remote2Cluster.getTotalShards(), equalTo(0)); + assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote2Cluster.getFailedShards(), equalTo(0)); + } + + // all remotes are missing from EsIndex info. Since one is configured with skip_unavailable=false, + // an exception should be thrown + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*")); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + EsIndex esIndex = new EsIndex( + "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", + randomMapping(), + Map.of("logs-a", IndexMode.STANDARD) + ); + + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + } + } + + public void testUpdateExecutionInfoAtEndOfPlanning() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + ); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + assertNull(executionInfo.planningTookTime()); + assertNull(executionInfo.overallTook()); + try { + Thread.sleep(1); + } catch (InterruptedException e) {} + + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + + assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L)); + assertNull(executionInfo.overallTook()); + + // only remote1 should be altered, since it is the only one marked as SKIPPED when passed into updateExecutionInfoAtEndOfPlanning + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + assertNull(localCluster.getTotalShards()); + assertNull(localCluster.getTook()); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1Cluster.getTook().millis(), equalTo(executionInfo.planningTookTime().millis())); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + assertNull(remote2Cluster.getTotalShards()); + assertNull(remote2Cluster.getTook()); + } + + private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { + assertThat(cluster.getStatus(), equalTo(status)); + assertNull(cluster.getTook()); + if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { + assertNull(cluster.getTotalShards()); + assertNull(cluster.getSuccessfulShards()); + assertNull(cluster.getSkippedShards()); + assertNull(cluster.getFailedShards()); + } else if (status == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + assertThat(cluster.getTotalShards(), equalTo(0)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); + } else { + fail("Unexpected status: " + status); + } + } + + private static Map randomMapping() { + int size = between(0, 10); + Map result = new HashMap<>(size); + while (result.size() < size) { + result.put(randomAlphaOfLength(5), EsFieldTests.randomAnyEsField(1)); + } + return result; + } + + public void testDetermineUnavailableRemoteClusters() { + // two clusters, both "remote unavailable" type exceptions + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); + failures.add( + new FieldCapabilitiesFailure( + new String[] { "remote1:foo", "remote1:bar" }, + new IllegalStateException("Unable to open any connections") + ) + ); + + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); + } + + // one cluster with "remote unavailable" with two failures + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); + + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); + } + + // two clusters, one "remote unavailable" type exceptions and one with another type + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new CorruptIndexException("foo", "bar"))); + failures.add( + new FieldCapabilitiesFailure( + new String[] { "remote2:foo", "remote2:bar" }, + new IllegalStateException("Unable to open any connections") + ) + ); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); + } + + // one cluster1 with exception not known to indicate "remote unavailable" + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); + } + + // empty failures list + { + List failures = new ArrayList<>(); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); + } + } +} From 56ea833cf659fe58008516ad6db272519732748a Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 30 Oct 2024 12:18:02 -0400 Subject: [PATCH 3/5] More refactorings. --- .../xpack/esql/session/EsqlSession.java | 131 +----- .../esql/session/EsqlSessionCCSUtils.java | 23 - .../xpack/esql/session/IndexResolver.java | 21 +- .../xpack/esql/session/EsqlSessionTests.java | 413 ------------------ .../esql/session/IndexResolverTests.java | 80 ---- 5 files changed, 15 insertions(+), 653 deletions(-) delete mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java delete mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index ebac0a808fce8..ca2e9001490e3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -10,13 +10,11 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -75,7 +73,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -150,22 +147,24 @@ public void execute( analyzedPlan( parse(request.query(), request.params()), executionInfo, - new LogicalPlanActionListener(request, executionInfo, runPhase, listener) + new CcsAwarePreAnalysisActionListener(request, executionInfo, runPhase, listener) ); } /** - * ActionListener that receives LogicalPlan or error from logical planning. + * ActionListener that receives LogicalPlan or error from preAnalysis. * Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so - * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error. + * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error + * based on the skip_unavailable status of the cluster the error came from. (The local cluster + * is always treated like skip_unavailable=false.) */ - class LogicalPlanActionListener implements ActionListener { + class CcsAwarePreAnalysisActionListener implements ActionListener { private final EsqlQueryRequest request; private final EsqlExecutionInfo executionInfo; private final BiConsumer> runPhase; private final ActionListener listener; - LogicalPlanActionListener( + CcsAwarePreAnalysisActionListener( EsqlQueryRequest request, EsqlExecutionInfo executionInfo, BiConsumer> runPhase, @@ -344,7 +343,7 @@ private void preAnalyze( .collect(Collectors.toSet()); Map unavailableClusters = enrichResolution.getUnavailableClusters(); preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> { - // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index + // TODO in follow-PR (for skip_unavailable handling of missing indexes) add some tests for invalid index // resolution to updateExecutionInfo if (indexResolution.isValid()) { EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -352,7 +351,7 @@ private void preAnalyze( if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel - // Exception to let the LogicalPlanActionListener decide how to proceed + // Exception to let the CcsAwarePreAnalysisActionListener decide how to proceed ll.onFailure(new NoClustersToSearchException()); return; } @@ -426,10 +425,10 @@ private void preAnalyzeIndices( if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))); - } else { - // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener); + return; } + // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types + indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener); } else { try { // occurs when dealing with local relations (row a = 1) @@ -440,30 +439,6 @@ private void preAnalyzeIndices( } } - // visible for testing - static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { - StringBuilder sb = new StringBuilder(); - for (String clusterAlias : executionInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); - if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { - if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); - } else { - String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); - for (String index : indexExpression.split(",")) { - sb.append(clusterAlias).append(':').append(index).append(','); - } - } - } - } - - if (sb.length() > 0) { - return sb.substring(0, sb.length() - 1); - } else { - return ""; - } - } - static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields) { if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) { // no explicit columns selection, for example "from employees" @@ -607,86 +582,4 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } - - static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { - for (Map.Entry entry : unavailable.entrySet()) { - String clusterAlias = entry.getKey(); - boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable(); - RemoteTransportException e = new RemoteTransportException( - Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable), - entry.getValue().getException() - ); - if (skipUnavailable) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .setFailures(List.of(new ShardSearchFailure(e))) - .build() - ); - } else { - throw e; - } - } - } - - // visible for testing - static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { - Set clustersWithResolvedIndices = new HashSet<>(); - // determine missing clusters - for (String indexName : indexResolution.get().indexNameWithModes().keySet()) { - clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName)); - } - Set clustersRequested = executionInfo.clusterAliases(); - Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); - clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet()); - /* - * These are clusters in the original request that are not present in the field-caps response. They were - * specified with an index or indices that do not exist, so the search on that cluster is done. - * Mark it as SKIPPED with 0 shards searched and took=0. - */ - for (String c : clustersWithNoMatchingIndices) { - // TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if - // they were requested with one or more concrete indices - // for now we never mark the local cluster as SKIPPED - final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c) - ? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL - : EsqlExecutionInfo.Cluster.Status.SKIPPED; - executionInfo.swapCluster( - c, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status) - .setTook(new TimeValue(0)) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .build() - ); - } - } - - // visible for testing - static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { - // TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible - if (execInfo.isCrossClusterSearch()) { - execInfo.markEndPlanning(); - for (String clusterAlias : execInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); - if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .build() - ); - } - } - } - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 8c91039979014..50df203adca35 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -51,29 +51,6 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu } } -// static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { -// StringBuilder sb = new StringBuilder(); -// for (String clusterAlias : executionInfo.clusterAliases()) { -// EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); -// if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { -// if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { -// sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); -// } else { -// String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); -// for (String index : indexExpression.split(",'")) { -// sb.append(clusterAlias).append(':').append(index).append(','); -// } -// } -// } -// } -// -// if (sb.length() > 0) { -// return sb.substring(0, sb.length() - 1); -// } else { -// return ""; -// } -// } - static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { for (Map.Entry entry : unavailable.entrySet()) { String clusterAlias = entry.getKey(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index f76f7798dece8..b05928fbba576 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse; @@ -20,7 +19,6 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.DateEsField; @@ -159,25 +157,12 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) { concreteIndices.put(ir.getIndexName(), ir.getIndexMode()); } - Map unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()); + Map unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters( + fieldCapsResponse.getFailures() + ); return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes); } - // visible for testing - static Map determineUnavailableRemoteClusters(List failures) { - Map unavailableRemotes = new HashMap<>(); - for (FieldCapabilitiesFailure failure : failures) { - if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { - for (String indexExpression : failure.getIndices()) { - if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { - unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); - } - } - } - } - return unavailableRemotes; - } - private boolean allNested(List caps) { for (IndexFieldCapabilities cap : caps) { if (false == cap.type().equalsIgnoreCase("nested")) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java deleted file mode 100644 index dddfa67338419..0000000000000 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java +++ /dev/null @@ -1,413 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.session; - -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; -import org.elasticsearch.common.Strings; -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.NoSeedNodeLeftException; -import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; -import org.elasticsearch.xpack.esql.core.type.EsField; -import org.elasticsearch.xpack.esql.index.EsIndex; -import org.elasticsearch.xpack.esql.index.IndexResolution; -import org.elasticsearch.xpack.esql.type.EsFieldTests; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; - -public class EsqlSessionTests extends ESTestCase { - - public void testCreateIndexExpressionFromAvailableClusters() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; - - // no clusters marked as skipped - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - - String indexExpr = EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo); - List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); - assertThat(list.size(), equalTo(5)); - assertThat( - new HashSet<>(list), - equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*")) - ); - } - - // one cluster marked as skipped, so not present in revised index expression - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true)); - executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, - "mylogs1,mylogs2,logs*", - true, - EsqlExecutionInfo.Cluster.Status.SKIPPED - ) - ); - - String indexExpr = EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo); - List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); - assertThat(list.size(), equalTo(3)); - assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); - } - - // two clusters marked as skipped, so only local cluster present in revised index expression - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) - ); - executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, - "mylogs1,mylogs2,logs*", - true, - EsqlExecutionInfo.Cluster.Status.SKIPPED - ) - ); - - assertThat(EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); - } - - // only remotes present and all marked as skipped, so in revised index expression should be empty string - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) - ); - executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, - "mylogs1,mylogs2,logs*", - true, - EsqlExecutionInfo.Cluster.Status.SKIPPED - ) - ); - - assertThat(EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); - } - } - - public void testUpdateExecutionInfoWithUnavailableClusters() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; - - // skip_unavailable=true clusters are unavailable, both marked as SKIPPED - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - - var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); - EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); - - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); - assertNull(executionInfo.overallTook()); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); - } - - // skip_unavailable=false cluster is unavailable, throws Exception - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - - var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - RemoteTransportException e = expectThrows( - RemoteTransportException.class, - () -> EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) - ); - assertThat(e.status().getStatus(), equalTo(500)); - assertThat( - e.getDetailedMessage(), - containsString("Remote cluster [remote2] (with setting skip_unavailable=false) is not available") - ); - assertThat(e.getCause().getMessage(), containsString("unable to connect")); - } - - // all clusters available, no Clusters in ExecutionInfo should be modified - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - - EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); - - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); - assertNull(executionInfo.overallTook()); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - } - } - - public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; - // all clusters present in EsIndex, so no updates to EsqlExecutionInfo should happen - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - - EsIndex esIndex = new EsIndex( - "logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", - randomMapping(), - Map.of( - "logs-a", - IndexMode.STANDARD, - "remote1:logs-a", - IndexMode.STANDARD, - "remote2:mylogs1", - IndexMode.STANDARD, - "remote2:mylogs2", - IndexMode.STANDARD, - "remote2:logs-b", - IndexMode.STANDARD - ) - ); - IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); - - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - } - - // remote1 is missing from EsIndex info, so it should be updated and marked as SKIPPED with 0 total shards, 0 took time, etc. - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - - EsIndex esIndex = new EsIndex( - "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", - randomMapping(), - Map.of( - "logs-a", - IndexMode.STANDARD, - "remote2:mylogs1", - IndexMode.STANDARD, - "remote2:mylogs2", - IndexMode.STANDARD, - "remote2:logs-b", - IndexMode.STANDARD - ) - ); - IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); - - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); - assertThat(remote1Cluster.getTotalShards(), equalTo(0)); - assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); - assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); - assertThat(remote1Cluster.getFailedShards(), equalTo(0)); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - } - - // all remotes are missing from EsIndex info, so they should be updated and marked as SKIPPED with 0 total shards, 0 took time, etc. - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - - EsIndex esIndex = new EsIndex( - "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", - randomMapping(), - Map.of("logs-a", IndexMode.STANDARD) - ); - // remote1 is unavailable - var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); - - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed - // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remote2Cluster.getTook().millis(), equalTo(0L)); - assertThat(remote2Cluster.getTotalShards(), equalTo(0)); - assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); - assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); - assertThat(remote2Cluster.getFailedShards(), equalTo(0)); - } - - // all remotes are missing from EsIndex info. Since one is configured with skip_unavailable=false, - // an exception should be thrown - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*")); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - - EsIndex esIndex = new EsIndex( - "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", - randomMapping(), - Map.of("logs-a", IndexMode.STANDARD) - ); - - var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); - EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - } - } - - public void testUpdateExecutionInfoAtEndOfPlanning() { - String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - String remote1Alias = "remote1"; - String remote2Alias = "remote2"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) - ); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - - assertNull(executionInfo.planningTookTime()); - assertNull(executionInfo.overallTook()); - try { - Thread.sleep(1); - } catch (InterruptedException e) {} - - EsqlSession.updateExecutionInfoAtEndOfPlanning(executionInfo); - - assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L)); - assertNull(executionInfo.overallTook()); - - // only remote1 should be altered, since it is the only one marked as SKIPPED when passed into updateExecutionInfoAtEndOfPlanning - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - assertNull(localCluster.getTotalShards()); - assertNull(localCluster.getTook()); - - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remote1Cluster.getTotalShards(), equalTo(0)); - assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); - assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); - assertThat(remote1Cluster.getFailedShards(), equalTo(0)); - assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); - assertThat(remote1Cluster.getTook().millis(), equalTo(executionInfo.planningTookTime().millis())); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - assertNull(remote2Cluster.getTotalShards()); - assertNull(remote2Cluster.getTook()); - } - - private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { - assertThat(cluster.getStatus(), equalTo(status)); - assertNull(cluster.getTook()); - if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { - assertNull(cluster.getTotalShards()); - assertNull(cluster.getSuccessfulShards()); - assertNull(cluster.getSkippedShards()); - assertNull(cluster.getFailedShards()); - } else if (status == EsqlExecutionInfo.Cluster.Status.SKIPPED) { - assertThat(cluster.getTotalShards(), equalTo(0)); - assertThat(cluster.getSuccessfulShards(), equalTo(0)); - assertThat(cluster.getSkippedShards(), equalTo(0)); - assertThat(cluster.getFailedShards(), equalTo(0)); - } else { - fail("Unexpected status: " + status); - } - } - - private static Map randomMapping() { - int size = between(0, 10); - Map result = new HashMap<>(size); - while (result.size() < size) { - result.put(randomAlphaOfLength(5), EsFieldTests.randomAnyEsField(1)); - } - return result; - } -} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java deleted file mode 100644 index d6e410305afaa..0000000000000 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.session; - -import org.apache.lucene.index.CorruptIndexException; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.NoSeedNodeLeftException; -import org.elasticsearch.transport.NoSuchRemoteClusterException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.Matchers.equalTo; - -public class IndexResolverTests extends ESTestCase { - - public void testDetermineUnavailableRemoteClusters() { - // two clusters, both "remote unavailable" type exceptions - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); - failures.add( - new FieldCapabilitiesFailure( - new String[] { "remote1:foo", "remote1:bar" }, - new IllegalStateException("Unable to open any connections") - ) - ); - - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); - } - - // one cluster with "remote unavailable" with two failures - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); - - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); - } - - // two clusters, one "remote unavailable" type exceptions and one with another type - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new CorruptIndexException("foo", "bar"))); - failures.add( - new FieldCapabilitiesFailure( - new String[] { "remote2:foo", "remote2:bar" }, - new IllegalStateException("Unable to open any connections") - ) - ); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); - } - - // one cluster1 with exception not known to indicate "remote unavailable" - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of())); - } - - // empty failures list - { - List failures = new ArrayList<>(); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of())); - } - } -} From b9d6aefa6cd612cc33078b8855b7df95bead2c25 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 30 Oct 2024 13:47:25 -0400 Subject: [PATCH 4/5] Removed CcsAwarePreAnalysisActionListener, pushing functionality to EsqlSessionCCSUtils --- .../xpack/esql/session/EsqlSession.java | 108 ++---------------- .../esql/session/EsqlSessionCCSUtils.java | 61 ++++++++++ 2 files changed, 68 insertions(+), 101 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index ca2e9001490e3..878c8b1259479 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.ShardSearchFailure; @@ -23,9 +22,6 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -147,105 +143,15 @@ public void execute( analyzedPlan( parse(request.query(), request.params()), executionInfo, - new CcsAwarePreAnalysisActionListener(request, executionInfo, runPhase, listener) - ); - } - - /** - * ActionListener that receives LogicalPlan or error from preAnalysis. - * Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so - * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error - * based on the skip_unavailable status of the cluster the error came from. (The local cluster - * is always treated like skip_unavailable=false.) - */ - class CcsAwarePreAnalysisActionListener implements ActionListener { - private final EsqlQueryRequest request; - private final EsqlExecutionInfo executionInfo; - private final BiConsumer> runPhase; - private final ActionListener listener; - - CcsAwarePreAnalysisActionListener( - EsqlQueryRequest request, - EsqlExecutionInfo executionInfo, - BiConsumer> runPhase, - ActionListener listener - ) { - this.request = request; - this.executionInfo = executionInfo; - this.runPhase = runPhase; - this.listener = listener; - } - - @Override - public void onResponse(LogicalPlan analyzedPlan) { - executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(analyzedPlan), listener); - } - - /** - * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error. - * - * For cases where field-caps had no indices to search and the remotes were unavailable, we - * return an empty successful response (200) if all remotes are marked with skip_unavailable=true. - * - * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match - * on any of the requested clusters. - */ - private boolean returnSuccessWithEmptyResult(Exception e) { - if (executionInfo.isCrossClusterSearch() == false) { - return false; - } - - if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { - for (String clusterAlias : executionInfo.clusterAliases()) { - if (executionInfo.isSkipUnavailable(clusterAlias) == false - && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { - return false; - } - } - return true; - } - return false; - } - - @Override - public void onFailure(Exception e) { - if (returnSuccessWithEmptyResult(e)) { - executionInfo.markEndQuery(); - Exception exceptionForResponse; - if (e instanceof ConnectTransportException) { - // when field-caps has no field info (since no clusters could be connected to or had matching indices) - // it just throws the first exception in its list, so this odd special handling is here is to avoid - // having one specific remote alias name in all failure lists in the metadata response - exceptionForResponse = new RemoteTransportException( - "connect_transport_exception - unable to connect to remote cluster", - null - ); + ActionListener.wrap(plan -> executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(plan), listener), e -> { + if (EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)) { + EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, e); + listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); } else { - exceptionForResponse = e; + listener.onFailure(e); } - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> { - EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook( - executionInfo.overallTook() - ).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0); - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - // never mark local cluster as skipped - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); - } else { - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); - // add this exception to the failures list only if there is no failure already recorded there - if (v.getFailures() == null || v.getFailures().size() == 0) { - builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); - } - } - return builder.build(); - }); - } - listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); - } else { - listener.onFailure(e); - } - } + }) + ); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 50df203adca35..b09519420b063 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -144,4 +145,64 @@ static Map determineUnavailableRemoteClusters( } return unavailableRemotes; } + + /** + * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error. + * + * For cases where field-caps had no indices to search and the remotes were unavailable, we + * return an empty successful response (200) if all remotes are marked with skip_unavailable=true. + * + * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match + * on any of the requested clusters. + */ + static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exception e) { + if (executionInfo.isCrossClusterSearch() == false) { + return false; + } + + if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { + for (String clusterAlias : executionInfo.clusterAliases()) { + if (executionInfo.isSkipUnavailable(clusterAlias) == false + && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { + return false; + } + } + return true; + } + return false; + } + + // TODO: write tests for this and method above + static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) { + executionInfo.markEndQuery(); + Exception exceptionForResponse; + if (e instanceof ConnectTransportException) { + // when field-caps has no field info (since no clusters could be connected to or had matching indices) + // it just throws the first exception in its list, so this odd special handling is here is to avoid + // having one specific remote alias name in all failure lists in the metadata response + exceptionForResponse = new RemoteTransportException("connect_transport_exception - unable to connect to remote cluster", null); + } else { + exceptionForResponse = e; + } + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // never mark local cluster as skipped + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); + } else { + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); + // add this exception to the failures list only if there is no failure already recorded there + if (v.getFailures() == null || v.getFailures().size() == 0) { + builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); + } + } + return builder.build(); + }); + } + } } From 84163d89dce749bc67f076f318d25bad2f1bbbb8 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 30 Oct 2024 15:17:34 -0400 Subject: [PATCH 5/5] Added tests to EsqlSessionCCSUtils for new methods moved there, such as testReturnSuccessWithEmptyResult --- .../xpack/esql/action/EsqlExecutionInfo.java | 1 + .../esql/session/EsqlSessionCCSUtils.java | 1 - .../session/EsqlSessionCCSUtilsTests.java | 110 ++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 80bb2afe57122..7153be543d2c2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -182,6 +182,7 @@ public boolean isSkipUnavailable(String clusterAlias) { if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { return false; } + // TODO: should first check the skipUn status of each cluster if present in clusterInfo (simplifies testing) return skipUnavailablePredicate.test(clusterAlias); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index b09519420b063..cd2a99fc3c509 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -172,7 +172,6 @@ static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exc return false; } - // TODO: write tests for this and method above static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) { executionInfo.markEndQuery(); Exception exceptionForResponse; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 9768c2b7425b2..5c89f612f5603 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -9,9 +9,11 @@ import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSeedNodeLeftException; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; @@ -29,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -469,4 +472,111 @@ public void testDetermineUnavailableRemoteClusters() { assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } } + + public void testReturnSuccessWithEmptyResult() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + String remote3Alias = "remote3"; + NoClustersToSearchException noClustersException = new NoClustersToSearchException(); + Predicate skipUnPredicate = s -> { + if (s.equals("remote2") || s.equals("remote3")) { + return true; + } + return false; + }; + + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", false); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); + + // not a cross-cluster cluster search, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // local cluster is present, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); + executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + // TODO: this logic will be added in the follow-on PR that handles missing indices + // assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // remote-only, one cluster is skip_unavailable=false, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // remote-only, all clusters are skip_unavailable=true, so should return empty result with + // NoSuchClustersException or "remote unavailable" type exception + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); + Exception e = randomFrom( + new NoSuchRemoteClusterException("foo"), + noClustersException, + new NoSeedNodeLeftException("foo"), + new IllegalStateException("unknown host") + ); + assertTrue(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)); + } + + // remote-only, all clusters are skip_unavailable=true, but exception is not "remote unavailable" so return false + // Note: this functionality may change in follow-on PRs, so remove this test in that case + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); + } + } + + public void testUpdateExecutionInfoToReturnEmptyResult() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + String remote3Alias = "remote3"; + ConnectTransportException transportEx = new ConnectTransportException(null, "foo"); + Predicate skipUnPredicate = s -> { + if (s.startsWith("remote")) { + return true; + } + return false; + }; + + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); + + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localCluster.getClusterAlias(), (k, v) -> localCluster); + executionInfo.swapCluster(remote1.getClusterAlias(), (k, v) -> remote1); + executionInfo.swapCluster(remote2.getClusterAlias(), (k, v) -> remote2); + executionInfo.swapCluster(remote3.getClusterAlias(), (k, v) -> remote3); + + assertNull(executionInfo.overallTook()); + + EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); + + assertNotNull(executionInfo.overallTook()); + assertThat(executionInfo.getCluster(localClusterAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(executionInfo.getCluster(localClusterAlias).getFailures().size(), equalTo(0)); + + for (String remoteAlias : Set.of(remote1Alias, remote2Alias, remote3Alias)) { + assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + List remoteFailures = executionInfo.getCluster(remoteAlias).getFailures(); + assertThat(remoteFailures.size(), equalTo(1)); + assertThat(remoteFailures.get(0).reason(), containsString("unable to connect to remote cluster")); + } + } }