Skip to content

Commit 1f14d04

Browse files
authored
Initialize primary term for shrunk indices
Today when an index is shrunk, the primary terms for its shards start from one. Yet, this is a problem as the index will already contain assigned sequence numbers across primary terms. To ensure document-level sequence number semantics, the primary terms of the target shards must start from the maximum of all the shards in the source index. This commit causes this to be the case. Relates #25307
1 parent 50bac63 commit 1f14d04

File tree

2 files changed

+136
-14
lines changed

2 files changed

+136
-14
lines changed

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import java.util.concurrent.atomic.AtomicInteger;
9292
import java.util.function.BiFunction;
9393
import java.util.function.Predicate;
94+
import java.util.stream.IntStream;
9495

9596
import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
9697
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
@@ -340,19 +341,44 @@ public ClusterState execute(ClusterState currentState) throws Exception {
340341
indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
341342
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
342343
final Index shrinkFromIndex = request.shrinkFrom();
343-
int routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());;
344-
if (shrinkFromIndex != null) {
345-
prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex,
346-
request.index());
347-
IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex);
344+
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());
345+
346+
final int routingNumShards;
347+
if (shrinkFromIndex == null) {
348+
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());
349+
} else {
350+
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex);
348351
routingNumShards = sourceMetaData.getRoutingNumShards();
349352
}
353+
tmpImdBuilder.setRoutingNumShards(routingNumShards);
354+
355+
if (shrinkFromIndex != null) {
356+
prepareShrinkIndexSettings(
357+
currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, request.index());
358+
}
359+
final Settings actualIndexSettings = indexSettingsBuilder.build();
360+
tmpImdBuilder.settings(actualIndexSettings);
361+
362+
if (shrinkFromIndex != null) {
363+
/*
364+
* We need to arrange that the primary term on all the shards in the shrunken index is at least as large as
365+
* the maximum primary term on all the shards in the source index. This ensures that we have correct
366+
* document-level semantics regarding sequence numbers in the shrunken index.
367+
*/
368+
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex);
369+
final long primaryTerm =
370+
IntStream
371+
.range(0, sourceMetaData.getNumberOfShards())
372+
.mapToLong(sourceMetaData::primaryTerm)
373+
.max()
374+
.getAsLong();
375+
for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
376+
tmpImdBuilder.primaryTerm(shardId, primaryTerm);
377+
}
378+
}
350379

351-
Settings actualIndexSettings = indexSettingsBuilder.build();
352-
IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index())
353-
.setRoutingNumShards(routingNumShards);
354380
// Set up everything, now locally create the index to see that things are ok, and apply
355-
final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build();
381+
final IndexMetaData tmpImd = tmpImdBuilder.build();
356382
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
357383
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
358384
waitForActiveShards = tmpImd.getWaitForActiveShards();
@@ -408,6 +434,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
408434
final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
409435
.settings(actualIndexSettings)
410436
.setRoutingNumShards(routingNumShards);
437+
438+
for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) {
439+
indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId));
440+
}
441+
411442
for (MappingMetaData mappingMd : mappingsMetaData.values()) {
412443
indexMetaDataBuilder.putMapping(mappingMd);
413444
}

core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,41 +19,57 @@
1919

2020
package org.elasticsearch.action.admin.indices.create;
2121

22+
import org.apache.lucene.index.CorruptIndexException;
2223
import org.apache.lucene.search.Sort;
2324
import org.apache.lucene.search.SortField;
2425
import org.apache.lucene.search.SortedSetSelector;
2526
import org.apache.lucene.search.SortedSetSortField;
2627
import org.elasticsearch.Version;
2728
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
29+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2830
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
29-
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
30-
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
31-
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
32-
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
3331
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
32+
import org.elasticsearch.action.index.IndexRequest;
3433
import org.elasticsearch.action.support.ActiveShardCount;
34+
import org.elasticsearch.client.Client;
3535
import org.elasticsearch.cluster.ClusterInfoService;
3636
import org.elasticsearch.cluster.InternalClusterInfoService;
37+
import org.elasticsearch.cluster.metadata.IndexMetaData;
3738
import org.elasticsearch.cluster.node.DiscoveryNode;
39+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
40+
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
3841
import org.elasticsearch.cluster.routing.RoutingTable;
42+
import org.elasticsearch.cluster.routing.ShardRouting;
3943
import org.elasticsearch.cluster.routing.UnassignedInfo;
4044
import org.elasticsearch.common.Priority;
4145
import org.elasticsearch.common.collect.ImmutableOpenMap;
4246
import org.elasticsearch.common.settings.Settings;
4347
import org.elasticsearch.common.xcontent.XContentType;
44-
import org.elasticsearch.index.engine.Segment;
48+
import org.elasticsearch.index.Index;
49+
import org.elasticsearch.index.IndexService;
4550
import org.elasticsearch.index.query.TermsQueryBuilder;
51+
import org.elasticsearch.index.shard.IndexShard;
52+
import org.elasticsearch.index.shard.ShardId;
53+
import org.elasticsearch.indices.IndicesService;
4654
import org.elasticsearch.plugins.Plugin;
4755
import org.elasticsearch.test.ESIntegTestCase;
4856
import org.elasticsearch.test.InternalSettingsPlugin;
4957
import org.elasticsearch.test.VersionUtils;
5058

