diff --git a/server/src/internalClusterTest/java/org/elasticsearch/get/GetFromTranslogActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/get/GetFromTranslogActionIT.java new file mode 100644 index 0000000000000..f0781535d4c32 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/get/GetFromTranslogActionIT.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.get; + +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.get.TransportGetFromTranslogAction; +import org.elasticsearch.action.get.TransportGetFromTranslogAction.Response; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class GetFromTranslogActionIT extends ESIntegTestCase { + public void testGetFromTranslog() throws Exception { + assertAcked( + prepareCreate("test").setMapping("field1", "type=keyword,store=true") + .setSettings( + Settings.builder() + .put("index.refresh_interval", -1) + // A GetFromTranslogAction runs only Stateless where there is only one active indexing shard. + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + ) + .addAlias(new Alias("alias").writeIndex(randomFrom(true, false, null))) + ); + ensureGreen(); + + var response = getFromTranslog(indexOrAlias(), "1"); + assertNull(response.getResult()); + // There hasn't been any switches from unsafe to safe map + assertThat(response.segmentGeneration(), equalTo(-1L)); + + var indexResponse = client().prepareIndex("test") + .setId("1") + .setSource("field1", "value1") + .setRefreshPolicy(RefreshPolicy.NONE) + .get(); + response = getFromTranslog(indexOrAlias(), "1"); + assertNotNull(response.getResult()); + assertThat(response.getResult().isExists(), equalTo(true)); + assertThat(response.getResult().getVersion(), equalTo(indexResponse.getVersion())); + assertThat(response.segmentGeneration(), equalTo(-1L)); + // Get followed by a delete should still return a result + client().prepareDelete("test", "1").get(); + response = getFromTranslog(indexOrAlias(), "1"); + assertNotNull("get followed by a delete should still return a result", response.getResult()); + assertThat(response.getResult().isExists(), equalTo(false)); + assertThat(response.segmentGeneration(), equalTo(-1L)); + + indexResponse = client().prepareIndex("test").setSource("field1", "value2").get(); + response = getFromTranslog(indexOrAlias(), indexResponse.getId()); + assertNotNull(response.getResult()); + assertThat(response.getResult().isExists(), equalTo(true)); + assertThat(response.getResult().getVersion(), equalTo(indexResponse.getVersion())); + assertThat(response.segmentGeneration(), equalTo(-1L)); + // After a refresh we should not be able to get from translog + refresh("test"); + response = getFromTranslog(indexOrAlias(), indexResponse.getId()); + assertNull("after a refresh we should not be able to get from translog", response.getResult()); + assertThat(response.segmentGeneration(), equalTo(-1L)); + // After two refreshes the LiveVersionMap switches back to append-only and stops tracking IDs + // Refreshing with empty LiveVersionMap doesn't cause the switch, see {@link LiveVersionMap.Maps#shouldInheritSafeAccess()}. + client().prepareIndex("test").setSource("field1", "value3").get(); + refresh("test"); + refresh("test"); + // An optimized index operation marks the maps as unsafe + client().prepareIndex("test").setSource("field1", "value4").get(); + response = getFromTranslog(indexOrAlias(), "non-existent"); + assertNull(response.getResult()); + assertThat(response.segmentGeneration(), greaterThan(0L)); + } + + private Response getFromTranslog(String index, String id) throws Exception { + var getRequest = client().prepareGet(index, id).request(); + var shardRouting = randomFrom(clusterService().state().routingTable().allShards("test")); + var node = clusterService().state().nodes().get(shardRouting.currentNodeId()); + assertNotNull(node); + TransportGetFromTranslogAction.Request request = new TransportGetFromTranslogAction.Request(getRequest, shardRouting.shardId()); + var transportService = internalCluster().getInstance(TransportService.class); + PlainActionFuture response = new PlainActionFuture<>(); + transportService.sendRequest( + node, + TransportGetFromTranslogAction.NAME, + request, + new ActionListenerResponseHandler<>(response, Response::new, ThreadPool.Names.GET) + ); + return response.get(); + } + + private String indexOrAlias() { + return randomBoolean() ? "test" : "alias"; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index c4ebefdac75fd..ed0a4e8973972 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -59,6 +59,8 @@ public TransportGetAction( ); this.indicesService = indicesService; this.executorSelector = executorSelector; + // register the internal TransportGetFromTranslogAction + new TransportGetFromTranslogAction(transportService, indicesService, actionFilters); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java new file mode 100644 index 0000000000000..c3127c9a2a05d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java @@ -0,0 +1,182 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.get; + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +// TODO(ES-5727): add a retry mechanism to TransportGetFromTranslogAction +public class TransportGetFromTranslogAction extends HandledTransportAction< + TransportGetFromTranslogAction.Request, + TransportGetFromTranslogAction.Response> { + + public static final String NAME = "internal:data/read/get_from_translog"; + public static final Logger logger = LogManager.getLogger(TransportGetFromTranslogAction.class); + + private final IndicesService indicesService; + + @Inject + public TransportGetFromTranslogAction(TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, Request::new, ThreadPool.Names.GET); + this.indicesService = indicesService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + final GetRequest getRequest = request.getRequest(); + final ShardId shardId = request.shardId(); + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry(); + assert getRequest.realtime(); + ActionListener.completeWith(listener, () -> { + var result = indexShard.getService() + .getFromTranslog( + getRequest.id(), + getRequest.storedFields(), + getRequest.realtime(), + getRequest.version(), + getRequest.versionType(), + getRequest.fetchSourceContext(), + getRequest.isForceSyntheticSource() + ); + long segmentGeneration = -1; + if (result == null) { + Engine engine = indexShard.getEngineOrNull(); + if (engine == null) { + throw new AlreadyClosedException("engine closed"); + } + segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets(); + } + return new Response(result, segmentGeneration); + }); + } + + public static class Request extends ActionRequest { + + private final GetRequest getRequest; + private final ShardId shardId; + + public Request(GetRequest getRequest, ShardId shardId) { + this.getRequest = Objects.requireNonNull(getRequest); + this.shardId = Objects.requireNonNull(shardId); + } + + public Request(StreamInput in) throws IOException { + super(in); + getRequest = new GetRequest(in); + shardId = new ShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + getRequest.writeTo(out); + shardId.writeTo(out); + } + + public GetRequest getRequest() { + return getRequest; + } + + public ShardId shardId() { + return shardId; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public String toString() { + return "GetFromTranslogRequest{" + "getRequest=" + getRequest + ", shardId=" + shardId + "}"; + } + } + + public static class Response extends ActionResponse { + @Nullable + private final GetResult getResult; + private final long segmentGeneration; + + public Response(GetResult getResult, long segmentGeneration) { + this.getResult = getResult; + this.segmentGeneration = segmentGeneration; + } + + public Response(StreamInput in) throws IOException { + super(in); + segmentGeneration = in.readZLong(); + getResult = in.readOptionalWriteable(GetResult::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(segmentGeneration); + out.writeOptionalWriteable(getResult); + } + + @Nullable + public GetResult getResult() { + return getResult; + } + + /** + * The segment generation that the search shard should wait for before handling the real-time GET request locally. + * -1 if the result is not null (i.e., the result is served from the indexing shard), or there hasn't simply been + * any switches from unsafe to safe map in the LiveVersionMap (see {@link InternalEngine#getVersionFromMap(BytesRef)}). + */ + public long segmentGeneration() { + return segmentGeneration; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o instanceof Response == false) return false; + Response other = (Response) o; + return segmentGeneration == other.segmentGeneration && Objects.equals(getResult, other.getResult); + } + + @Override + public int hashCode() { + return Objects.hash(segmentGeneration, getResult); + } + + @Override + public String toString() { + return "Response{" + "getResult=" + getResult + ", segmentGeneration=" + segmentGeneration + "}"; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/get/GetFromTranslogResponseSerializationTests.java b/server/src/test/java/org/elasticsearch/action/get/GetFromTranslogResponseSerializationTests.java new file mode 100644 index 0000000000000..4c1d3c14f4956 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/get/GetFromTranslogResponseSerializationTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.get; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.get.GetResultTests; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; + +public class GetFromTranslogResponseSerializationTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return TransportGetFromTranslogAction.Response::new; + } + + @Override + protected TransportGetFromTranslogAction.Response createTestInstance() { + return new TransportGetFromTranslogAction.Response(randomGetResult(), randomSegmentGeneration()); + } + + @Override + protected TransportGetFromTranslogAction.Response mutateInstance(TransportGetFromTranslogAction.Response instance) throws IOException { + return randomBoolean() + ? new TransportGetFromTranslogAction.Response( + instance.getResult(), + randomValueOtherThan(instance.segmentGeneration(), this::randomSegmentGeneration) + ) + : new TransportGetFromTranslogAction.Response( + randomValueOtherThan(instance.getResult(), this::randomGetResult), + instance.segmentGeneration() + ); + } + + private long randomSegmentGeneration() { + return randomBoolean() ? -1L : randomNonNegativeLong(); + } + + private GetResult randomGetResult() { + return randomBoolean() ? null : GetResultTests.randomGetResult(randomFrom(XContentType.values())).v1(); + } +}