Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REMOVE_NODE_LEVEL_PLAN = def(8_800_00_0);
public static final TransportVersion LOGSDB_TELEMETRY_CUSTOM_CUTOFF_DATE = def(8_801_00_0);
public static final TransportVersion SOURCE_MODE_TELEMETRY = def(8_802_00_0);
public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_803_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.block;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -21,6 +22,7 @@
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Predicate;

public class ClusterBlock implements Writeable, ToXContentFragment {

Expand Down Expand Up @@ -142,7 +144,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
out.writeOptionalString(uuid);
out.writeString(description);
out.writeEnumSet(levels);
if (out.getTransportVersion().onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)) {
out.writeEnumSet(levels);
} else {
// do not send ClusterBlockLevel.REFRESH to old nodes
out.writeEnumSet(filterLevels(levels, level -> ClusterBlockLevel.REFRESH.equals(level) == false));
}
out.writeBoolean(retryable);
out.writeBoolean(disableStatePersistence);
RestStatus.writeTo(out, status);
Expand Down Expand Up @@ -185,4 +192,19 @@ public int hashCode() {
public boolean isAllowReleaseResources() {
return allowReleaseResources;
}

static EnumSet<ClusterBlockLevel> filterLevels(EnumSet<ClusterBlockLevel> levels, Predicate<ClusterBlockLevel> predicate) {
assert levels != null;
int size = levels.size();
if (size == 0 || (size == 1 && predicate.test(levels.iterator().next()))) {
return levels;
}
var filteredLevels = EnumSet.noneOf(ClusterBlockLevel.class);
for (ClusterBlockLevel level : levels) {
if (predicate.test(level)) {
filteredLevels.add(level);
}
}
return filteredLevels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public enum ClusterBlockLevel {
READ,
WRITE,
METADATA_READ,
METADATA_WRITE;
METADATA_WRITE,
REFRESH;

public static final EnumSet<ClusterBlockLevel> ALL = EnumSet.allOf(ClusterBlockLevel.class);
public static final EnumSet<ClusterBlockLevel> READ_WRITE = EnumSet.of(READ, WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
RestStatus.TOO_MANY_REQUESTS,
EnumSet.of(ClusterBlockLevel.WRITE)
);
public static final ClusterBlock INDEX_REFRESH_BLOCK = new ClusterBlock(
14,
"index refresh blocked, waiting for shard(s) to be started",
true,
false,
false,
RestStatus.REQUEST_TIMEOUT,
EnumSet.of(ClusterBlockLevel.REFRESH)
);

// 'event.ingested' (part of Elastic Common Schema) range is tracked in cluster state, along with @timestamp
public static final String EVENT_INGESTED_FIELD_NAME = "event.ingested";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void testToXContent() throws IOException {
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -180,7 +181,8 @@ public void testToXContent() throws IOException {
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down Expand Up @@ -440,7 +442,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -453,7 +456,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down Expand Up @@ -712,7 +716,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
},
Expand All @@ -725,7 +730,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"read",
"write",
"metadata_read",
"metadata_write"
"metadata_write",
"refresh"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@
package org.elasticsearch.cluster.block;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.EnumSet;
import java.util.Map;

import static java.util.EnumSet.copyOf;
import static org.elasticsearch.test.TransportVersionUtils.getFirstVersion;
import static org.elasticsearch.test.TransportVersionUtils.getPreviousVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersion;
import static org.elasticsearch.test.TransportVersionUtils.randomVersionBetween;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -36,7 +39,7 @@ public void testSerialization() throws Exception {
int iterations = randomIntBetween(5, 20);
for (int i = 0; i < iterations; i++) {
TransportVersion version = randomVersion(random());
ClusterBlock clusterBlock = randomClusterBlock();
ClusterBlock clusterBlock = randomClusterBlock(version);

BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(version);
Expand All @@ -50,13 +53,41 @@ public void testSerialization() throws Exception {
}
}

public void testSerializationBwc() throws Exception {
var out = new BytesStreamOutput();
out.setTransportVersion(
randomVersionBetween(random(), getFirstVersion(), getPreviousVersion(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK))
);

var clusterBlock = randomClusterBlock(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK);
clusterBlock.writeTo(out);

var in = out.bytes().streamInput();
in.setTransportVersion(randomVersion());

assertClusterBlockEquals(
new ClusterBlock(
clusterBlock.id(),
clusterBlock.uuid(),
clusterBlock.description(),
clusterBlock.retryable(),
clusterBlock.disableStatePersistence(),
clusterBlock.isAllowReleaseResources(),
clusterBlock.status(),
// ClusterBlockLevel.REFRESH should not be sent over the wire to nodes with version < NEW_REFRESH_CLUSTER_BLOCK
ClusterBlock.filterLevels(clusterBlock.levels(), level -> ClusterBlockLevel.REFRESH.equals(level) == false)
),
new ClusterBlock(in)
);
}

public void testToStringDanglingComma() {
final ClusterBlock clusterBlock = randomClusterBlock();
final ClusterBlock clusterBlock = randomClusterBlock(randomVersion(random()));
assertThat(clusterBlock.toString(), not(endsWith(",")));
}

public void testGlobalBlocksCheckedIfNoIndicesSpecified() {
ClusterBlock globalBlock = randomClusterBlock();
ClusterBlock globalBlock = randomClusterBlock(randomVersion(random()));
ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), Map.of());
ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]);
assertNotNull(exception);
Expand Down Expand Up @@ -113,9 +144,13 @@ public void testGetIndexBlockWithId() {
assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue());
}

private static ClusterBlock randomClusterBlock() {
private static ClusterBlock randomClusterBlock(TransportVersion version) {
final String uuid = randomBoolean() ? UUIDs.randomBase64UUID() : null;
final List<ClusterBlockLevel> levels = Arrays.asList(ClusterBlockLevel.values());
final EnumSet<ClusterBlockLevel> levels = ClusterBlock.filterLevels(
EnumSet.allOf(ClusterBlockLevel.class),
// Filter out ClusterBlockLevel.REFRESH for versions < TransportVersions.NEW_REFRESH_CLUSTER_BLOCK
level -> ClusterBlockLevel.REFRESH.equals(level) == false || version.onOrAfter(TransportVersions.NEW_REFRESH_CLUSTER_BLOCK)
);
return new ClusterBlock(
randomInt(),
uuid,
Expand Down