Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
005215e
Automatically prepare indices for splitting
s1monw Nov 16, 2017
4072379
prevent automatic num routing shards for pre 7.0 indices
s1monw Nov 20, 2017
6ab582e
fix auto routing shards for 1 shard source indices
s1monw Nov 20, 2017
0e3fbbb
Merge branch 'master' into prepare_for_split
s1monw Nov 20, 2017
6d90ad8
add additional verification for broken split setups
s1monw Nov 20, 2017
e4da795
fix test:
s1monw Nov 21, 2017
033c6ad
Merge branch 'master' into prepare_for_split
s1monw Nov 21, 2017
fb13969
Improved index-split docs
clintongormley Nov 21, 2017
27a6d34
apply review comments
s1monw Nov 21, 2017
71f5c35
Merge pull request #3 from clintongormley/prepare_for_split
s1monw Nov 21, 2017
9262c9e
fix unittest
s1monw Nov 21, 2017
8482a60
stabilize SplitIndexIT
s1monw Nov 21, 2017
6fe2bff
Fix routing test to actually be sane
s1monw Nov 21, 2017
b02b9f3
fix SharedSignificantTermsTestMethods tests
s1monw Nov 21, 2017
8176378
fix SimpleRoutingIT again
s1monw Nov 22, 2017
1f46ce3
use a factor but incompatible one
s1monw Nov 22, 2017
a6616cd
bound num routing shards to 1024
s1monw Nov 22, 2017
b2a9a08
fix comments
s1monw Nov 22, 2017
dccad0b
fix tests for new hashing
s1monw Nov 22, 2017
36aca55
Merge branch 'master' into prepare_for_split
s1monw Nov 22, 2017
1cd9c78
Merge branch 'master' into prepare_for_split
s1monw Nov 22, 2017
e820289
Merge branch 'master' into prepare_for_split
s1monw Nov 22, 2017
791c2f1
Merge branch 'master' into prepare_for_split
s1monw Nov 23, 2017
4db1b97
add note to migration guide
s1monw Nov 23, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public void testSearchWithMatrixStats() throws IOException {
assertEquals(5, matrixStats.getFieldCount("num"));
assertEquals(56d, matrixStats.getMean("num"), 0d);
assertEquals(1830d, matrixStats.getVariance("num"), 0d);
assertEquals(0.09340198804973057, matrixStats.getSkewness("num"), 0d);
assertEquals(0.09340198804973046, matrixStats.getSkewness("num"), 0d);
assertEquals(1.2741646510794589, matrixStats.getKurtosis("num"), 0d);
assertEquals(5, matrixStats.getFieldCount("num2"));
assertEquals(29d, matrixStats.getMean("num2"), 0d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,14 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Resi

if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
throw new IllegalArgumentException("cannot provide a routing partition size value when resizing an index");

}
if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) {
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
// if we have a source index with 1 shards it's legal to set this
final boolean splitFromSingleShards = resizeRequest.getResizeType() == ResizeType.SPLIT && metaData.getNumberOfShards() == 1;
if (splitFromSingleShards == false) {
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
}
}
String cause = resizeRequest.getResizeType().name().toLowerCase(Locale.ROOT) + "_index";
targetIndex.cause(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,25 +1333,33 @@ public int getRoutingFactor() {
* @return a the source shard ID to split off from
*/
public static ShardId selectSplitShard(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) {
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
if (shardId >= numTargetShards) {
throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: "
+ shardId);
}
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
final int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
assertSplitMetadata(numSourceShards, numTargetShards, sourceIndexMetadata);
return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
}

private static void assertSplitMetadata(int numSourceShards, int numTargetShards, IndexMetaData sourceIndexMetadata) {
if (numSourceShards > numTargetShards) {
throw new IllegalArgumentException("the number of source shards [" + numSourceShards
+ "] must be less that the number of target shards [" + numTargetShards + "]");
+ "] must be less that the number of target shards [" + numTargetShards + "]");
}
int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
// now we verify that the numRoutingShards is valid in the source index
int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
// note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand it's important here to bypass the assertions as we don't have any relationship between the source routing shards and the target one in the case where the source has only one physical shard. I think the "validate this in various places in the code" part is maybe a leftover from a previous iteration?

// this is important to special case here since we use this to validate this in various places in the code but allow to split from
// 1 to N but we never modify the sourceIndexMetadata to accommodate for that
int routingNumShards = numSourceShards == 1 ? numTargetShards : sourceIndexMetadata.getRoutingNumShards();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe do something like:

final int selectedShard;
if (numSourceShards == 1) {
  selectedShard = 0;
} else {
  ... the current logic...
  selectedShard = shardId/routingFactor;
}
return new ShardId(sourceIndexMetadata.getIndex(), selectedShard);

will be easier to read , I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also s/form/from/

if (routingNumShards % numTargetShards != 0) {
throw new IllegalStateException("the number of routing shards ["
+ routingNumShards + "] must be a multiple of the target shards [" + numTargetShards + "]");
}
// this is just an additional assertion that ensures we are a factor of the routing num shards.
assert getRoutingFactor(numTargetShards, sourceIndexMetadata.getRoutingNumShards()) >= 0;
return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
assert sourceIndexMetadata.getNumberOfShards() == 1 // special case - we can split into anything from 1 shard
|| getRoutingFactor(numTargetShards, routingNumShards) >= 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,15 +379,24 @@ public ClusterState execute(ClusterState currentState) throws Exception {
indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());

final Settings idxSettings = indexSettingsBuilder.build();
int numTargetShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(idxSettings);
final int routingNumShards;
if (recoverFromIndex == null) {
Settings idxSettings = indexSettingsBuilder.build();
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
final Version indexVersionCreated = idxSettings.getAsVersion(IndexMetaData.SETTING_VERSION_CREATED, null);
final IndexMetaData sourceMetaData = recoverFromIndex == null ? null :
currentState.metaData().getIndexSafe(recoverFromIndex);
if (sourceMetaData == null || sourceMetaData.getNumberOfShards() == 1) {
// in this case we either have no index to recover from or
// we have a source index with 1 shard and without an explicit split factor
// or one that is valid in that case we can split into whatever and auto-generate a new factor.
if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(idxSettings)) {
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
} else {
routingNumShards = calculateNumRoutingShards(numTargetShards, indexVersionCreated);
}
} else {
assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
: "index.number_of_routing_shards should be present on the target index on resize";
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
: "index.number_of_routing_shards should not be present on the target index on resize";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

routingNumShards = sourceMetaData.getRoutingNumShards();
}
// remove the setting it's temporary and is only relevant once we create the index
Expand All @@ -408,7 +417,6 @@ public ClusterState execute(ClusterState currentState) throws Exception {
* the maximum primary term on all the shards in the source index. This ensures that we have correct
* document-level semantics regarding sequence numbers in the shrunken index.
*/
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
final long primaryTerm =
IntStream
.range(0, sourceMetaData.getNumberOfShards())
Expand Down Expand Up @@ -717,4 +725,21 @@ static void prepareResizeIndexSettings(ClusterState currentState, Set<String> ma
.put(IndexMetaData.INDEX_RESIZE_SOURCE_NAME.getKey(), resizeSourceIndex.getName())
.put(IndexMetaData.INDEX_RESIZE_SOURCE_UUID.getKey(), resizeSourceIndex.getUUID());
}

/**
* Returns a default number of routing shards based on the number of shards of the index. The default number of routing shards will
* allow any index to be split at least once and at most 10 times by a factor of two. The closer the number or shards gets to 1024
* the less default split operations are supported
*/
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you clarify why this needs to be version dependent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a comment in the line below?!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I get this means we only do the new behavior until the cluster is fully upgraded, but I don't see why we care? I mean, if the master is a 7.0.0 master, we can start creating indices with a different hashing logic and not worry about it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's mainly for testing purposes and BWC behavior being more predictable otherwise some rest tests will randomly fail

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh well :)

// only select this automatically for indices that are created on or after 7.0 this will prevent this new behaviour
// until we have a fully upgraded cluster see {@link IndexMetaDataE#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing end of comment?

int base = 9; // logBase2(512)
final int minNumSplits = 1;
return numShards * 1 << Math.max(minNumSplits, (base - (int) (Math.log(numShards) / Math.log(2))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be better for results to be in 513..1024 than 512..1023 by ceiling the log? Feel free to ignore but I'd do the following:

// We use as a default number of routing shards the higher number that can be expressed as {@code numShards * 2^x`} that is less than or equal to the maximum number of shards: 1024.
int log2MaxNumShards = 10; // logBase2(1024)
int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
int numSplits = log2MaxNumShards - log2NumShards;
numSplits = Math.max(1, numSplits); // Ensure the index can be split at least once
return numShards * 1 << numSplits;

And then in tests make sure that the value is in 513..1024 for any number of shards in 0..512, and equal to numShards*2 for any number of shards that is greater than 512?

} else {
return numShards;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public void testInvalidPartitionSize() {
response = prepareCreate("test_" + shards + "_" + partitionSize)
.setSettings(Settings.builder()
.put("index.number_of_shards", shards)
.put("index.number_of_routing_shards", shards)
.put("index.routing_partition_size", partitionSize))
.execute().actionGet();
} catch (IllegalStateException | IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -66,7 +67,6 @@
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.IntFunction;
import java.util.stream.IntStream;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -88,21 +88,38 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testCreateSplitIndexToN() throws IOException {
int[][] possibleShardSplits = new int[][] {{2,4,8}, {3, 6, 12}, {1, 2, 4}};
int[][] possibleShardSplits = new int[][]{{2, 4, 8}, {3, 6, 12}, {1, 2, 4}};
int[] shardSplits = randomFrom(possibleShardSplits);
assertEquals(shardSplits[0], (shardSplits[0] * shardSplits[1]) / shardSplits[1]);
assertEquals(shardSplits[1], (shardSplits[1] * shardSplits[2]) / shardSplits[2]);
splitToN(shardSplits[0], shardSplits[1], shardSplits[2]);
}

public void testSplitFromOneToN() {
splitToN(1, 5, 10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you had an explicit reason not to have a shrink to one shard then split again test (where we can take values in the split that doesn't compute with the source index)? alternatively we can explicitly set the routing shards on the source index to something that doesn't make sense when we start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand what you mean

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test now starts with a 1-shard source and then splits twice. Both of these always have a valid number of routing shards in the source index. I think the interesting part of the test is to see how we reset the number of routing shards. For example, start with a 3-shard index. Shrink to 1 (number of routing shards stays 3) then split to, say, 2. Does that help?

client().admin().indices().prepareDelete("*").get();
int randomSplit = randomIntBetween(2, 6);
splitToN(1, randomSplit, randomSplit * 2);
}

private void splitToN(int sourceShards, int firstSplitShards, int secondSplitShards) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍


assertEquals(sourceShards, (sourceShards * firstSplitShards) / firstSplitShards);
assertEquals(firstSplitShards, (firstSplitShards * secondSplitShards) / secondSplitShards);
internalCluster().ensureAtLeastNumDataNodes(2);
final boolean useRouting = randomBoolean();
final boolean useNested = randomBoolean();
final boolean useMixedRouting = useRouting ? randomBoolean() : false;
CreateIndexRequestBuilder createInitialIndex = prepareCreate("source");
final int routingShards = shardSplits[2] * randomIntBetween(1, 10);
Settings.Builder settings = Settings.builder().put(indexSettings())
.put("number_of_shards", shardSplits[0])
.put("index.number_of_routing_shards", routingShards);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we randomly still do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can.. I will do it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

Settings.Builder settings = Settings.builder().put(indexSettings()).put("number_of_shards", sourceShards);
final int routingShards;
if (randomBoolean()) {
// randomly set the value manually
routingShards = secondSplitShards * randomIntBetween(1, 10);
settings.put("index.number_of_routing_shards", routingShards);
} else {
routingShards = MetaDataCreateIndexService.calculateNumRoutingShards(sourceShards, Version.CURRENT);
}
if (useRouting && useMixedRouting == false && randomBoolean()) {
settings.put("index.routing_partition_size", randomIntBetween(1, routingShards - 1));
settings.put("index.routing_partition_size", randomIntBetween(1, routingShards-1));
if (useNested) {
createInitialIndex.addMapping("t1", "_routing", "required=true", "nested1", "type=nested");
} else {
Expand Down Expand Up @@ -172,11 +189,15 @@ public void testCreateSplitIndexToN() throws IOException {
.setSettings(Settings.builder()
.put("index.blocks.write", true)).get();
ensureGreen();
Settings.Builder firstSplitSettingsBuilder = Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", firstSplitShards);
if (sourceShards == 1 && randomBoolean()) { // try to set it if we have a source index with 1 shard
firstSplitSettingsBuilder.put("index.number_of_routing_shards", routingShards);
}
assertAcked(client().admin().indices().prepareResizeIndex("source", "first_split")
.setResizeType(ResizeType.SPLIT)
.setSettings(Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", shardSplits[1]).build()).get());
.setSettings(firstSplitSettingsBuilder.build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);

Expand Down Expand Up @@ -204,7 +225,7 @@ public void testCreateSplitIndexToN() throws IOException {
.setResizeType(ResizeType.SPLIT)
.setSettings(Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", shardSplits[2]).build()).get());
.put("index.number_of_shards", secondSplitShards).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs);
// let it be allocated anywhere and bump replicas
Expand Down Expand Up @@ -340,7 +361,6 @@ public void testCreateSplitIndex() {
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
.put("number_of_shards", 1)
.put("index.version.created", version)
.put("index.number_of_routing_shards", 2)
).get();
final int docs = randomIntBetween(0, 128);
for (int i = 0; i < docs; i++) {
Expand Down Expand Up @@ -443,7 +463,6 @@ public void testCreateSplitWithIndexSort() throws Exception {
Settings.builder()
.put(indexSettings())
.put("sort.field", "id")
.put("index.number_of_routing_shards", 16)
.put("sort.order", "desc")
.put("number_of_shards", 2)
.put("number_of_replicas", 0)
Expand Down
Loading