Skip to content

Commit fca2f43

Browse files
authored
Fallback to field-caps (#115977) (#116428)
This change falls back to the old field-caps action if the remote cluster has not been updated to 8.16 or later.
1 parent 84087bd commit fca2f43

File tree

5 files changed

+39
-12
lines changed

5 files changed

+39
-12
lines changed

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.action.support.ChannelActionListener;
2323
import org.elasticsearch.action.support.HandledTransportAction;
2424
import org.elasticsearch.action.support.RefCountingRunnable;
25+
import org.elasticsearch.client.internal.RemoteClusterClient;
2526
import org.elasticsearch.cluster.ClusterState;
2627
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2728
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -113,23 +114,28 @@ public TransportFieldCapabilitiesAction(
113114

114115
@Override
115116
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
116-
executeRequest(task, request, REMOTE_TYPE, listener);
117+
executeRequest(
118+
task,
119+
request,
120+
(remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener),
121+
listener
122+
);
117123
}
118124

119125
public void executeRequest(
120126
Task task,
121127
FieldCapabilitiesRequest request,
122-
RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
128+
RemoteRequestExecutor remoteRequestExecutor,
123129
ActionListener<FieldCapabilitiesResponse> listener
124130
) {
125131
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
126-
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteAction, l)));
132+
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l)));
127133
}
128134

129135
private void doExecuteForked(
130136
Task task,
131137
FieldCapabilitiesRequest request,
132-
RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
138+
RemoteRequestExecutor remoteRequestExecutor,
133139
ActionListener<FieldCapabilitiesResponse> listener
134140
) {
135141
if (ccsCheckCompatibility) {
@@ -282,8 +288,8 @@ private void doExecuteForked(
282288
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
283289
}
284290
});
285-
remoteClusterClient.execute(
286-
remoteAction,
291+
remoteRequestExecutor.executeRemoteRequest(
292+
remoteClusterClient,
287293
remoteRequest,
288294
// The underlying transport service may call onFailure with a thread pool other than search_coordinator.
289295
// This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
@@ -298,6 +304,14 @@ private void doExecuteForked(
298304
}
299305
}
300306

307+
public interface RemoteRequestExecutor {
308+
void executeRemoteRequest(
309+
RemoteClusterClient remoteClient,
310+
FieldCapabilitiesRequest remoteRequest,
311+
ActionListener<FieldCapabilitiesResponse> remoteListener
312+
);
313+
}
314+
301315
private static void checkIndexBlocks(ClusterState clusterState, String[] concreteIndices) {
302316
var blocks = clusterState.blocks();
303317
if (blocks.global().isEmpty() && blocks.indices().isEmpty()) {

x-pack/plugin/esql/qa/server/multi-clusters/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ dependencies {
1818
}
1919

2020
def supportedVersion = bwcVersion -> {
21-
// ESQL requires its own resolve_fields API
22-
return bwcVersion.onOrAfter(Version.fromString("8.16.0"));
21+
// CCS in ES|QL available in 8.13
22+
return bwcVersion.onOrAfter(Version.fromString("8.13.0"));
2323
}
2424

2525
BuildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName ->

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ public MultiClusterSpecIT(
104104
protected void shouldSkipTest(String testName) throws IOException {
105105
super.shouldSkipTest(testName);
106106
checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);
107-
assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
108107
assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query));
109108
assumeTrue(
110109
"Test " + testName + " is skipped on " + Clusters.oldVersion(),

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ record Doc(int id, String color, long data) {
6767

6868
@Before
6969
public void setUpIndices() throws Exception {
70-
assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
7170
final String mapping = """
7271
"properties": {
7372
"data": { "type": "long" },

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.esql.action;
88

9+
import org.elasticsearch.TransportVersions;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.ActionType;
1112
import org.elasticsearch.action.RemoteClusterActionType;
@@ -14,6 +15,7 @@
1415
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
1516
import org.elasticsearch.action.support.ActionFilters;
1617
import org.elasticsearch.action.support.HandledTransportAction;
18+
import org.elasticsearch.client.internal.RemoteClusterClient;
1719
import org.elasticsearch.common.util.concurrent.EsExecutors;
1820
import org.elasticsearch.injection.guice.Inject;
1921
import org.elasticsearch.tasks.Task;
@@ -27,7 +29,7 @@
2729
public class EsqlResolveFieldsAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
2830
public static final String NAME = "indices:data/read/esql/resolve_fields";
2931
public static final ActionType<FieldCapabilitiesResponse> TYPE = new ActionType<>(NAME);
30-
public static final RemoteClusterActionType<FieldCapabilitiesResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
32+
public static final RemoteClusterActionType<FieldCapabilitiesResponse> RESOLVE_REMOTE_TYPE = new RemoteClusterActionType<>(
3133
NAME,
3234
FieldCapabilitiesResponse::new
3335
);
@@ -47,6 +49,19 @@ public EsqlResolveFieldsAction(
4749

4850
@Override
4951
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
50-
fieldCapsAction.executeRequest(task, request, REMOTE_TYPE, listener);
52+
fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener);
53+
}
54+
55+
void executeRemoteRequest(
56+
RemoteClusterClient remoteClient,
57+
FieldCapabilitiesRequest remoteRequest,
58+
ActionListener<FieldCapabilitiesResponse> remoteListener
59+
) {
60+
remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> {
61+
var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES)
62+
? RESOLVE_REMOTE_TYPE
63+
: TransportFieldCapabilitiesAction.REMOTE_TYPE;
64+
remoteClient.execute(conn, remoteAction, remoteRequest, l);
65+
}));
5166
}
5267
}

0 commit comments

Comments
 (0)