5159
import java.util.Arrays;
5260
import java.util.Collection;
61+
import java.util.HashSet;
62+
import java.util.List;
63+
import java.util.Set;
64+
import java.util.stream.Collectors;
65+
import java.util.stream.IntStream;
5366

5467
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5568
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
69+
import static org.hamcrest.CoreMatchers.not;
5670
import static org.hamcrest.Matchers.containsString;
71+
import static org.hamcrest.Matchers.equalTo;
72+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5773

5874
public class ShrinkIndexIT extends ESIntegTestCase {
5975

@@ -135,6 +151,81 @@ public void testCreateShrinkIndexToN() {
135151
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
136152
}
137153

154+
public void testShrinkIndexPrimaryTerm() throws Exception {
155+
final List<Integer> factors = Arrays.asList(2, 3, 5, 7);
156+
final List<Integer> numberOfShardsFactors = randomSubsetOf(scaledRandomIntBetween(1, factors.size()), factors);
157+
final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y);
158+
final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y);
159+
internalCluster().ensureAtLeastNumDataNodes(2);
160+
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get();
161+
162+
final ImmutableOpenMap<String, DiscoveryNode> dataNodes =
163+
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
164+
assertThat(dataNodes.size(), greaterThanOrEqualTo(2));
165+
final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
166+
final String mergeNode = discoveryNodes[0].getName();
167+
ensureGreen();
168+
169+
// fail random primary shards to force primary terms to increase
170+
final Index source = resolveIndex("source");
171+
final int iterations = scaledRandomIntBetween(0, 16);
172+
for (int i = 0; i < iterations; i++) {
173+
final String node = randomSubsetOf(1, internalCluster().nodesInclude("source")).get(0);
174+
final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node);
175+
final IndexService indexShards = indexServices.indexServiceSafe(source);
176+
for (final Integer shardId : indexShards.shardIds()) {
177+
final IndexShard shard = indexShards.getShard(shardId);
178+
if (shard.routingEntry().primary() && randomBoolean()) {
179+
disableAllocation("source");
180+
shard.failShard("test", new Exception("test"));
181+
// this can not succeed until the shard is failed and a replica is promoted
182+
int id = 0;
183+
while (true) {
184+
// find an ID that routes to the right shard, we will only index to the shard that saw a primary failure
185+
final String s = Integer.toString(id);
186+
final int hash = Math.floorMod(Murmur3HashFunction.hash(s), numberOfShards);
187+
if (hash == shardId) {
188+
final IndexRequest request =
189+
new IndexRequest("source", "type", s).source("{ \"f\": \"" + s + "\"}", XContentType.JSON);
190+
client().index(request).get();
191+
break;
192+
} else {
193+
id++;
194+
}
195+
}
196+
enableAllocation("source");
197+
ensureGreen();
198+
}
199+
}
200+
}
201+
202+
// relocate all shards to one node such that we can merge it.
203+
final Settings.Builder prepareShrinkSettings =
204+
Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true);
205+
client().admin().indices().prepareUpdateSettings("source").setSettings(prepareShrinkSettings).get();
206+
ensureGreen();
207+
208+
final IndexMetaData indexMetaData = indexMetaData(client(), "source");
209+
final long beforeShrinkPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetaData::primaryTerm).max().getAsLong();
210+
211+
// now merge source into target
212+
final Settings shrinkSettings =
213+
Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", numberOfTargetShards).build();
214+
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target").setSettings(shrinkSettings).get());
215+
216+
ensureGreen();
217+
218+
final IndexMetaData afterShrinkIndexMetaData = indexMetaData(client(), "target");
219+
for (int shardId = 0; shardId < numberOfTargetShards; shardId++) {
220+
assertThat(afterShrinkIndexMetaData.primaryTerm(shardId), equalTo(beforeShrinkPrimaryTerm + 1));
221+
}
222+
}
223+
224+
private static IndexMetaData indexMetaData(final Client client, final String index) {
225+
final ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet();
226+
return clusterStateResponse.getState().metaData().index(index);
227+
}
228+
138229
public void testCreateShrinkIndex() {
139230
internalCluster().ensureAtLeastNumDataNodes(2);
140231
Version version = VersionUtils.randomVersion(random());

0 commit comments

Comments
 (0)