diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mtermvectors/30_routing.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mtermvectors/30_routing.yml index ba88ed8104f3e..8f9e3560ab1c0 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mtermvectors/30_routing.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mtermvectors/30_routing.yml @@ -7,11 +7,10 @@ routing: settings: index: number_of_shards: 5 - number_of_replicas: 0 - do: cluster.health: - wait_for_status: green + wait_for_no_initializing_shards: true - do: index: @@ -52,11 +51,14 @@ requires routing: settings: index: number_of_shards: 5 - number_of_replicas: 0 mappings: _routing: required: true + - do: + cluster.health: + wait_for_no_initializing_shards: true + - do: index: index: test_1 diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/20_issue7121.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/20_issue7121.yml index 6b03428332932..099eaddc909ef 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/20_issue7121.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/20_issue7121.yml @@ -7,7 +7,6 @@ index: translog.flush_threshold_size: "512MB" number_of_shards: 1 - number_of_replicas: 0 refresh_interval: -1 mappings: properties: @@ -17,7 +16,7 @@ - do: cluster.health: - wait_for_status: green + wait_for_no_initializing_shards: true - do: index: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/30_realtime.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/30_realtime.yml index cc2272f813f32..f89f141288c21 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/30_realtime.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/30_realtime.yml @@ -7,11 +7,10 @@ settings: index: refresh_interval: -1 - number_of_replicas: 0 - do: - cluster.health: - wait_for_status: green + cluster.health: + wait_for_no_initializing_shards: true - do: index: diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/EnsureDocsSearchableAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/EnsureDocsSearchableAction.java new file mode 100644 index 0000000000000..1cdab19ffb03f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/termvectors/EnsureDocsSearchableAction.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + * + * This file was contributed to by generative AI + */ + +package org.elasticsearch.action.termvectors; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * This action is used in serverless to ensure that documents are searchable on the search tier before processing + * term vector requests. It is an intermediate action that is executed on the indexing node and responds + * with a no-op (the search node can proceed to process the term vector request). The action may trigger an external refresh + * to ensure the search shards are up to date before returning the no-op. + */ +public class EnsureDocsSearchableAction { + + private static final String ACTION_NAME = "internal:index/data/read/eds"; + public static final ActionType TYPE = new ActionType<>(ACTION_NAME); + public static final String ENSURE_DOCS_SEARCHABLE_ORIGIN = "ensure_docs_searchable"; + + public static final class EnsureDocsSearchableRequest extends SingleShardRequest { + + private int shardId; // this is not serialized over the wire, and will be 0 on the other end of the wire. + private String[] docIds; + + public EnsureDocsSearchableRequest() {} + + public EnsureDocsSearchableRequest(StreamInput in) throws IOException { + super(in); + docIds = in.readStringArray(); + } + + @Override + public ActionRequestValidationException validate() { + return super.validateNonNullIndex(); + } + + public EnsureDocsSearchableRequest(String index, int shardId, String[] docIds) { + super(index); + this.shardId = shardId; + this.docIds = docIds; + } + + public int shardId() { + return this.shardId; + } + + public String[] docIds() { + return docIds; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(docIds); + } + + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java index f7b54388dfcd6..e6ad2c9d3bdb5 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java @@ -5,16 +5,21 @@ * Public License v 1"; you may not use this file except in compliance with, at * your election, the "Elastic License 2.0", the "GNU Affero General Public * License v3.0 only", or the "Server Side Public License, v 1". + * + * This file was contributed to by generative AI */ package org.elasticsearch.action.termvectors; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; @@ -28,13 +33,18 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.util.List; + import static org.elasticsearch.core.Strings.format; public class TransportShardMultiTermsVectorAction extends TransportSingleShardAction< MultiTermVectorsShardRequest, MultiTermVectorsShardResponse> { + private final NodeClient client; private final IndicesService indicesService; + private final boolean stateless; private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "[shard]"; public static final ActionType TYPE = new ActionType<>(ACTION_NAME); @@ -42,6 +52,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc @Inject public TransportShardMultiTermsVectorAction( ClusterService clusterService, + NodeClient client, TransportService transportService, IndicesService indicesService, ThreadPool threadPool, @@ -60,7 +71,9 @@ public TransportShardMultiTermsVectorAction( MultiTermVectorsShardRequest::new, threadPool.executor(ThreadPool.Names.GET) ); + this.client = client; this.indicesService = indicesService; + this.stateless = DiscoveryNode.isStateless(clusterService.getSettings()); } @Override @@ -80,9 +93,44 @@ protected boolean resolveIndex(MultiTermVectorsShardRequest request) { @Override protected ShardIterator shards(ProjectState project, InternalRequest request) { - ShardIterator shards = clusterService.operationRouting() + ShardIterator iterator = clusterService.operationRouting() .getShards(project, request.concreteIndex(), request.request().shardId(), request.request().preference()); - return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards); + if (iterator == null) { + // We return an empty iterator to avoid hitting an indexing node in serverless (e.g., if there are no search nodes available). + return new ShardIterator(null, List.of()); + } + return ShardIterator.allSearchableShards(iterator); + } + + @Override + protected void asyncShardOperation( + MultiTermVectorsShardRequest request, + ShardId shardId, + ActionListener listener + ) throws IOException { + if (stateless) { + final String[] realTimeIds = request.requests.stream() + .filter(r -> r.realtime()) + .map(TermVectorsRequest::id) + .toArray(String[]::new); + if (realTimeIds.length > 0) { + final var ensureDocsSearchableRequest = new EnsureDocsSearchableAction.EnsureDocsSearchableRequest( + request.index(), + shardId.id(), + realTimeIds + ); + ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId()); + client.executeLocally( + EnsureDocsSearchableAction.TYPE, + ensureDocsSearchableRequest, + listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l)) + ); + } else { + super.asyncShardOperation(request, shardId, listener); + } + } else { + super.asyncShardOperation(request, shardId, listener); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index 8191d0b79cedc..299458ca728c0 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -5,6 +5,8 @@ * Public License v 1"; you may not use this file except in compliance with, at * your election, the "Elastic License 2.0", the "GNU Affero General Public * License v3.0 only", or the "Server Side Public License, v 1". + * + * This file was contributed to by generative AI */ package org.elasticsearch.action.termvectors; @@ -12,8 +14,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; @@ -28,17 +32,21 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.List; /** * Performs the get operation. */ public class TransportTermVectorsAction extends TransportSingleShardAction { + private final NodeClient client; private final IndicesService indicesService; + private final boolean stateless; @Inject public TransportTermVectorsAction( ClusterService clusterService, + NodeClient client, TransportService transportService, IndicesService indicesService, ThreadPool threadPool, @@ -57,7 +65,9 @@ public TransportTermVectorsAction( TermVectorsRequest::new, threadPool.executor(ThreadPool.Names.GET) ); + this.client = client; this.indicesService = indicesService; + this.stateless = DiscoveryNode.isStateless(clusterService.getSettings()); } @Override @@ -69,21 +79,19 @@ protected ShardIterator shards(ProjectState project, InternalRequest request) { .getFirst(); } - return operationRouting.useOnlyPromotableShardsForStateless( - operationRouting.getShards( - + ShardIterator iterator = clusterService.operationRouting() + .getShards( project, - request.concreteIndex(), - request.request().id(), - request.request().routing(), - request.request().preference() - - ) - ); + ); + if (iterator == null) { + // We return an empty iterator to avoid hitting an indexing node in serverless (e.g., if there are no search nodes available). + return new ShardIterator(null, List.of()); + } + return ShardIterator.allSearchableShards(iterator); } @Override @@ -103,7 +111,22 @@ protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId, IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles - super.asyncShardOperation(request, shardId, listener); + if (stateless) { + // Ensure that the document is searchable before we execute the term vectors request + final var ensureDocsSearchableRequest = new EnsureDocsSearchableAction.EnsureDocsSearchableRequest( + request.index(), + shardId.id(), + new String[] { request.id() } + ); + ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId()); + client.executeLocally( + EnsureDocsSearchableAction.TYPE, + ensureDocsSearchableRequest, + listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l)) + ); + } else { + super.asyncShardOperation(request, shardId, listener); + } } else { indexShard.ensureShardSearchActive(b -> { try { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 978e6c19566b1..be4e4bdc94878 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; @@ -42,11 +41,9 @@ public class OperationRouting { ); private boolean useAdaptiveReplicaSelection; - private final boolean isStateless; @SuppressWarnings("this-escape") public OperationRouting(Settings settings, ClusterSettings clusterSettings) { - this.isStateless = DiscoveryNode.isStateless(settings); this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); } @@ -78,19 +75,6 @@ public ShardIterator getShards(ProjectState projectState, String index, int shar return preferenceActiveShardIterator(indexShard, nodes.getLocalNodeId(), nodes, preference, null, null); } - public ShardIterator useOnlyPromotableShardsForStateless(ShardIterator shards) { - // If it is stateless, only route promotable shards. This is a temporary workaround until a more cohesive solution can be - // implemented for search shards. - if (isStateless && shards != null) { - return new ShardIterator( - shards.shardId(), - shards.getShardRoutings().stream().filter(ShardRouting::isPromotableToPrimary).collect(Collectors.toList()) - ); - } else { - return shards; - } - } - public List searchShards( ProjectState projectState, String[] concreteIndices, diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 1fdd1b52cfd51..6aad44eae0335 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -915,6 +915,17 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher, boo } } + /** + * Whether the document is in the live version map or not. + * + * This is used in stateless so that the {@link org.elasticsearch.action.termvectors.EnsureDocsSearchableAction} can + * judge whether a requested document needs to be refreshed to the search shards before executing the term vector + * information API on the search shards. + */ + public boolean isDocumentInLiveVersionMap(BytesRef uid) { + return false; + } + public abstract GetResult get( Get get, MappingLookup mappingLookup, diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1ca795f69257f..48c56a73f17ef 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -848,6 +848,14 @@ private GetResult getFromTranslog( return getFromSearcher(get, wrappedSearcher, true); } + @Override + public boolean isDocumentInLiveVersionMap(BytesRef uid) { + try (Releasable ignore = versionMap.acquireLock(uid)) { + final var versionValue = getVersionFromMap(uid); + return versionValue != null; + } + } + @Override public GetResult get( Get get, diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 481c59aa25b60..923c03adce6ef 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -22,6 +22,7 @@ import static org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN; import static org.elasticsearch.action.support.replication.PostWriteRefresh.POST_WRITE_REFRESH_ORIGIN; +import static org.elasticsearch.action.termvectors.EnsureDocsSearchableAction.ENSURE_DOCS_SEARCHABLE_ORIGIN; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN; import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN; import static org.elasticsearch.persistent.PersistentTasksService.PERSISTENT_TASK_ORIGIN; @@ -132,6 +133,7 @@ public static void switchUserBasedOnActionOriginAndExecute( case SECURITY_PROFILE_ORIGIN: securityContext.executeAsInternalUser(InternalUsers.SECURITY_PROFILE_USER, version, consumer); break; + case ENSURE_DOCS_SEARCHABLE_ORIGIN: case POST_WRITE_REFRESH_ORIGIN: securityContext.executeAsInternalUser(InternalUsers.STORAGE_USER, version, consumer); break;