Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,24 @@ public void testRecoveryWithSoftDeletes() throws Exception {
ensureGreen(index);
}

/** Ensure that we can always execute update requests regardless of the version of cluster */
public void testUpdateDoc() throws Exception {
final String index = "test_update_doc";
if (CLUSTER_TYPE == ClusterType.OLD) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
createIndex(index, settings.build());
}
ensureGreen(index);
indexDocs(index, 0, 10);
for (int i = 0; i < 10; i++) {
Request update = new Request("POST", index + "/test/" + i + "/_update");
update.setJsonEntity("{\"doc\": {\"f\": " + randomNonNegativeLong() + "}}");
client().performRequest(update);
}
}

private void syncedFlush(String index) throws Exception {
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void writeTo(StreamOutput out) throws IOException {
} else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ public void writeTo(StreamOutput out) throws IOException {
} else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand All @@ -41,6 +43,7 @@
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.ParentFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.script.Script;
Expand All @@ -52,6 +55,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/**
Expand All @@ -62,15 +66,24 @@ public class UpdateHelper {
private static final Logger logger = LogManager.getLogger(UpdateHelper.class);

private final ScriptService scriptService;
private final BooleanSupplier canUseIfSeqNo;

public UpdateHelper(ScriptService scriptService) {
public UpdateHelper(ScriptService scriptService, ClusterService clusterService) {
this(scriptService, () -> clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_6_0));
}

UpdateHelper(ScriptService scriptService, BooleanSupplier canUseIfSeqNo) {
this.scriptService = scriptService;
this.canUseIfSeqNo = canUseIfSeqNo;
}

/**
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
if (canUseIfSeqNo.getAsBoolean() == false) {
ensureIfSeqNoNotProvided(request.ifSeqNo(), request.ifPrimaryTerm());
}
final GetResult getResult = indexShard.getService().getForUpdate(
request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm());
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
Expand Down Expand Up @@ -165,6 +178,19 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
}

/**
* Calculate the version to use for the update request, using either the existing version if internal versioning is used, or the get
* result document's version if the version type is "FORCE".
*/
static long calculateUpdateVersion(UpdateRequest request, GetResult getResult) {
if (request.versionType() != VersionType.INTERNAL) {
assert request.versionType() == VersionType.FORCE;
return request.version(); // remember, match_any is excluded by the conflict test
} else {
return getResult.getVersion();
}
}

/**
* Calculate a routing value to be used, either the included index request's routing, or retrieved document's routing when defined.
*/
Expand Down Expand Up @@ -219,9 +245,13 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
final IndexRequest finalIndexRequest = Requests.indexRequest(request.index())
.type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
.setRefreshPolicy(request.getRefreshPolicy());
if (canUseIfSeqNo.getAsBoolean()) {
finalIndexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm());
} else {
finalIndexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType());
}
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
}
}
Expand Down Expand Up @@ -261,16 +291,24 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
final IndexRequest indexRequest = Requests.indexRequest(request.index())
.type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
.setRefreshPolicy(request.getRefreshPolicy());
if (canUseIfSeqNo.getAsBoolean()) {
indexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm());
} else {
indexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType());
}
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
case DELETE:
DeleteRequest deleteRequest = Requests.deleteRequest(request.index())
.type(request.type()).id(request.id()).routing(routing).parent(parent)
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
.waitForActiveShards(request.waitForActiveShards())
.timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy());
if (canUseIfSeqNo.getAsBoolean()) {
deleteRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm());
} else {
deleteRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType());
}
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
default:
// If it was neither an INDEX or DELETE operation, treat it as a noop
Expand Down Expand Up @@ -354,6 +392,14 @@ public static GetResult extractGetResult(final UpdateRequest request, String con
sourceRequested ? sourceFilteredAsBytes : null, fields);
}

private void ensureIfSeqNoNotProvided(long ifSeqNo, long ifPrimaryTerm) {
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher.");
}
}

public static class Result {

private final Streamable action;
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ protected Node(
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService()));
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService(), clusterService));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -53,11 +54,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.script.MockScriptEngine.mockInlineScript;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
Expand All @@ -74,6 +77,7 @@
public class UpdateRequestTests extends ESTestCase {

private UpdateHelper updateHelper;
private final AtomicBoolean canUseIfSeqNo = new AtomicBoolean(true);

@Override
@Before
Expand Down Expand Up @@ -139,7 +143,7 @@ public void setUp() throws Exception {
final MockScriptEngine engine = new MockScriptEngine("mock", scripts, Collections.emptyMap());
Map<String, ScriptEngine> engines = Collections.singletonMap(engine.getType(), engine);
ScriptService scriptService = new ScriptService(baseSettings, engines, ScriptModule.CORE_CONTEXTS);
updateHelper = new UpdateHelper(scriptService);
updateHelper = new UpdateHelper(scriptService, canUseIfSeqNo::get);
}

public void testFromXContent() throws Exception {
Expand Down Expand Up @@ -696,6 +700,39 @@ public void testUpdateScript() throws Exception {
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.NOOP));
}

public void testOldClusterFallbackToUseVersion() throws Exception {
ShardId shardId = new ShardId("test", "", 0);
long version = randomNonNegativeLong();
long seqNo = randomNonNegativeLong();
long primaryTerm = randomNonNegativeLong();
GetResult getResult = new GetResult("test", "type", "1", seqNo, primaryTerm, version, true,
new BytesArray("{\"body\": \"bar\"}"), null);
UpdateRequest request = new UpdateRequest("test", "type1", "1").fromXContent(
createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}")));

canUseIfSeqNo.set(false);
IndexRequest updateUsingVersion = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action();
assertThat(updateUsingVersion.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(updateUsingVersion.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM));
assertThat(updateUsingVersion.version(), equalTo(version));

canUseIfSeqNo.set(true);
IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action();
assertThat(updateUsingSeqNo.ifSeqNo(), equalTo(seqNo));
assertThat(updateUsingSeqNo.ifPrimaryTerm(), equalTo(primaryTerm));
assertThat(updateUsingSeqNo.version(), equalTo(Versions.MATCH_ANY));
}

public void testOldClusterRejectIfSeqNo() {
canUseIfSeqNo.set(false);
long ifSeqNo = randomNonNegativeLong();
long ifPrimaryTerm = randomNonNegativeLong();
UpdateRequest request = new UpdateRequest("test", "type1", "1").setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm);
AssertionError error = expectThrows(AssertionError.class,
() -> updateHelper.prepare(request, null, ESTestCase::randomNonNegativeLong));
assertThat(error.getMessage(), equalTo("setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"));
}

public void testToString() throws IOException {
UpdateRequest request = new UpdateRequest("test", "type1", "1")
.script(mockInlineScript("ctx._source.body = \"foo\""));
Expand Down