From f9e526416141ba81f61b5b18812babe1c9869092 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Nov 2024 11:23:35 +0100 Subject: [PATCH 01/10] Add INDEX_REFRESH_BLOCK Relates ES-10131 --- .../cluster/block/ClusterBlockLevel.java | 3 +- .../cluster/metadata/IndexMetadata.java | 9 +++ .../metadata/MetadataCreateIndexService.java | 49 +++++++++++++ .../MetadataCreateIndexServiceTests.java | 68 ++++++++++++++++++- 4 files changed, 127 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlockLevel.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlockLevel.java index f6330fb18e5e6..262044b091ac7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlockLevel.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlockLevel.java @@ -15,7 +15,8 @@ public enum ClusterBlockLevel { READ, WRITE, METADATA_READ, - METADATA_WRITE; + METADATA_WRITE, + REFRESH; public static final EnumSet ALL = EnumSet.allOf(ClusterBlockLevel.class); public static final EnumSet READ_WRITE = EnumSet.of(READ, WRITE); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 6456240c2317e..b7c1ee5fbad96 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -140,6 +140,15 @@ public class IndexMetadata implements Diffable, ToXContentFragmen RestStatus.TOO_MANY_REQUESTS, EnumSet.of(ClusterBlockLevel.WRITE) ); + public static final ClusterBlock INDEX_REFRESH_BLOCK = new ClusterBlock( + 14, + "index refresh blocked, waiting for shard(s) to be started", + true, + false, + false, + RestStatus.REQUEST_TIMEOUT, + EnumSet.of(ClusterBlockLevel.REFRESH) + ); // 'event.ingested' (part of Elastic Common Schema) range is tracked in cluster state, along with @timestamp public static final String EVENT_INGESTED_FIELD_NAME = "event.ingested"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 1f014a526b9a6..547146d014304 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -127,6 +128,15 @@ public class MetadataCreateIndexService { public static final int MAX_INDEX_NAME_BYTES = 255; + /** + * Name of the setting used to allow blocking refreshes on newly created indices. + */ + public static final String USE_INDEX_REFRESH_BLOCK_SETTING_NAME = "stateless.indices.use_refresh_block_upon_index_creation"; + + @FunctionalInterface + interface ClusterBlocksTransformer extends BiConsumer { + } + private final Settings settings; private final ClusterService clusterService; private final IndicesService indicesService; @@ -139,6 +149,7 @@ public class MetadataCreateIndexService { private final boolean forbidPrivateIndexSettings; private final Set indexSettingProviders; private final ThreadPool threadPool; + private final @Nullable ClusterBlocksTransformer blocksTransformerUponIndexCreation; public MetadataCreateIndexService( final Settings settings, @@ -166,6 +177,7 @@ public MetadataCreateIndexService( this.shardLimitValidator = shardLimitValidator; this.indexSettingProviders = indexSettingProviders.getIndexSettingProviders(); this.threadPool = threadPool; + this.blocksTransformerUponIndexCreation = createClusterBlocksTransformerForIndexCreation(settings); } /** @@ -540,8 +552,10 @@ private ClusterState applyCreateIndexWithTemporaryService( currentState, indexMetadata, metadataTransformer, + blocksTransformerUponIndexCreation, allocationService.getShardRoutingRoleStrategy() ); + assert assertHasRefreshBlock(indexMetadata, updated); if (request.performReroute()) { updated = allocationService.reroute(updated, "index [" + indexMetadata.getIndex().getName() + "] created", rerouteListener); } @@ -1294,6 +1308,7 @@ static ClusterState clusterStateCreateIndex( ClusterState currentState, IndexMetadata indexMetadata, BiConsumer metadataTransformer, + ClusterBlocksTransformer blocksTransformer, ShardRoutingRoleStrategy shardRoutingRoleStrategy ) { final Metadata newMetadata; @@ -1307,6 +1322,9 @@ static ClusterState clusterStateCreateIndex( var blocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks()); blocksBuilder.updateBlocks(indexMetadata); + if (blocksTransformer != null) { + blocksTransformer.accept(blocksBuilder, indexMetadata); + } var routingTableBuilder = RoutingTable.builder(shardRoutingRoleStrategy, currentState.routingTable()) .addAsNew(newMetadata.index(indexMetadata.getIndex().getName())); @@ -1745,4 +1763,35 @@ public static void validateStoreTypeSetting(Settings indexSettings) { ); } } + + private static boolean useRefreshBlock(Settings settings) { + return DiscoveryNode.isStateless(settings) && settings.getAsBoolean(USE_INDEX_REFRESH_BLOCK_SETTING_NAME, false); + } + + static ClusterBlocksTransformer createClusterBlocksTransformerForIndexCreation(Settings settings) { + if (useRefreshBlock(settings) == false) { + return (builder, indexMetadata) -> {}; + } + logger.info("applying refresh block on index creation"); + return (builder, indexMetadata) -> { + if (applyRefreshBlock(indexMetadata)) { + builder.addIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); + } + }; + } + + private static boolean applyRefreshBlock(IndexMetadata indexMetadata) { + return 0 < indexMetadata.getNumberOfReplicas() // index has replicas + && indexMetadata.getResizeSourceIndex() == null // index is not a split/shrink index + && indexMetadata.getInSyncAllocationIds().values().stream().allMatch(Set::isEmpty); // index is a new index + } + + private boolean assertHasRefreshBlock(IndexMetadata indexMetadata, ClusterState clusterState) { + if (useRefreshBlock(settings) == false || applyRefreshBlock(indexMetadata) == false) { + assert clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK) == false; + } else { + assert clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); + } + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 96a74d2e23aad..80d5f26a07e59 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -105,6 +105,8 @@ import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.indices.ShardLimitValidatorTests.createTestShardLimitService; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -1133,7 +1135,7 @@ public void testClusterStateCreateIndexThrowsWriteIndexValidationException() thr assertThat( expectThrows( IllegalStateException.class, - () -> clusterStateCreateIndex(currentClusterState, newIndex, null, TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) + () -> clusterStateCreateIndex(currentClusterState, newIndex, null, null, TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) ).getMessage(), startsWith("alias [alias1] has more than one write index [") ); @@ -1153,6 +1155,7 @@ public void testClusterStateCreateIndex() { currentClusterState, newIndexMetadata, null, + null, TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY ); assertThat(updatedClusterState.blocks().getIndexBlockWithId("test", INDEX_READ_ONLY_BLOCK.id()), is(INDEX_READ_ONLY_BLOCK)); @@ -1198,6 +1201,7 @@ public void testClusterStateCreateIndexWithMetadataTransaction() { currentClusterState, newIndexMetadata, metadataTransformer, + null, TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY ); assertTrue(updatedClusterState.metadata().findAllAliases(new String[] { "my-index" }).containsKey("my-index")); @@ -1547,6 +1551,68 @@ public void testDeprecateSimpleFS() { ); } + public void testClusterStateCreateIndexWithClusterBlockTransformer() { + var emptyClusterState = ClusterState.builder(ClusterState.EMPTY_STATE).build(); + { + var updatedClusterState = clusterStateCreateIndex( + emptyClusterState, + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(randomIntBetween(1, 3)) + .build(), + null, + MetadataCreateIndexService.createClusterBlocksTransformerForIndexCreation(Settings.EMPTY), + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY + ); + assertThat(updatedClusterState.blocks().indices(), is(anEmptyMap())); + assertThat(updatedClusterState.blocks().hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), is(false)); + assertThat(updatedClusterState.routingTable().index("test"), is(notNullValue())); + } + { + var settings = Settings.builder() + .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) + .put(MetadataCreateIndexService.USE_INDEX_REFRESH_BLOCK_SETTING_NAME, true) + .build(); + int nbReplicas = randomIntBetween(0, 1); + var updatedClusterState = clusterStateCreateIndex( + emptyClusterState, + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(nbReplicas) + .build(), + null, + MetadataCreateIndexService.createClusterBlocksTransformerForIndexCreation(settings), + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY + ); + assertThat(updatedClusterState.blocks().indices(), is(aMapWithSize(nbReplicas))); + assertThat(updatedClusterState.blocks().hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), is(0 < nbReplicas)); + assertThat(updatedClusterState.routingTable().index("test"), is(notNullValue())); + } + } + + public void testCreateRefreshBlockUponIndexCreationApplier() { + boolean isStateless = randomBoolean(); + boolean useRefreshBlock = randomBoolean(); + + var applier = MetadataCreateIndexService.createClusterBlocksTransformerForIndexCreation( + Settings.builder() + .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, isStateless) + .put(MetadataCreateIndexService.USE_INDEX_REFRESH_BLOCK_SETTING_NAME, useRefreshBlock) + .build() + ); + assertThat(applier, notNullValue()); + + var blocks = ClusterBlocks.builder().blocks(ClusterState.EMPTY_STATE.blocks()); + applier.accept(blocks, IndexMetadata.builder("test") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(randomIntBetween(1, 3)) + .build()); + assertThat(blocks.hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), is(isStateless && useRefreshBlock)); + } + private IndexTemplateMetadata addMatchingTemplate(Consumer configurator) { IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*"); configurator.accept(builder); From 3a592e18907f6142744e4ff54a566968cf96d88c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Nov 2024 12:01:44 +0100 Subject: [PATCH 02/10] spotless --- .../metadata/MetadataCreateIndexService.java | 3 +-- .../metadata/MetadataCreateIndexServiceTests.java | 13 ++++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 547146d014304..f31a70c594128 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -134,8 +134,7 @@ public class MetadataCreateIndexService { public static final String USE_INDEX_REFRESH_BLOCK_SETTING_NAME = "stateless.indices.use_refresh_block_upon_index_creation"; @FunctionalInterface - interface ClusterBlocksTransformer extends BiConsumer { - } + interface ClusterBlocksTransformer extends BiConsumer {} private final Settings settings; private final ClusterService clusterService; diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 80d5f26a07e59..b203cc851f580 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -1605,11 +1605,14 @@ public void testCreateRefreshBlockUponIndexCreationApplier() { assertThat(applier, notNullValue()); var blocks = ClusterBlocks.builder().blocks(ClusterState.EMPTY_STATE.blocks()); - applier.accept(blocks, IndexMetadata.builder("test") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(randomIntBetween(1, 3)) - .build()); + applier.accept( + blocks, + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(randomIntBetween(1, 3)) + .build() + ); assertThat(blocks.hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), is(isStateless && useRefreshBlock)); } From d447c471aaf6806aa77df069b09bd60c9789cedd Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Nov 2024 12:52:14 +0100 Subject: [PATCH 03/10] fix unit test --- .../cluster/ClusterStateTests.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 9613086aa9f57..668aea70c23f2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -167,7 +167,8 @@ public void testToXContent() throws IOException { "read", "write", "metadata_read", - "metadata_write" + "metadata_write", + "refresh" ] } }, @@ -180,7 +181,8 @@ public void testToXContent() throws IOException { "read", "write", "metadata_read", - "metadata_write" + "metadata_write", + "refresh" ] } } @@ -440,7 +442,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti "read", "write", "metadata_read", - "metadata_write" + "metadata_write", + "refresh" ] } }, @@ -453,7 +456,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti "read", "write", "metadata_read", - "metadata_write" + "metadata_write", + "refresh" ] } } @@ -712,7 +716,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti "read", "write", "metadata_read", - "metadata_write" + "metadata_write", + "refresh" ] } }, @@ -725,7 +730,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti "read", "write", "metadata_read", - "metadata_write" + "metadata_write", + "refresh" ] } } From 37b3123f7fa359e81c92efe1cf0421115f071d33 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Nov 2024 15:18:52 +0100 Subject: [PATCH 04/10] fix serialization --- .../org/elasticsearch/TransportVersions.java | 1 + .../cluster/block/ClusterBlock.java | 24 +++++++++++++++++- .../metadata/MetadataCreateIndexService.java | 25 +++++++++++-------- .../common/io/stream/StreamOutput.java | 12 +++++++-- 4 files changed, 48 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 688d2aaf905a6..635c23568d576 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -206,6 +206,7 @@ static TransportVersion def(int id) { public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0); public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0); public static final TransportVersion REINDEX_DATA_STREAMS = def(8_799_00_0); + public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_800_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index 4e47925d383c2..36e6bd6927db5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.block; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -21,6 +22,7 @@ import java.util.EnumSet; import java.util.Locale; import java.util.Objects; +import java.util.function.Predicate; public class ClusterBlock implements Writeable, ToXContentFragment { @@ -142,7 +144,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); out.writeOptionalString(uuid); out.writeString(description); - out.writeEnumSet(levels); + if (out.getTransportVersion().onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) { + out.writeEnumSet(levels); + } else { + // do not send ClusterBlockLevel.REFRESH to old nodes + out.writeEnumSet(filterLevels(levels, level -> ClusterBlockLevel.REFRESH.equals(level) == false)); + } out.writeBoolean(retryable); out.writeBoolean(disableStatePersistence); RestStatus.writeTo(out, status); @@ -185,4 +192,19 @@ public int hashCode() { public boolean isAllowReleaseResources() { return allowReleaseResources; } + + private static EnumSet filterLevels(EnumSet levels, Predicate predicate) { + assert levels != null; + int size = levels.size(); + if (size == 0 || (size == 1 && predicate.test(levels.iterator().next()))) { + return levels; + } + var filteredLevels = EnumSet.noneOf(ClusterBlockLevel.class); + for (ClusterBlockLevel level : levels) { + if (predicate.test(level)) { + filteredLevels.add(level); + } + } + return filteredLevels; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index f31a70c594128..11b6aa6534be1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -134,7 +134,9 @@ public class MetadataCreateIndexService { public static final String USE_INDEX_REFRESH_BLOCK_SETTING_NAME = "stateless.indices.use_refresh_block_upon_index_creation"; @FunctionalInterface - interface ClusterBlocksTransformer extends BiConsumer {} + interface ClusterBlocksTransformer { + void apply(ClusterBlocks.Builder clusterBlocks, IndexMetadata indexMetadata, TransportVersion minClusterTransportVersion); + } private final Settings settings; private final ClusterService clusterService; @@ -554,7 +556,7 @@ private ClusterState applyCreateIndexWithTemporaryService( blocksTransformerUponIndexCreation, allocationService.getShardRoutingRoleStrategy() ); - assert assertHasRefreshBlock(indexMetadata, updated); + assert assertHasRefreshBlock(indexMetadata, updated, updated.getMinTransportVersion()); if (request.performReroute()) { updated = allocationService.reroute(updated, "index [" + indexMetadata.getIndex().getName() + "] created", rerouteListener); } @@ -1322,7 +1324,7 @@ static ClusterState clusterStateCreateIndex( var blocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks()); blocksBuilder.updateBlocks(indexMetadata); if (blocksTransformer != null) { - blocksTransformer.accept(blocksBuilder, indexMetadata); + blocksTransformer.apply(blocksBuilder, indexMetadata, currentState.getMinTransportVersion()); } var routingTableBuilder = RoutingTable.builder(shardRoutingRoleStrategy, currentState.routingTable()) @@ -1769,24 +1771,25 @@ private static boolean useRefreshBlock(Settings settings) { static ClusterBlocksTransformer createClusterBlocksTransformerForIndexCreation(Settings settings) { if (useRefreshBlock(settings) == false) { - return (builder, indexMetadata) -> {}; + return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> {}; } logger.info("applying refresh block on index creation"); - return (builder, indexMetadata) -> { - if (applyRefreshBlock(indexMetadata)) { - builder.addIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); + return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> { + if (applyRefreshBlock(indexMetadata, minClusterTransportVersion)) { + clusterBlocks.addIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); } }; } - private static boolean applyRefreshBlock(IndexMetadata indexMetadata) { + private static boolean applyRefreshBlock(IndexMetadata indexMetadata, TransportVersion minClusterTransportVersion) { return 0 < indexMetadata.getNumberOfReplicas() // index has replicas && indexMetadata.getResizeSourceIndex() == null // index is not a split/shrink index - && indexMetadata.getInSyncAllocationIds().values().stream().allMatch(Set::isEmpty); // index is a new index + && indexMetadata.getInSyncAllocationIds().values().stream().allMatch(Set::isEmpty) // index is a new index + && minClusterTransportVersion.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK); } - private boolean assertHasRefreshBlock(IndexMetadata indexMetadata, ClusterState clusterState) { - if (useRefreshBlock(settings) == false || applyRefreshBlock(indexMetadata) == false) { + private boolean assertHasRefreshBlock(IndexMetadata indexMetadata, ClusterState clusterState, TransportVersion minTransportVersion) { + if (useRefreshBlock(settings) == false || applyRefreshBlock(indexMetadata, minTransportVersion) == false) { assert clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK) == false; } else { assert clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index d724e5ea25ca6..40a93beec8fc0 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -15,6 +15,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -1167,12 +1168,19 @@ public void writeNamedWriteableCollection(Collection l writeCollection(list, StreamOutput::writeNamedWriteable); } + private static > boolean assertEnumToWrite(E enumValue, TransportVersion version) { + assert enumValue instanceof XContentType == false : "XContentHelper#writeTo should be used for XContentType serialisation"; + assert enumValue != ClusterBlockLevel.REFRESH || version.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK) + : "ClusterBlockLevel.REFRESH should only be sent to nodes with recent version"; + return true; + } + /** * Writes an enum with type {@code E} in terms of the value of its ordinal. Enums serialized like this must have a corresponding test * which uses {@code EnumSerializationTestUtils#assertEnumSerialization} to fix the wire protocol. */ public > void writeEnum(E enumValue) throws IOException { - assert enumValue instanceof XContentType == false : "XContentHelper#writeTo should be used for XContentType serialisation"; + assert assertEnumToWrite(enumValue, version); writeVInt(enumValue.ordinal()); } @@ -1185,7 +1193,7 @@ public > void writeOptionalEnum(@Nullable E enumValue) throws writeBoolean(false); } else { writeBoolean(true); - assert enumValue instanceof XContentType == false : "XContentHelper#writeTo should be used for XContentType serialisation"; + assert assertEnumToWrite(enumValue, version); writeVInt(enumValue.ordinal()); } } From 40a3a187e8f00d7bcdc6cb2d0ecef1839a4a661c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Nov 2024 16:55:20 +0100 Subject: [PATCH 05/10] tests --- .../cluster/block/ClusterBlock.java | 2 +- .../cluster/block/ClusterBlockTests.java | 50 ++++++++++++++++--- .../MetadataCreateIndexServiceTests.java | 12 +++-- 3 files changed, 53 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index 36e6bd6927db5..25c6a1ff5b67f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -193,7 +193,7 @@ public boolean isAllowReleaseResources() { return allowReleaseResources; } - private static EnumSet filterLevels(EnumSet levels, Predicate predicate) { + static EnumSet filterLevels(EnumSet levels, Predicate predicate) { assert levels != null; int size = levels.size(); if (size == 0 || (size == 1 && predicate.test(levels.iterator().next()))) { diff --git a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java index 311f2ec36af5c..fcbafd40cd093 100644 --- a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java @@ -10,19 +10,22 @@ package org.elasticsearch.cluster.block; import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; -import java.util.Arrays; import java.util.Collections; -import java.util.List; +import java.util.EnumSet; import java.util.Map; import static java.util.EnumSet.copyOf; +import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion; +import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion; import static org.elasticsearch.test.TransportVersionUtils.randomVersion; +import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween; import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -36,7 +39,7 @@ public void testSerialization() throws Exception { int iterations = randomIntBetween(5, 20); for (int i = 0; i < iterations; i++) { TransportVersion version = randomVersion(random()); - ClusterBlock clusterBlock = randomClusterBlock(); + ClusterBlock clusterBlock = randomClusterBlock(version); BytesStreamOutput out = new BytesStreamOutput(); out.setTransportVersion(version); @@ -50,13 +53,42 @@ public void testSerialization() throws Exception { } } + public void testSerializationBwc() throws Exception { + // + var out = new BytesStreamOutput(); + out.setTransportVersion( + randomVersionBetween(random(), getFirstVersion(), getPreviousVersion(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) + ); + + var clusterBlock = randomClusterBlock(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK); + clusterBlock.writeTo(out); + + var in = out.bytes().streamInput(); + in.setTransportVersion(randomVersion()); + + assertClusterBlockEquals( + new ClusterBlock( + clusterBlock.id(), + clusterBlock.uuid(), + clusterBlock.description(), + clusterBlock.retryable(), + clusterBlock.disableStatePersistence(), + clusterBlock.isAllowReleaseResources(), + clusterBlock.status(), + // ClusterBlockLevel.REFRESH should not be sent over the wire to nodes with version < NEW_REFRESH_CLUSTER_BLOCK + ClusterBlock.filterLevels(clusterBlock.levels(), level -> ClusterBlockLevel.REFRESH.equals(level) == false) + ), + new ClusterBlock(in) + ); + } + public void testToStringDanglingComma() { - final ClusterBlock clusterBlock = randomClusterBlock(); + final ClusterBlock clusterBlock = randomClusterBlock(randomVersion(random())); assertThat(clusterBlock.toString(), not(endsWith(","))); } public void testGlobalBlocksCheckedIfNoIndicesSpecified() { - ClusterBlock globalBlock = randomClusterBlock(); + ClusterBlock globalBlock = randomClusterBlock(randomVersion(random())); ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), Map.of()); ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]); assertNotNull(exception); @@ -113,9 +145,13 @@ public void testGetIndexBlockWithId() { assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue()); } - private static ClusterBlock randomClusterBlock() { + private static ClusterBlock randomClusterBlock(TransportVersion version) { final String uuid = randomBoolean() ? UUIDs.randomBase64UUID() : null; - final List levels = Arrays.asList(ClusterBlockLevel.values()); + final EnumSet levels = ClusterBlock.filterLevels( + EnumSet.allOf(ClusterBlockLevel.class), + // Filter out ClusterBlockLevel.REFRESH for versions < TransportVersions.NEW_REFRESH_CLUSTER_BLOCK + level -> ClusterBlockLevel.REFRESH.equals(level) == false || version.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK) + ); return new ClusterBlock( randomInt(), uuid, diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java index b203cc851f580..494740bf2a98f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.snapshots.EmptySnapshotsInfoService; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.threadpool.TestThreadPool; @@ -1595,6 +1596,7 @@ public void testClusterStateCreateIndexWithClusterBlockTransformer() { public void testCreateRefreshBlockUponIndexCreationApplier() { boolean isStateless = randomBoolean(); boolean useRefreshBlock = randomBoolean(); + var minTransportVersion = TransportVersionUtils.randomCompatibleVersion(random()); var applier = MetadataCreateIndexService.createClusterBlocksTransformerForIndexCreation( Settings.builder() @@ -1605,15 +1607,19 @@ public void testCreateRefreshBlockUponIndexCreationApplier() { assertThat(applier, notNullValue()); var blocks = ClusterBlocks.builder().blocks(ClusterState.EMPTY_STATE.blocks()); - applier.accept( + applier.apply( blocks, IndexMetadata.builder("test") .settings(settings(IndexVersion.current())) .numberOfShards(1) .numberOfReplicas(randomIntBetween(1, 3)) - .build() + .build(), + minTransportVersion + ); + assertThat( + blocks.hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), + is(isStateless && useRefreshBlock && minTransportVersion.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) ); - assertThat(blocks.hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), is(isStateless && useRefreshBlock)); } private IndexTemplateMetadata addMatchingTemplate(Consumer configurator) { From 063e904603cc84a199edc4df3014d10933e5c9f0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Nov 2024 18:00:44 +0100 Subject: [PATCH 06/10] fix test --- .../MetadataCreateIndexServiceTests.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 494740bf2a98f..1876a1f2da556 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -1553,8 +1554,8 @@ public void testDeprecateSimpleFS() { } public void testClusterStateCreateIndexWithClusterBlockTransformer() { - var emptyClusterState = ClusterState.builder(ClusterState.EMPTY_STATE).build(); { + var emptyClusterState = ClusterState.builder(ClusterState.EMPTY_STATE).build(); var updatedClusterState = clusterStateCreateIndex( emptyClusterState, IndexMetadata.builder("test") @@ -1571,6 +1572,11 @@ public void testClusterStateCreateIndexWithClusterBlockTransformer() { assertThat(updatedClusterState.routingTable().index("test"), is(notNullValue())); } { + var minTransportVersion = TransportVersionUtils.randomCompatibleVersion(random()); + var emptyClusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("_node_id")).build()) + .putCompatibilityVersions("_node_id", new CompatibilityVersions(minTransportVersion, Map.of())) + .build(); var settings = Settings.builder() .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .put(MetadataCreateIndexService.USE_INDEX_REFRESH_BLOCK_SETTING_NAME, true) @@ -1582,18 +1588,21 @@ public void testClusterStateCreateIndexWithClusterBlockTransformer() { .settings(settings(IndexVersion.current())) .numberOfShards(1) .numberOfReplicas(nbReplicas) - .build(), + .build() + .withTimestampRanges(IndexLongFieldRange.UNKNOWN, IndexLongFieldRange.UNKNOWN, minTransportVersion), null, MetadataCreateIndexService.createClusterBlocksTransformerForIndexCreation(settings), TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY ); - assertThat(updatedClusterState.blocks().indices(), is(aMapWithSize(nbReplicas))); - assertThat(updatedClusterState.blocks().hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), is(0 < nbReplicas)); + + var expectRefreshBlock = 0 < nbReplicas && minTransportVersion.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK); + assertThat(updatedClusterState.blocks().indices(), is(aMapWithSize(expectRefreshBlock ? 1 : 0))); + assertThat(updatedClusterState.blocks().hasIndexBlock("test", IndexMetadata.INDEX_REFRESH_BLOCK), is(expectRefreshBlock)); assertThat(updatedClusterState.routingTable().index("test"), is(notNullValue())); } } - public void testCreateRefreshBlockUponIndexCreationApplier() { + public void testCreateClusterBlocksTransformerForIndexCreation() { boolean isStateless = randomBoolean(); boolean useRefreshBlock = randomBoolean(); var minTransportVersion = TransportVersionUtils.randomCompatibleVersion(random()); From d83846593e2324605104dcaa5bf5b4333f82d50b Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 28 Nov 2024 16:04:40 +0100 Subject: [PATCH 07/10] debug --- .../cluster/metadata/MetadataCreateIndexService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 11b6aa6534be1..a8bf88cd84826 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -1773,7 +1773,7 @@ static ClusterBlocksTransformer createClusterBlocksTransformerForIndexCreation(S if (useRefreshBlock(settings) == false) { return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> {}; } - logger.info("applying refresh block on index creation"); + logger.debug("applying refresh block on index creation"); return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> { if (applyRefreshBlock(indexMetadata, minClusterTransportVersion)) { clusterBlocks.addIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); From ccee9d13f8cfd390fc92e72a1e6bda305eb78d48 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 28 Nov 2024 16:26:31 +0100 Subject: [PATCH 08/10] nullable --- .../cluster/metadata/MetadataCreateIndexService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index a8bf88cd84826..eb393ffee5057 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -150,7 +150,7 @@ interface ClusterBlocksTransformer { private final boolean forbidPrivateIndexSettings; private final Set indexSettingProviders; private final ThreadPool threadPool; - private final @Nullable ClusterBlocksTransformer blocksTransformerUponIndexCreation; + private final ClusterBlocksTransformer blocksTransformerUponIndexCreation; public MetadataCreateIndexService( final Settings settings, From 7525d2dd7f8fbbbebcebaeff9213574a3de759a3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 28 Nov 2024 16:44:10 +0100 Subject: [PATCH 09/10] nits --- .../cluster/metadata/MetadataCreateIndexService.java | 7 +++++-- .../org/elasticsearch/cluster/block/ClusterBlockTests.java | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index eb393ffee5057..52e4d75ac5116 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -1776,6 +1776,8 @@ static ClusterBlocksTransformer createClusterBlocksTransformerForIndexCreation(S logger.debug("applying refresh block on index creation"); return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> { if (applyRefreshBlock(indexMetadata, minClusterTransportVersion)) { + // Applies the INDEX_REFRESH_BLOCK to the index. This block will remain in cluster state until an unpromotable shard is + // started or a configurable delay is elapsed. clusterBlocks.addIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); } }; @@ -1789,10 +1791,11 @@ private static boolean applyRefreshBlock(IndexMetadata indexMetadata, TransportV } private boolean assertHasRefreshBlock(IndexMetadata indexMetadata, ClusterState clusterState, TransportVersion minTransportVersion) { + var hasRefreshBlock = clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); if (useRefreshBlock(settings) == false || applyRefreshBlock(indexMetadata, minTransportVersion) == false) { - assert clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK) == false; + assert hasRefreshBlock == false : indexMetadata.getIndex(); } else { - assert clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK); + assert hasRefreshBlock : indexMetadata.getIndex(); } return true; } diff --git a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java index fcbafd40cd093..0237fff8fdda5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java @@ -54,7 +54,6 @@ public void testSerialization() throws Exception { } public void testSerializationBwc() throws Exception { - // var out = new BytesStreamOutput(); out.setTransportVersion( randomVersionBetween(random(), getFirstVersion(), getPreviousVersion(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) From 00d3dcec2fd31a2b42e1588b2e0a5a16148f164f Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 29 Nov 2024 08:21:56 +0100 Subject: [PATCH 10/10] revert StreamOutput --- .../elasticsearch/common/io/stream/StreamOutput.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 40a93beec8fc0..d724e5ea25ca6 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -15,7 +15,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -1168,19 +1167,12 @@ public void writeNamedWriteableCollection(Collection l writeCollection(list, StreamOutput::writeNamedWriteable); } - private static > boolean assertEnumToWrite(E enumValue, TransportVersion version) { - assert enumValue instanceof XContentType == false : "XContentHelper#writeTo should be used for XContentType serialisation"; - assert enumValue != ClusterBlockLevel.REFRESH || version.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK) - : "ClusterBlockLevel.REFRESH should only be sent to nodes with recent version"; - return true; - } - /** * Writes an enum with type {@code E} in terms of the value of its ordinal. Enums serialized like this must have a corresponding test * which uses {@code EnumSerializationTestUtils#assertEnumSerialization} to fix the wire protocol. */ public > void writeEnum(E enumValue) throws IOException { - assert assertEnumToWrite(enumValue, version); + assert enumValue instanceof XContentType == false : "XContentHelper#writeTo should be used for XContentType serialisation"; writeVInt(enumValue.ordinal()); } @@ -1193,7 +1185,7 @@ public > void writeOptionalEnum(@Nullable E enumValue) throws writeBoolean(false); } else { writeBoolean(true); - assert assertEnumToWrite(enumValue, version); + assert enumValue instanceof XContentType == false : "XContentHelper#writeTo should be used for XContentType serialisation"; writeVInt(enumValue.ordinal()); } }