Skip to content

Commit 6ad0e5c

Browse files
authored
Do not use ifSeqNo if doc does not have seq_no(#46198)
This change fixes a bug where we can't partially update documents created in a mixed cluster between 5.x and 6.x. The problem is that those documents have primary terms but not sequence numbers. Hence, we have to fallback using the legacy versioning for updates of those documents.
1 parent cd14683 commit 6ad0e5c

File tree

3 files changed

+45
-8
lines changed

3 files changed

+45
-8
lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import java.io.IOException;
4040
import java.util.ArrayList;
41+
import java.util.HashMap;
4142
import java.util.List;
4243
import java.util.Map;
4344
import java.util.Objects;
@@ -438,13 +439,30 @@ public void testUpdateDoc() throws Exception {
438439
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
439440
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
440441
createIndex(index, settings.build());
442+
indexDocs(index, 0, 100);
441443
}
442-
ensureGreen(index);
443-
indexDocs(index, 0, 10);
444-
for (int i = 0; i < 10; i++) {
445-
Request update = new Request("POST", index + "/test/" + i + "/_update");
446-
update.setJsonEntity("{\"doc\": {\"f\": " + randomNonNegativeLong() + "}}");
447-
client().performRequest(update);
444+
if (randomBoolean()) {
445+
ensureGreen(index);
446+
}
447+
Map<Integer, Long> updates = new HashMap<>();
448+
for (int docId = 0; docId < 100; docId++) {
449+
final int times = randomIntBetween(0, 2);
450+
for (int i = 0; i < times; i++) {
451+
Request update = new Request("POST", index + "/test/" + docId + "/_update");
452+
long value = randomNonNegativeLong();
453+
update.setJsonEntity("{\"doc\": {\"updated_field\": " + value + "}}");
454+
client().performRequest(update);
455+
updates.put(docId, value);
456+
}
457+
}
458+
client().performRequest(new Request("POST", index + "/_refresh"));
459+
for (int docId : updates.keySet()) {
460+
Request get = new Request("GET", index + "/test/" + docId);
461+
Map<String, Object> doc = entityAsMap(client().performRequest(get));
462+
assertThat(XContentMapValues.extractValue("_source.updated_field", doc), equalTo(updates.get(docId)));
463+
}
464+
if (randomBoolean()) {
465+
syncedFlush(index);
448466
}
449467
}
450468

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,15 @@ public Result prepare(UpdateRequest request, IndexShard indexShard, boolean canU
8585
* noop).
8686
*/
8787
@SuppressWarnings("unchecked")
88-
protected Result prepare(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, GetResult getResult, LongSupplier nowInMillis) {
88+
final Result prepare(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, GetResult getResult, LongSupplier nowInMillis) {
8989
if (getResult.isExists() == false) {
9090
// If the document didn't exist, execute the update request as an upsert
9191
return prepareUpsert(shardId, request, getResult, nowInMillis);
92-
} else if (getResult.internalSourceRef() == null) {
92+
}
93+
// Documents indexed a mixed cluster between 6.x and 5.x do not have sequence numbers but have primary terms.
94+
// We have to fallback using the legacy versioning for updates of those documents.
95+
canUseIfSeqNo &= getResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
96+
if (getResult.internalSourceRef() == null) {
9397
// no source, we can't do anything, throw a failure...
9498
throw new DocumentSourceMissingException(shardId, request.type(), request.id());
9599
} else if (request.script() == null && request.doc() != null) {

server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,4 +740,19 @@ public void testOldClusterRejectIfSeqNo() {
740740
() -> updateHelper.prepare(request, null, false, ESTestCase::randomNonNegativeLong));
741741
assertThat(error.getMessage(), equalTo("ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]"));
742742
}
743+
744+
public void testFallbackUsingVersionIfCurrentDocumentDoesNotHaveSeqNo() throws Exception {
745+
ShardId shardId = new ShardId("test", "", 0);
746+
long version = randomNonNegativeLong();
747+
long primaryTerm = randomBoolean() ? UNASSIGNED_PRIMARY_TERM : randomNonNegativeLong();
748+
GetResult getResult = new GetResult("test", "type", "1", UNASSIGNED_SEQ_NO, primaryTerm, version, true,
749+
new BytesArray("{\"body\": \"bar\"}"), null);
750+
UpdateRequest updateRequest = new UpdateRequest("test", "type1", "1").fromXContent(
751+
createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}")));
752+
IndexRequest indexRequest = updateHelper.prepare(
753+
shardId, updateRequest, randomBoolean(), getResult, ESTestCase::randomNonNegativeLong).action();
754+
assertThat(indexRequest.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
755+
assertThat(indexRequest.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM));
756+
assertThat(indexRequest.version(), equalTo(version));
757+
}
743758
}

0 commit comments

Comments
 (0)