Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e3b6a4b
Term vector API on stateless search nodes
kingherc Jun 19, 2025
551564a
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 3, 2025
f6a2b60
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 7, 2025
ac6501e
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 8, 2025
768061c
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 10, 2025
4700319
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 14, 2025
7b78d40
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 16, 2025
10a0516
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 17, 2025
a1de023
Move transport action to serverless
kingherc Jul 17, 2025
0d35fe4
PR comments
kingherc Jul 17, 2025
e56b4b2
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 18, 2025
cc02815
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 18, 2025
e022eb1
Remove dead code
kingherc Jul 18, 2025
ecd0132
Checkstyle
kingherc Jul 18, 2025
f3b5777
Make action internal
kingherc Jul 18, 2025
9bf99d9
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 18, 2025
d4e7c8b
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 18, 2025
2533240
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 21, 2025
032ea2e
Allow refreshing a shard by system
kingherc Jul 21, 2025
849e31c
Add wait_for_no_initializing_shards in yaml rest
kingherc Jul 21, 2025
fde5fe0
Add special origin to call refresh shard
kingherc Jul 21, 2025
12b0d36
Merge remote-tracking branch 'kingherc/main' into non-issue/ES-12112-…
kingherc Jul 21, 2025
167fa81
Comments
kingherc Jul 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ routing:
settings:
index:
number_of_shards: 5
number_of_replicas: 0

- do:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this removed intentionally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes because it won't work in Serverless, as there'll be no search shard to execute the new term API. In serverless we force to have a search node and we'd like a search shard.

So this way it works in both stateful and stateless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the test would become flaky now that we don't wait for the cluster to go green, that's why I was asking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To wait for the search shard is definitely up and running, I incorporated the following piece of code

 - do:
     cluster.health:
       wait_for_no_initializing_shards: true

which is copied from #114641 which solved a similar issue to make the tests work in both ES and serverless.

I run it also 10 locally, both core ES and severless, and it succeeds. Feel free to tell me if you have more feedback.

cluster.health:
wait_for_status: green
wait_for_no_initializing_shards: true

- do:
index:
Expand Down Expand Up @@ -52,11 +51,14 @@ requires routing:
settings:
index:
number_of_shards: 5
number_of_replicas: 0
mappings:
_routing:
required: true

- do:
cluster.health:
wait_for_no_initializing_shards: true

- do:
index:
index: test_1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
index:
translog.flush_threshold_size: "512MB"
number_of_shards: 1
number_of_replicas: 0
refresh_interval: -1
mappings:
properties:
Expand All @@ -17,7 +16,7 @@

- do:
cluster.health:
wait_for_status: green
wait_for_no_initializing_shards: true

- do:
index:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
settings:
index:
refresh_interval: -1
number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green
cluster.health:
wait_for_no_initializing_shards: true

- do:
index:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*
* This file was contributed to by generative AI
*/

