Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REMOVE_NODE_LEVEL_PLAN = def(8_800_00_0);
public static final TransportVersion LOGSDB_TELEMETRY_CUSTOM_CUTOFF_DATE = def(8_801_00_0);
public static final TransportVersion SOURCE_MODE_TELEMETRY = def(8_802_00_0);
public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_803_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.block;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -21,6 +22,7 @@
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Predicate;

public class ClusterBlock implements Writeable, ToXContentFragment {

Expand Down Expand Up @@ -142,7 +144,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
out.writeOptionalString(uuid);
out.writeString(description);
out.writeEnumSet(levels);
if (out.getTransportVersion().onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) {
out.writeEnumSet(levels);
} else {
// do not send ClusterBlockLevel.REFRESH to old nodes
out.writeEnumSet(filterLevels(levels, level -> ClusterBlockLevel.REFRESH.equals(level) == false));
}
out.writeBoolean(retryable);
out.writeBoolean(disableStatePersistence);
RestStatus.writeTo(out, status);
Expand Down Expand Up @@ -185,4 +192,19 @@ public int hashCode() {
public boolean isAllowReleaseResources() {
return allowReleaseResources;
}

static EnumSet<ClusterBlockLevel> filterLevels(EnumSet<ClusterBlockLevel> levels, Predicate<ClusterBlockLevel> predicate) {
assert levels != null;
int size = levels.size();
if (size == 0 || (size == 1 && predicate.test(levels.iterator().next()))) {
return levels;
}
var filteredLevels = EnumSet.noneOf(ClusterBlockLevel.class);
for (ClusterBlockLevel level : levels) {
if (predicate.test(level)) {
filteredLevels.add(level);
}
}
return filteredLevels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public enum ClusterBlockLevel {
READ,
WRITE,
METADATA_READ,
METADATA_WRITE;
METADATA_WRITE,
REFRESH;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not check, but wonder if there are any greater than, less than comparisons against the "level", since the wording signals some order. Perhaps you can take a look (if you have not already)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and saw no comparisons based on the ordinal (and did not expect to saw one either)


public static final EnumSet<ClusterBlockLevel> ALL = EnumSet.allOf(ClusterBlockLevel.class);
public static final EnumSet<ClusterBlockLevel> READ_WRITE = EnumSet.of(READ, WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
RestStatus.TOO_MANY_REQUESTS,
EnumSet.of(ClusterBlockLevel.WRITE)
);
public static final ClusterBlock INDEX_REFRESH_BLOCK = new ClusterBlock(
14,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is with the hole here, just avoiding the unfortunate 13 or leaving room for one to go in between?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

13 is assigned to CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK in the Metadata class.

I was thinking of declaring all ids in the same constant class (all blocks are defined in server) as a possible follow up.

"index refresh blocked, waiting for shard(s) to be started",
true,
false,
false,
RestStatus.REQUEST_TIMEOUT,
EnumSet.of(ClusterBlockLevel.REFRESH)
);

// 'event.ingested' (part of Elastic Common Schema) range is tracked in cluster state, along with @timestamp
public static final String EVENT_INGESTED_FIELD_NAME = "event.ingested";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -127,6 +128,16 @@ public class MetadataCreateIndexService {

public static final int MAX_INDEX_NAME_BYTES = 255;

/**
* Name of the setting used to allow blocking refreshes on newly created indices.
*/
public static final String USE_INDEX_REFRESH_BLOCK_SETTING_NAME = "stateless.indices.use_refresh_block_upon_index_creation";

@FunctionalInterface
interface ClusterBlocksTransformer {
void apply(ClusterBlocks.Builder clusterBlocks, IndexMetadata indexMetadata, TransportVersion minClusterTransportVersion);
}

private final Settings settings;
private final ClusterService clusterService;
private final IndicesService indicesService;
Expand All @@ -139,6 +150,7 @@ public class MetadataCreateIndexService {
private final boolean forbidPrivateIndexSettings;
private final Set<IndexSettingProvider> indexSettingProviders;
private final ThreadPool threadPool;
private final ClusterBlocksTransformer blocksTransformerUponIndexCreation;

public MetadataCreateIndexService(
final Settings settings,
Expand Down Expand Up @@ -166,6 +178,7 @@ public MetadataCreateIndexService(
this.shardLimitValidator = shardLimitValidator;
this.indexSettingProviders = indexSettingProviders.getIndexSettingProviders();
this.threadPool = threadPool;
this.blocksTransformerUponIndexCreation = createClusterBlocksTransformerForIndexCreation(settings);
}

/**
Expand Down Expand Up @@ -540,8 +553,10 @@ private ClusterState applyCreateIndexWithTemporaryService(
currentState,
indexMetadata,
metadataTransformer,
blocksTransformerUponIndexCreation,
allocationService.getShardRoutingRoleStrategy()
);
assert assertHasRefreshBlock(indexMetadata, updated, updated.getMinTransportVersion());
if (request.performReroute()) {
updated = allocationService.reroute(updated, "index [" + indexMetadata.getIndex().getName() + "] created", rerouteListener);
}
Expand Down Expand Up @@ -1294,6 +1309,7 @@ static ClusterState clusterStateCreateIndex(
ClusterState currentState,
IndexMetadata indexMetadata,
BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer,
ClusterBlocksTransformer blocksTransformer,
ShardRoutingRoleStrategy shardRoutingRoleStrategy
) {
final Metadata newMetadata;
Expand All @@ -1307,6 +1323,9 @@ static ClusterState clusterStateCreateIndex(

var blocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());
blocksBuilder.updateBlocks(indexMetadata);
if (blocksTransformer != null) {
blocksTransformer.apply(blocksBuilder, indexMetadata, currentState.getMinTransportVersion());
}

var routingTableBuilder = RoutingTable.builder(shardRoutingRoleStrategy, currentState.routingTable())
.addAsNew(newMetadata.index(indexMetadata.getIndex().getName()));
Expand Down Expand Up @@ -1745,4 +1764,39 @@ public static void validateStoreTypeSetting(Settings indexSettings) {
);
}
}

private static boolean useRefreshBlock(Settings settings) {
return DiscoveryNode.isStateless(settings) && settings.getAsBoolean(USE_INDEX_REFRESH_BLOCK_SETTING_NAME, false);
}

static ClusterBlocksTransformer createClusterBlocksTransformerForIndexCreation(Settings settings) {
if (useRefreshBlock(settings) == false) {
return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> {};
}
logger.debug("applying refresh block on index creation");
return (clusterBlocks, indexMetadata, minClusterTransportVersion) -> {
if (applyRefreshBlock(indexMetadata, minClusterTransportVersion)) {
// Applies the INDEX_REFRESH_BLOCK to the index. This block will remain in cluster state until an unpromotable shard is
// started or a configurable delay is elapsed.
clusterBlocks.addIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK);
}
};
}

private static boolean applyRefreshBlock(IndexMetadata indexMetadata, TransportVersion minClusterTransportVersion) {
return 0 < indexMetadata.getNumberOfReplicas() // index has replicas
&& indexMetadata.getResizeSourceIndex() == null // index is not a split/shrink index
&& indexMetadata.getInSyncAllocationIds().values().stream().allMatch(Set::isEmpty) // index is a new index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this ever be not true? Fine to keep ofc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect this to be always true, but decided to copy the conditions from org.elasticsearch.cluster.routing.IndexRoutingTable.Builder#initializeEmpty for extra safety.

&& minClusterTransportVersion.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK);
}

private boolean assertHasRefreshBlock(IndexMetadata indexMetadata, ClusterState clusterState, TransportVersion minTransportVersion) {
var hasRefreshBlock = clusterState.blocks().hasIndexBlock(indexMetadata.getIndex().getName(), IndexMetadata.INDEX_REFRESH_BLOCK);
if (useRefreshBlock(settings) == false || applyRefreshBlock(indexMetadata, minTransportVersion) == false) {
assert hasRefreshBlock == false : indexMetadata.getIndex();
} else {
assert hasRefreshBlock : indexMetadata.getIndex();
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void testToXContent() throws IOException {
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -180,7 +181,8 @@ public void testToXContent() throws IOException {
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down Expand Up @@ -440,7 +442,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -453,7 +456,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down Expand Up @@ -712,7 +716,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -725,7 +730,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@
package org.elasticsearch.cluster.block;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.EnumSet;
import java.util.Map;

import static java.util.EnumSet.copyOf;
import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion;
import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -36,7 +39,7 @@ public void testSerialization() throws Exception {
int iterations = randomIntBetween(5, 20);
for (int i = 0; i < iterations; i++) {
TransportVersion version = randomVersion(random());
ClusterBlock clusterBlock = randomClusterBlock();
ClusterBlock clusterBlock = randomClusterBlock(version);

BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(version);
Expand All @@ -50,13 +53,41 @@ public void testSerialization() throws Exception {
}
}

public void testSerializationBwc() throws Exception {
var out = new BytesStreamOutput();
out.setTransportVersion(
randomVersionBetween(random(), getFirstVersion(), getPreviousVersion(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK))
);

var clusterBlock = randomClusterBlock(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK);
clusterBlock.writeTo(out);

var in = out.bytes().streamInput();
in.setTransportVersion(randomVersion());

assertClusterBlockEquals(
new ClusterBlock(
clusterBlock.id(),
clusterBlock.uuid(),
clusterBlock.description(),
clusterBlock.retryable(),
clusterBlock.disableStatePersistence(),
clusterBlock.isAllowReleaseResources(),
clusterBlock.status(),
// ClusterBlockLevel.REFRESH should not be sent over the wire to nodes with version < NEW_REFRESH_CLUSTER_BLOCK
ClusterBlock.filterLevels(clusterBlock.levels(), level -> ClusterBlockLevel.REFRESH.equals(level) == false)
),
new ClusterBlock(in)
);
}

public void testToStringDanglingComma() {
final ClusterBlock clusterBlock = randomClusterBlock();
final ClusterBlock clusterBlock = randomClusterBlock(randomVersion(random()));
assertThat(clusterBlock.toString(), not(endsWith(",")));
}

public void testGlobalBlocksCheckedIfNoIndicesSpecified() {
ClusterBlock globalBlock = randomClusterBlock();
ClusterBlock globalBlock = randomClusterBlock(randomVersion(random()));
ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), Map.of());
ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]);
assertNotNull(exception);
Expand Down Expand Up @@ -113,9 +144,13 @@ public void testGetIndexBlockWithId() {
assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue());
}

private static ClusterBlock randomClusterBlock() {
private static ClusterBlock randomClusterBlock(TransportVersion version) {
final String uuid = randomBoolean() ? UUIDs.randomBase64UUID() : null;
final List<ClusterBlockLevel> levels = Arrays.asList(ClusterBlockLevel.values());
final EnumSet<ClusterBlockLevel> levels = ClusterBlock.filterLevels(
EnumSet.allOf(ClusterBlockLevel.class),
// Filter out ClusterBlockLevel.REFRESH for versions < TransportVersions.NEW_REFRESH_CLUSTER_BLOCK
level -> ClusterBlockLevel.REFRESH.equals(level) == false || version.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)
);
return new ClusterBlock(
randomInt(),
uuid,
Expand Down
Loading