Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.get;

import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.get.TransportGetFromTranslogAction;
import org.elasticsearch.action.get.TransportGetFromTranslogAction.Response;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class GetFromTranslogActionIT extends ESIntegTestCase {
public void testGetFromTranslog() throws Exception {
assertAcked(
prepareCreate("test").setMapping("field1", "type=keyword,store=true")
.setSettings(
Settings.builder()
.put("index.refresh_interval", -1)
// A GetFromTranslogAction runs only Stateless where there is only one active indexing shard.
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
)
.addAlias(new Alias("alias").writeIndex(randomFrom(true, false, null)))
);
ensureGreen();

var response = getFromTranslog(indexOrAlias(), "1");
assertNull(response.getResult());
// There hasn't been any switches from unsafe to safe map
assertThat(response.segmentGeneration(), equalTo(-1L));

var indexResponse = client().prepareIndex("test")
.setId("1")
.setSource("field1", "value1")
.setRefreshPolicy(RefreshPolicy.NONE)
.get();
response = getFromTranslog(indexOrAlias(), "1");
assertNotNull(response.getResult());
assertThat(response.getResult().isExists(), equalTo(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we also assert the real getResult's value? That e.g. it's equal to a document with field1&value1? Similar comment for the similar points below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why that would matter for this case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worth verifying that we are getting the right version of the document from the translog

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

assertThat(response.getResult().getVersion(), equalTo(indexResponse.getVersion()));
assertThat(response.segmentGeneration(), equalTo(-1L));
// Get followed by a delete should still return a result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a side note and for future tests, I think that comment like this are very helpful to understand test failures and can be backed into the assertion methods themselves, ie this could be:

assertThat("Get followed by a delete should still return a result", response.getResult(), nonNullValue());

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea!

client().prepareDelete("test", "1").get();
response = getFromTranslog(indexOrAlias(), "1");
assertNotNull("get followed by a delete should still return a result", response.getResult());
assertThat(response.getResult().isExists(), equalTo(false));
assertThat(response.segmentGeneration(), equalTo(-1L));

indexResponse = client().prepareIndex("test").setSource("field1", "value2").get();
response = getFromTranslog(indexOrAlias(), indexResponse.getId());
assertNotNull(response.getResult());
assertThat(response.getResult().isExists(), equalTo(true));
assertThat(response.getResult().getVersion(), equalTo(indexResponse.getVersion()));
assertThat(response.segmentGeneration(), equalTo(-1L));
// After a refresh we should not be able to get from translog
refresh("test");
response = getFromTranslog(indexOrAlias(), indexResponse.getId());
assertNull("after a refresh we should not be able to get from translog", response.getResult());
assertThat(response.segmentGeneration(), equalTo(-1L));
// After two refreshes the LiveVersionMap switches back to append-only and stops tracking IDs
// Refreshing with empty LiveVersionMap doesn't cause the switch, see {@link LiveVersionMap.Maps#shouldInheritSafeAccess()}.
client().prepareIndex("test").setSource("field1", "value3").get();
refresh("test");
refresh("test");
// An optimized index operation marks the maps as unsafe
client().prepareIndex("test").setSource("field1", "value4").get();
response = getFromTranslog(indexOrAlias(), "non-existent");
assertNull(response.getResult());
assertThat(response.segmentGeneration(), greaterThan(0L));
}

private Response getFromTranslog(String index, String id) throws Exception {
var getRequest = client().prepareGet(index, id).request();
var shardRouting = randomFrom(clusterService().state().routingTable().allShards("test"));
var node = clusterService().state().nodes().get(shardRouting.currentNodeId());
assertNotNull(node);
TransportGetFromTranslogAction.Request request = new TransportGetFromTranslogAction.Request(getRequest, shardRouting.shardId());
var transportService = internalCluster().getInstance(TransportService.class);
PlainActionFuture<Response> response = new PlainActionFuture<>();
transportService.sendRequest(
node,
TransportGetFromTranslogAction.NAME,
request,
new ActionListenerResponseHandler<>(response, Response::new, ThreadPool.Names.GET)
);
return response.get();
}

private String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public TransportGetAction(
);
this.indicesService = indicesService;
this.executorSelector = executorSelector;
// register the internal TransportGetFromTranslogAction
new TransportGetFromTranslogAction(transportService, indicesService, actionFilters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.get;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;

// TODO(ES-5727): add a retry mechanism to TransportGetFromTranslogAction
public class TransportGetFromTranslogAction extends HandledTransportAction<
TransportGetFromTranslogAction.Request,
TransportGetFromTranslogAction.Response> {

public static final String NAME = "internal:data/read/get_from_translog";
public static final Logger logger = LogManager.getLogger(TransportGetFromTranslogAction.class);

private final IndicesService indicesService;

@Inject
public TransportGetFromTranslogAction(TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(NAME, transportService, actionFilters, Request::new, ThreadPool.Names.GET);
this.indicesService = indicesService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final GetRequest getRequest = request.getRequest();
final ShardId shardId = request.shardId();
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
assert indexShard.routingEntry().isPromotableToPrimary() : "not an indexing shard" + indexShard.routingEntry();
assert getRequest.realtime();
ActionListener.completeWith(listener, () -> {
var result = indexShard.getService()
.getFromTranslog(
getRequest.id(),
getRequest.storedFields(),
getRequest.realtime(),
getRequest.version(),
getRequest.versionType(),
getRequest.fetchSourceContext(),
getRequest.isForceSyntheticSource()
);
long segmentGeneration = -1;
if (result == null) {
Engine engine = indexShard.getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine closed");
}
segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets();
}
return new Response(result, segmentGeneration);
});
}

public static class Request extends ActionRequest {

private final GetRequest getRequest;
private final ShardId shardId;

public Request(GetRequest getRequest, ShardId shardId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetRequest is a SingleShardRequest and can contain a ShardId, should we check that they are the same?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed if it's the same shardId, then we can re-use the GetRequest as the request type of the TransportGetFromTranslogAction.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that in an initial version. See this comment. The code in SingleShardRequest is a bit old school. It keeps an internalShardId which is mutable and is only used internally. Essentially to reuse it, I'd have to make some internal mutable state of SingleShardRequest public which is not really worth it I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing to the comment. I do agree SingleShardRequest did not aged well.

If I understand the code correctly, the internal shard id of the GetRequest is resolved when the request is executed through the TransportGetAction/TransportSingleShardAction, and TransportGetAction will be modified for realtime gets on stateless to use the new TransportGetFromTranslogAction and the resolved shard id will be passed over the the TransportGetFromTranslogAction.Request.

So we should be fine.

this.getRequest = Objects.requireNonNull(getRequest);
this.shardId = Objects.requireNonNull(shardId);
}

public Request(StreamInput in) throws IOException {
super(in);
getRequest = new GetRequest(in);
shardId = new ShardId(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
getRequest.writeTo(out);
shardId.writeTo(out);
}

public GetRequest getRequest() {
return getRequest;
}

public ShardId shardId() {
return shardId;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public String toString() {
return "GetFromTranslogRequest{" + "getRequest=" + getRequest + ", shardId=" + shardId + "}";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just

Suggested change
return "GetFromTranslogRequest{" + "getRequest=" + getRequest + ", shardId=" + shardId + "}";
return "translog " + getRequest;

as GetRequest.toString already provides useful information?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't seem to include the shardID though.

}
}

public static class Response extends ActionResponse {
@Nullable
private final GetResult getResult;
private final long segmentGeneration;

public Response(GetResult getResult, long segmentGeneration) {
this.getResult = getResult;
this.segmentGeneration = segmentGeneration;
}

public Response(StreamInput in) throws IOException {
super(in);
segmentGeneration = in.readZLong();
getResult = in.readOptionalWriteable(GetResult::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(segmentGeneration);
out.writeOptionalWriteable(getResult);
}

@Nullable
public GetResult getResult() {
return getResult;
}

/**
* The segment generation that the search shard should wait for before handling the real-time GET request locally.
* -1 if the result is not null (i.e., the result is served from the indexing shard), or there hasn't simply been
* any switches from unsafe to safe map in the LiveVersionMap (see {@link InternalEngine#getVersionFromMap(BytesRef)}).
*/
public long segmentGeneration() {
return segmentGeneration;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o instanceof Response == false) return false;
Response other = (Response) o;
return segmentGeneration == other.segmentGeneration && Objects.equals(getResult, other.getResult);
}

@Override
public int hashCode() {
return Objects.hash(segmentGeneration, getResult);
}

@Override
public String toString() {
return "Response{" + "getResult=" + getResult + ", segmentGeneration=" + segmentGeneration + "}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.get;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.get.GetResultTests;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;

public class GetFromTranslogResponseSerializationTests extends AbstractWireSerializingTestCase<TransportGetFromTranslogAction.Response> {
@Override
protected Writeable.Reader<TransportGetFromTranslogAction.Response> instanceReader() {
return TransportGetFromTranslogAction.Response::new;
}

@Override
protected TransportGetFromTranslogAction.Response createTestInstance() {
return new TransportGetFromTranslogAction.Response(randomGetResult(), randomSegmentGeneration());
}

@Override
protected TransportGetFromTranslogAction.Response mutateInstance(TransportGetFromTranslogAction.Response instance) throws IOException {
return randomBoolean()
? new TransportGetFromTranslogAction.Response(
instance.getResult(),
randomValueOtherThan(instance.segmentGeneration(), this::randomSegmentGeneration)
)
: new TransportGetFromTranslogAction.Response(
randomValueOtherThan(instance.getResult(), this::randomGetResult),
instance.segmentGeneration()
);
}

private long randomSegmentGeneration() {
return randomBoolean() ? -1L : randomNonNegativeLong();
}

private GetResult randomGetResult() {
return randomBoolean() ? null : GetResultTests.randomGetResult(randomFrom(XContentType.values())).v1();
}
}