Skip to content

Commit e61c54c

Browse files
committed
Engine - Do not store operations that are not index into lucene in the translog (5.x only) (#25592)
When a replica processes out of order operations, it can drop some due to version comparisons. In the past that would have resulted in a VersionConflictException being thrown and ignored higher up. We changed this to have a cleaner flow that doesn't use exceptions. However, when backporting that change from master, we also back ported a change that isn't good for 5.x: we started storing these out of order ops in the translog. This is needed for the sequence number push, which also gives us some mechanism to deal with it later on during recovery. With the seq# this is not needed and can lead to deletes being lost (see the added test `testRecoverFromStoreWithOutOfOrderDelete` which fails without the fix). Note that master also suffers from a similar issue but we will be pursuing a different solution there (still under discussion).
1 parent 9584011 commit e61c54c

File tree

4 files changed

+60
-31
lines changed

4 files changed

+60
-31
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
391391
if (!TransportActions.isShardNotAvailableException(failure)) {
392392
throw failure;
393393
}
394-
} else {
394+
} else if (operationResult.getTranslogLocation() != null) { // out of order ops are not added to the translog
395395
location = locationToSync(location, operationResult.getTranslogLocation());
396396
}
397397
} catch (Exception e) {

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,9 +504,11 @@ public IndexResult index(Index index) throws IOException {
504504
} else if (plan.indexIntoLucene) {
505505
indexResult = indexIntoLucene(index, plan);
506506
} else {
507+
assert index.origin() != Operation.Origin.PRIMARY;
507508
indexResult = new IndexResult(plan.versionForIndexing, plan.currentNotFoundOrDeleted);
508509
}
509510
if (indexResult.hasFailure() == false &&
511+
plan.indexIntoLucene && // if we didn't store it in lucene, there is no need to store it in the translog
510512
index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
511513
Translog.Location location =
512514
translog.add(new Translog.Index(index, indexResult));
@@ -543,7 +545,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
543545
// a delete state and return false for the created flag in favor of code simplicity
544546
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
545547
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
546-
plan = IndexingStrategy.processButSkipLucene(false, index.version());
548+
plan = IndexingStrategy.skipAsStale(false, index.version());
547549
} else {
548550
plan = IndexingStrategy.processNormally(
549551
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()
@@ -706,7 +708,7 @@ static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) {
706708
return new IndexingStrategy(true, true, true, versionForIndexing, null);
707709
}
708710

709-
static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
711+
static IndexingStrategy skipAsStale(boolean currentNotFoundOrDeleted, long versionForIndexing) {
710712
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, versionForIndexing, null);
711713
}
712714
}
@@ -760,9 +762,11 @@ public DeleteResult delete(Delete delete) throws IOException {
760762
} else if (plan.deleteFromLucene) {
761763
deleteResult = deleteInLucene(delete, plan);
762764
} else {
765+
assert delete.origin() != Operation.Origin.PRIMARY;
763766
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.currentlyDeleted == false);
764767
}
765768
if (!deleteResult.hasFailure() &&
769+
plan.deleteFromLucene && // if it wasn't applied to lucene, we don't store it in the translog
766770
delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
767771
Translog.Location location =
768772
translog.add(new Translog.Delete(delete, deleteResult));

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2528,8 +2528,8 @@ public BytesRef binaryValue() {
25282528
public void testDoubleDeliveryPrimary() throws IOException {
25292529
final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1L,
25302530
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
2531-
Engine.Index operation = appendOnlyPrimary(doc, false, 1);
2532-
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
2531+
final Engine.Index operation = appendOnlyPrimary(doc, false, 1);
2532+
final Engine.Index retry = appendOnlyPrimary(doc, true, 1);
25332533
if (randomBoolean()) {
25342534
Engine.IndexResult indexResult = engine.index(operation);
25352535
assertFalse(engine.indexWriterHasDeletions());
@@ -2557,8 +2557,6 @@ public void testDoubleDeliveryPrimary() throws IOException {
25572557
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
25582558
assertEquals(1, topDocs.totalHits);
25592559
}
2560-
operation = randomAppendOnly(doc, false, 1);
2561-
retry = randomAppendOnly(doc, true, 1);
25622560
if (randomBoolean()) {
25632561
Engine.IndexResult indexResult = engine.index(operation);
25642562
assertNotNull(indexResult.getTranslogLocation());
@@ -2569,7 +2567,7 @@ public void testDoubleDeliveryPrimary() throws IOException {
25692567
Engine.IndexResult retryResult = engine.index(retry);
25702568
assertNotNull(retryResult.getTranslogLocation());
25712569
Engine.IndexResult indexResult = engine.index(operation);
2572-
assertNotNull(retryResult.getTranslogLocation());
2570+
assertNotNull(indexResult.getTranslogLocation());
25732571
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
25742572
}
25752573

@@ -2583,8 +2581,8 @@ public void testDoubleDeliveryPrimary() throws IOException {
25832581
public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
25842582
final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1,
25852583
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
2586-
Engine.Index operation = appendOnlyReplica(doc, false, 1);
2587-
Engine.Index retry = appendOnlyReplica(doc, true, 1);
2584+
final Engine.Index operation = appendOnlyReplica(doc, false, 1);
2585+
final Engine.Index retry = appendOnlyReplica(doc, true, 1);
25882586
if (randomBoolean()) {
25892587
Engine.IndexResult indexResult = engine.index(operation);
25902588
assertFalse(engine.indexWriterHasDeletions());
@@ -2593,8 +2591,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
25932591
Engine.IndexResult retryResult = engine.index(retry);
25942592
assertFalse(engine.indexWriterHasDeletions());
25952593
assertEquals(1, engine.getNumVersionLookups());
2596-
assertNotNull(retryResult.getTranslogLocation());
2597-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
2594+
assertNull(retryResult.getTranslogLocation()); // we didn't index it nor put it in the translog
25982595
} else {
25992596
Engine.IndexResult retryResult = engine.index(retry);
26002597
assertFalse(engine.indexWriterHasDeletions());
@@ -2603,29 +2600,24 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
26032600
Engine.IndexResult indexResult = engine.index(operation);
26042601
assertFalse(engine.indexWriterHasDeletions());
26052602
assertEquals(2, engine.getNumVersionLookups());
2606-
assertNotNull(retryResult.getTranslogLocation());
2607-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
2603+
assertNull(indexResult.getTranslogLocation()); // we didn't index it nor put it in the translog
26082604
}
26092605

26102606
engine.refresh("test");
26112607
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
26122608
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
26132609
assertEquals(1, topDocs.totalHits);
26142610
}
2615-
operation = randomAppendOnly(doc, false, 1);
2616-
retry = randomAppendOnly(doc, true, 1);
26172611
if (randomBoolean()) {
26182612
Engine.IndexResult indexResult = engine.index(operation);
2619-
assertNotNull(indexResult.getTranslogLocation());
2613+
assertNull(indexResult.getTranslogLocation()); // we don't index because a retry has already been processed.
26202614
Engine.IndexResult retryResult = engine.index(retry);
2621-
assertNotNull(retryResult.getTranslogLocation());
2622-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
2615+
assertNull(retryResult.getTranslogLocation());
26232616
} else {
26242617
Engine.IndexResult retryResult = engine.index(retry);
2625-
assertNotNull(retryResult.getTranslogLocation());
2618+
assertNull(retryResult.getTranslogLocation());
26262619
Engine.IndexResult indexResult = engine.index(operation);
2627-
assertNotNull(retryResult.getTranslogLocation());
2628-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
2620+
assertNull(indexResult.getTranslogLocation());
26292621
}
26302622

26312623
engine.refresh("test");
@@ -2651,8 +2643,7 @@ public void testDoubleDeliveryReplica() throws IOException {
26512643
Engine.IndexResult retryResult = engine.index(duplicate);
26522644
assertFalse(engine.indexWriterHasDeletions());
26532645
assertEquals(2, engine.getNumVersionLookups());
2654-
assertNotNull(retryResult.getTranslogLocation());
2655-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
2646+
assertNull(retryResult.getTranslogLocation());
26562647
} else {
26572648
Engine.IndexResult retryResult = engine.index(duplicate);
26582649
assertFalse(engine.indexWriterHasDeletions());
@@ -2664,8 +2655,7 @@ public void testDoubleDeliveryReplica() throws IOException {
26642655
Engine.IndexResult indexResult = engine.index(operation);
26652656
assertFalse(engine.indexWriterHasDeletions());
26662657
assertEquals(2, engine.getNumVersionLookups());
2667-
assertNotNull(retryResult.getTranslogLocation());
2668-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
2658+
assertNull(indexResult.getTranslogLocation()); // we didn't index, no need to put in translog
26692659
}
26702660

26712661
engine.refresh("test");

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.action.admin.indices.stats.CommonStats;
3939
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
4040
import org.elasticsearch.action.admin.indices.stats.ShardStats;
41+
import org.elasticsearch.action.index.IndexRequest;
4142
import org.elasticsearch.action.support.PlainActionFuture;
4243
import org.elasticsearch.cluster.metadata.IndexMetaData;
4344
import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -77,6 +78,7 @@
7778
import org.elasticsearch.index.mapper.Mapping;
7879
import org.elasticsearch.index.mapper.ParseContext;
7980
import org.elasticsearch.index.mapper.ParsedDocument;
81+
import org.elasticsearch.index.mapper.SourceToParse;
8082
import org.elasticsearch.index.mapper.Uid;
8183
import org.elasticsearch.index.mapper.UidFieldMapper;
8284
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -123,6 +125,7 @@
123125

124126
import static java.util.Collections.emptyMap;
125127
import static java.util.Collections.emptySet;
128+
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
126129
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
127130
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
128131
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -281,14 +284,14 @@ public void testOperationLocksOnPrimaryShards() throws InterruptedException, Exe
281284

282285
if (randomBoolean()) {
283286
// relocation target
284-
indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node",
287+
indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
285288
true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
286289
} else if (randomBoolean()) {
287290
// simulate promotion
288291
indexShard = newStartedShard(false);
289292
ShardRouting replicaRouting = indexShard.routingEntry();
290293
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
291-
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
294+
ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
292295
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
293296
indexShard.updateRoutingEntry(primaryRouting);
294297
} else {
@@ -340,7 +343,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
340343
case 1: {
341344
// initializing replica / primary
342345
final boolean relocating = randomBoolean();
343-
ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node",
346+
ShardRouting routing = newShardRouting(shardId, "local_node",
344347
relocating ? "sourceNode" : null,
345348
relocating ? randomBoolean() : false,
346349
ShardRoutingState.INITIALIZING,
@@ -352,7 +355,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
352355
// relocation source
353356
indexShard = newStartedShard(true);
354357
ShardRouting routing = indexShard.routingEntry();
355-
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
358+
routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
356359
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
357360
indexShard.updateRoutingEntry(routing);
358361
indexShard.relocated("test");
@@ -892,6 +895,38 @@ public void testRecoverFromStore() throws IOException {
892895
closeShards(newShard);
893896
}
894897

898+
public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
899+
final IndexShard shard = newStartedShard(false);
900+
final Engine.Index index = shard.prepareIndexOnReplica(
901+
SourceToParse.source(SourceToParse.Origin.REPLICA, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"),
902+
XContentType.JSON), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
903+
final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL);
904+
shard.delete(delete);
905+
final int translogOps;
906+
if (randomBoolean()) {
907+
flushShard(shard, true); // lucene won't flush due to just one pending delete
908+
translogOps = 0;
909+
} else {
910+
translogOps = 1;
911+
}
912+
final Engine.IndexResult result = shard.index(index);
913+
assertThat(result.getTranslogLocation(), nullValue());
914+
final ShardRouting replicaRouting = shard.routingEntry();
915+
IndexShard newShard = reinitShard(shard,
916+
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
917+
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
918+
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
919+
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
920+
assertTrue(newShard.recoverFromStore());
921+
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
922+
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
923+
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
924+
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
925+
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
926+
assertDocCount(newShard, 0);
927+
closeShards(newShard);
928+
}
929+
895930
public void testRecoverFromCleanStore() throws IOException {
896931
final IndexShard shard = newStartedShard(true);
897932
indexDoc(shard, "test", "0");
@@ -1314,7 +1349,7 @@ public void testRecoverFromLocalShard() throws IOException {
13141349
sourceShard.refresh("test");
13151350

13161351

1317-
ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
1352+
ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
13181353
ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE);
13191354

13201355
final IndexShard targetShard;

0 commit comments

Comments
 (0)