package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* This action is used in serverless to ensure that documents are searchable on the search tier before processing
* term vector requests. It is an intermediate action that is executed on the indexing node and responds
* with a no-op (the search node can proceed to process the term vector request). The action may trigger an external refresh
* to ensure the search shards are up to date before returning the no-op.
*/
public class EnsureDocsSearchableAction {

private static final String ACTION_NAME = "internal:index/data/read/eds";
public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(ACTION_NAME);
public static final String ENSURE_DOCS_SEARCHABLE_ORIGIN = "ensure_docs_searchable";

public static final class EnsureDocsSearchableRequest extends SingleShardRequest<EnsureDocsSearchableRequest> {

private int shardId; // this is not serialized over the wire, and will be 0 on the other end of the wire.
private String[] docIds;

public EnsureDocsSearchableRequest() {}

public EnsureDocsSearchableRequest(StreamInput in) throws IOException {
super(in);
docIds = in.readStringArray();
}

@Override
public ActionRequestValidationException validate() {
return super.validateNonNullIndex();
}

public EnsureDocsSearchableRequest(String index, int shardId, String[] docIds) {
super(index);
this.shardId = shardId;
this.docIds = docIds;
}

public int shardId() {
return this.shardId;
}

public String[] docIds() {
return docIds;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(docIds);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*
* This file was contributed to by generative AI
*/

package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -28,20 +33,26 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.core.Strings.format;

public class TransportShardMultiTermsVectorAction extends TransportSingleShardAction<
MultiTermVectorsShardRequest,
MultiTermVectorsShardResponse> {

private final NodeClient client;
private final IndicesService indicesService;
private final boolean stateless;

private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "[shard]";
public static final ActionType<MultiTermVectorsShardResponse> TYPE = new ActionType<>(ACTION_NAME);

@Inject
public TransportShardMultiTermsVectorAction(
ClusterService clusterService,
NodeClient client,
TransportService transportService,
IndicesService indicesService,
ThreadPool threadPool,
Expand All @@ -60,7 +71,9 @@ public TransportShardMultiTermsVectorAction(
MultiTermVectorsShardRequest::new,
threadPool.executor(ThreadPool.Names.GET)
);
this.client = client;
this.indicesService = indicesService;
this.stateless = DiscoveryNode.isStateless(clusterService.getSettings());
}

@Override
Expand All @@ -80,9 +93,44 @@ protected boolean resolveIndex(MultiTermVectorsShardRequest request) {

@Override
protected ShardIterator shards(ProjectState project, InternalRequest request) {
ShardIterator shards = clusterService.operationRouting()
ShardIterator iterator = clusterService.operationRouting()
.getShards(project, request.concreteIndex(), request.request().shardId(), request.request().preference());
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards);
if (iterator == null) {
// We return an empty iterator to avoid hitting an indexing node in serverless (e.g., if there are no search nodes available).
return new ShardIterator(null, List.of());
}
return ShardIterator.allSearchableShards(iterator);
}

@Override
protected void asyncShardOperation(
MultiTermVectorsShardRequest request,
ShardId shardId,
ActionListener<MultiTermVectorsShardResponse> listener
) throws IOException {
if (stateless) {
final String[] realTimeIds = request.requests.stream()
.filter(r -> r.realtime())
.map(TermVectorsRequest::id)
.toArray(String[]::new);
if (realTimeIds.length > 0) {
final var ensureDocsSearchableRequest = new EnsureDocsSearchableAction.EnsureDocsSearchableRequest(
request.index(),
shardId.id(),
realTimeIds
);
ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId());
client.executeLocally(
EnsureDocsSearchableAction.TYPE,
ensureDocsSearchableRequest,
listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l))
);
} else {
super.asyncShardOperation(request, shardId, listener);
}
} else {
super.asyncShardOperation(request, shardId, listener);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*
* This file was contributed to by generative AI
*/

package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -28,17 +32,21 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

/**
* Performs the get operation.
*/
public class TransportTermVectorsAction extends TransportSingleShardAction<TermVectorsRequest, TermVectorsResponse> {

private final NodeClient client;
private final IndicesService indicesService;
private final boolean stateless;

@Inject
public TransportTermVectorsAction(
ClusterService clusterService,
NodeClient client,
TransportService transportService,
IndicesService indicesService,
ThreadPool threadPool,
Expand All @@ -57,7 +65,9 @@ public TransportTermVectorsAction(
TermVectorsRequest::new,
threadPool.executor(ThreadPool.Names.GET)
);
this.client = client;
this.indicesService = indicesService;
this.stateless = DiscoveryNode.isStateless(clusterService.getSettings());
}

@Override
Expand All @@ -69,21 +79,19 @@ protected ShardIterator shards(ProjectState project, InternalRequest request) {
.getFirst();
}

return operationRouting.useOnlyPromotableShardsForStateless(
operationRouting.getShards(

ShardIterator iterator = clusterService.operationRouting()
.getShards(
project,

request.concreteIndex(),

request.request().id(),

request.request().routing(),

request.request().preference()

)
);
);
if (iterator == null) {
// We return an empty iterator to avoid hitting an indexing node in serverless (e.g., if there are no search nodes available).
return new ShardIterator(null, List.of());
}
return ShardIterator.allSearchableShards(iterator);
}

@Override
Expand All @@ -103,7 +111,22 @@ protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId,
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
super.asyncShardOperation(request, shardId, listener);
if (stateless) {
// Ensure that the document is searchable before we execute the term vectors request
final var ensureDocsSearchableRequest = new EnsureDocsSearchableAction.EnsureDocsSearchableRequest(
request.index(),
shardId.id(),
new String[] { request.id() }
);
ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId());
client.executeLocally(
EnsureDocsSearchableAction.TYPE,
ensureDocsSearchableRequest,
listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l))
);
} else {
super.asyncShardOperation(request, shardId, listener);
}
} else {
indexShard.ensureShardSearchActive(b -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -42,11 +41,9 @@ public class OperationRouting {
);

private boolean useAdaptiveReplicaSelection;
private final boolean isStateless;

@SuppressWarnings("this-escape")
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.isStateless = DiscoveryNode.isStateless(settings);
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
}
Expand Down Expand Up @@ -78,19 +75,6 @@ public ShardIterator getShards(ProjectState projectState, String index, int shar
return preferenceActiveShardIterator(indexShard, nodes.getLocalNodeId(), nodes, preference, null, null);
}

public ShardIterator useOnlyPromotableShardsForStateless(ShardIterator shards) {
// If it is stateless, only route promotable shards. This is a temporary workaround until a more cohesive solution can be
// implemented for search shards.
if (isStateless && shards != null) {
return new ShardIterator(
shards.shardId(),
shards.getShardRoutings().stream().filter(ShardRouting::isPromotableToPrimary).collect(Collectors.toList())
);
} else {
return shards;
}
}

public List<ShardIterator> searchShards(
ProjectState projectState,
String[] concreteIndices,
Expand Down
Loading