Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ static TransportVersion def(int id) {
public static final TransportVersion QUERY_RULES_LIST_INCLUDES_TYPES = def(8_792_00_0);
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS = def(8_793_00_0);
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0);
public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -120,27 +119,18 @@ public void onPrimaryOperationComplete(
ActionListener<Void> listener
) {
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);

// Indices marked with fast refresh do not rely on refreshing the unpromotables
if (fastRefresh) {
listener.onResponse(null);
} else {
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
UnpromotableShardRefreshRequest,
ActionResponse.Empty> {
Expand Down Expand Up @@ -73,6 +76,18 @@ protected void unpromotableShardOperation(
return;
}

// During an upgrade to FAST_REFRESH_RCO_2, we expect search shards to be first upgraded before the primary is upgraded. Thus,
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
// Note that the fast refresh setting is final.
// TODO: remove assertion (ES-9563)
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO_2)
: "attempted to refresh a fast refresh search shard "
+ shard
+ " on transport version "
+ transportService.getLocalNodeConnection().getTransportVersion()
+ " (before FAST_REFRESH_RCO_2)";

ActionListener.run(responseListener, listener -> {
shard.waitForPrimaryTermAndGeneration(
request.getPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,10 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
// TODO: Re-evaluate assertion (ES-8227)
// assert indexShard.indexSettings().isFastRefresh() == false
// : "a search shard should not receive a TransportGetAction for an index with fast refresh";
handleGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
// TODO: Re-evaluate assertion (ES-8227)
// assert indexShard.indexSettings().isFastRefresh() == false
// : "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
+ "the fast refresh setting";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -53,9 +52,7 @@ public void refreshShard(
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
if (location != null && indexShard.routingEntry().isSearchable() == false) {
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
} else {
listener.onResponse(forced);
Expand All @@ -68,9 +65,7 @@ public void onFailure(Exception e) {
}
});
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
} else {
l.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class OperationRouting {
Expand Down Expand Up @@ -305,8 +306,14 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
}

public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
// TODO: remove if and always return isSearchable (ES-9563)
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
return shardRouting.isPromotableToPrimary();
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO_2)) {
return shardRouting.isSearchable();
} else {
return shardRouting.isPromotableToPrimary();
}
} else {
return shardRouting.isSearchable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

/**
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
* <p>
Expand Down Expand Up @@ -105,10 +103,7 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
if (isStateless) {
return loadFiltersEagerlySetting
&& (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
|| (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings())));
return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE);
} else {
return loadFiltersEagerlySetting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -19,6 +20,7 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -27,16 +29,22 @@
public class IndexRoutingTableTests extends ESTestCase {

public void testReadyForSearch() {
innerReadyForSearch(false);
innerReadyForSearch(true);
innerReadyForSearch(false, false);
innerReadyForSearch(false, true);
innerReadyForSearch(true, false);
innerReadyForSearch(true, true);
}

private void innerReadyForSearch(boolean fastRefresh) {
// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
);
when(clusterState.getMinTransportVersion()).thenReturn(
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO_2.id() - 1_00_0) : TransportVersion.current()
);
// 2 primaries that are search and index
ShardId p1 = new ShardId(index, 0);
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
Expand All @@ -55,7 +63,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
if (fastRefresh && beforeFastRefreshRCO) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand Down Expand Up @@ -91,7 +99,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
if (fastRefresh && beforeFastRefreshRCO) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand All @@ -118,8 +126,6 @@ private void innerReadyForSearch(boolean fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));

// 2 unassigned primaries that are index only with some replicas that are all available
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
// that readyForSearch allows for searching replicas when the index shard is not available.
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
Expand All @@ -137,8 +143,8 @@ private void innerReadyForSearch(boolean fastRefresh) {
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
if (fastRefresh && beforeFastRefreshRCO) {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
} else {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -253,35 +252,21 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
for (var hasIndexRole : values) {
for (var loadFiltersEagerly : values) {
for (var isStateless : values) {
for (var fastRefresh : values) {
if (isStateless == false && fastRefresh) {
// fast refresh is only relevant for stateless indices
continue;
}

boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result);
} else {
assertEquals(loadFiltersEagerly, result);
}
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && hasIndexRole == false, result);
} else {
assertEquals(loadFiltersEagerly, result);
}
}
}
}
}

private IndexSettings bitsetFilterCacheSettings(
boolean isStateless,
boolean hasIndexRole,
boolean loadFiltersEagerly,
boolean fastRefresh
) {
var indexSettingsBuilder = Settings.builder()
.put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly)
.put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh);
private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) {
var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly);

var nodeSettingsBuilder = Settings.builder()
.putList(
Expand Down