Skip to content

Commit 8dcb1f5

Browse files
authored
Initialize max unsafe auto ID timestamp on shrink
When shrinking an index, we initialize its max unsafe auto ID timestamp to the maximum of the max unsafe auto ID timestamps on the source shards. Relates #25356
1 parent d963882 commit 8dcb1f5

File tree

4 files changed

+29
-9
lines changed

4 files changed

+29
-9
lines changed

core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.metadata.IndexMetaData;
2929
import org.elasticsearch.index.Index;
3030
import org.elasticsearch.index.engine.Engine;
31+
import org.elasticsearch.index.engine.InternalEngine;
3132
import org.elasticsearch.index.store.Store;
3233

3334
import java.io.Closeable;
@@ -64,6 +65,10 @@ long maxSeqNo() {
6465
return shard.getEngine().seqNoService().getMaxSeqNo();
6566
}
6667

68+
long maxUnsafeAutoIdTimestamp() {
69+
return Long.parseLong(shard.getEngine().commitStats().getUserData().get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID));
70+
}
71+
6772
Directory getSnapshotDirectory() {
6873
/* this directory will not be used for anything else but reading / copying files to another directory
6974
* we prevent all write operations on this directory with UOE - nobody should close it either. */

core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.common.unit.TimeValue;
4141
import org.elasticsearch.index.Index;
4242
import org.elasticsearch.index.engine.EngineException;
43+
import org.elasticsearch.index.engine.InternalEngine;
4344
import org.elasticsearch.index.mapper.MapperService;
4445
import org.elasticsearch.index.seqno.SequenceNumbers;
4546
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@@ -50,7 +51,6 @@
5051

5152
import java.io.IOException;
5253
import java.util.Arrays;
53-
import java.util.Collections;
5454
import java.util.HashMap;
5555
import java.util.List;
5656
import java.util.Set;
@@ -120,7 +120,9 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
120120
final Directory directory = indexShard.store().directory(); // don't close this directory!!
121121
final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
122122
final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
123-
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo);
123+
final long maxUnsafeAutoIdTimestamp =
124+
shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong();
125+
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp);
124126
internalRecoverFromStore(indexShard);
125127
// just trigger a merge to do housekeeping on the
126128
// copied segments - we will also see them in stats etc.
@@ -139,7 +141,8 @@ void addIndices(
139141
final Directory target,
140142
final Sort indexSort,
141143
final Directory[] sources,
142-
final long maxSeqNo) throws IOException {
144+
final long maxSeqNo,
145+
final long maxUnsafeAutoIdTimestamp) throws IOException {
143146
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
144147
IndexWriterConfig iwc = new IndexWriterConfig(null)
145148
.setCommitOnClose(false)
@@ -162,6 +165,7 @@ void addIndices(
162165
final HashMap<String, String> liveCommitData = new HashMap<>(2);
163166
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
164167
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
168+
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
165169
return liveCommitData.entrySet().iterator();
166170
});
167171
writer.commit();

core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2929
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
3030
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
31+
import org.elasticsearch.action.admin.indices.stats.CommonStats;
3132
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
3233
import org.elasticsearch.action.admin.indices.stats.ShardStats;
3334
import org.elasticsearch.action.index.IndexRequest;
@@ -46,6 +47,7 @@
4647
import org.elasticsearch.common.xcontent.XContentType;
4748
import org.elasticsearch.index.Index;
4849
import org.elasticsearch.index.IndexService;
50+
import org.elasticsearch.index.engine.SegmentsStats;
4951
import org.elasticsearch.index.query.TermsQueryBuilder;
5052
import org.elasticsearch.index.seqno.SeqNoStats;
5153
import org.elasticsearch.index.shard.IndexShard;
@@ -233,8 +235,8 @@ public void testCreateShrinkIndex() {
233235
client().prepareIndex("source", "type")
234236
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
235237
}
236-
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
237-
.getDataNodes();
238+
ImmutableOpenMap<String, DiscoveryNode> dataNodes =
239+
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
238240
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
239241
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
240242
String mergeNode = discoveryNodes[0].getName();
@@ -249,9 +251,16 @@ public void testCreateShrinkIndex() {
249251
.put("index.blocks.write", true)).get();
250252
ensureGreen();
251253

252-
final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").get();
254+
final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get();
253255
final long maxSeqNo =
254256
Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong();
257+
final long maxUnsafeAutoIdTimestamp =
258+
Arrays.stream(sourceStats.getShards())
259+
.map(ShardStats::getStats)
260+
.map(CommonStats::getSegments)
261+
.mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp)
262+
.max()
263+
.getAsLong();
255264
// now merge source into a single shard index
256265

257266
final boolean createWithReplicas = randomBoolean();
@@ -264,6 +273,7 @@ public void testCreateShrinkIndex() {
264273
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
265274
assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo));
266275
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo));
276+
assertThat(shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), equalTo(maxUnsafeAutoIdTimestamp));
267277
}
268278

269279
final int size = docs > 0 ? 2 * docs : 1;

core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.lucene.codecs.CodecUtil;
2222
import org.apache.lucene.document.Field;
2323
import org.apache.lucene.document.SortedNumericDocValuesField;
24-
import org.apache.lucene.document.SortedSetDocValuesField;
2524
import org.apache.lucene.document.StringField;
2625
import org.apache.lucene.index.DirectoryReader;
2726
import org.apache.lucene.index.IndexWriter;
@@ -32,11 +31,11 @@
3231
import org.apache.lucene.search.Sort;
3332
import org.apache.lucene.search.SortField;
3433
import org.apache.lucene.search.SortedNumericSortField;
35-
import org.apache.lucene.search.SortedSetSortField;
3634
import org.apache.lucene.store.Directory;
3735
import org.apache.lucene.store.IOContext;
3836
import org.apache.lucene.store.IndexOutput;
3937
import org.apache.lucene.util.IOUtils;
38+
import org.elasticsearch.index.engine.InternalEngine;
4039
import org.elasticsearch.index.seqno.SequenceNumbers;
4140
import org.elasticsearch.indices.recovery.RecoveryState;
4241
import org.elasticsearch.test.ESTestCase;
@@ -87,7 +86,8 @@ public void testAddIndices() throws IOException {
8786
RecoveryState.Index indexStats = new RecoveryState.Index();
8887
Directory target = newFSDirectory(createTempDir());
8988
final long maxSeqNo = randomNonNegativeLong();
90-
storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo);
89+
final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong();
90+
storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo, maxUnsafeAutoIdTimestamp);
9191
int numFiles = 0;
9292
Predicate<String> filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false
9393
&& f.startsWith("extra") == false;
@@ -107,6 +107,7 @@ public void testAddIndices() throws IOException {
107107
final Map<String, String> userData = segmentCommitInfos.getUserData();
108108
assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(maxSeqNo)));
109109
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(maxSeqNo)));
110+
assertThat(userData.get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID), equalTo(Long.toString(maxUnsafeAutoIdTimestamp)));
110111
for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge
111112
assertEquals("all sources must be flush",
112113
info.info.getDiagnostics().get("source"), "flush");

0 commit comments

Comments
 (0)