From e935cf0b09e05dfaeee49da254c9c19edbe4af6a Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Tue, 12 Nov 2024 13:24:39 +0200 Subject: [PATCH 1/2] Fast refresh indices to use search shards The changes of PR #115019 were reverted because it induced ES-8275. Now that the ticket is done, this PR re-introduces the reverted changes. Fast refresh indices should now behave like non fast refresh indices in how they execute (m)gets and searches. I.e., they should use the search shards. For BWC, we define a new transport version. We expect search shards to be upgraded first, before promotable shards. Until the cluster is fully upgraded, the promotable shards (whether upgraded or not) will still receive and execute gets/searches locally. Relates ES-9573 --- .../org/elasticsearch/TransportVersions.java | 1 + .../refresh/TransportShardRefreshAction.java | 32 +++++++----------- ...ansportUnpromotableShardRefreshAction.java | 15 +++++++++ .../action/get/TransportGetAction.java | 4 +-- .../get/TransportShardMultiGetAction.java | 4 +-- .../support/replication/PostWriteRefresh.java | 9 ++--- .../cluster/routing/OperationRouting.java | 9 ++++- .../index/cache/bitset/BitsetFilterCache.java | 7 +--- .../routing/IndexRoutingTableTests.java | 24 +++++++++----- .../cache/bitset/BitSetFilterCacheTests.java | 33 +++++-------------- 10 files changed, 64 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 5f3b466f9f7bd..249ee8bbfe332 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -193,6 +193,7 @@ static TransportVersion def(int id) { public static final TransportVersion ROLE_MONITOR_STATS = def(8_787_00_0); public static final TransportVersion DATA_STREAM_INDEX_VERSION_DEPRECATION_CHECK = def(8_788_00_0); public static final TransportVersion ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO = def(8_789_00_0); + public static final TransportVersion FAST_REFRESH_ADAPT = def(8_790_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 7857e9a22e9b9..cb667400240f0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.injection.guice.Inject; @@ -120,27 +119,18 @@ public void onPrimaryOperationComplete( ActionListener listener ) { assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed"; - boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get( - clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings() + UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest( + indexShardRoutingTable, + replicaRequest.primaryRefreshResult.primaryTerm(), + replicaRequest.primaryRefreshResult.generation(), + false + ); + transportService.sendRequest( + transportService.getLocalNode(), + TransportUnpromotableShardRefreshAction.NAME, + unpromotableReplicaRequest, + new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor) ); - - // Indices marked with fast refresh do not rely on refreshing the unpromotables - if (fastRefresh) { - listener.onResponse(null); - } else { - UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest( - indexShardRoutingTable, - replicaRequest.primaryRefreshResult.primaryTerm(), - replicaRequest.primaryRefreshResult.generation(), - false - ); - transportService.sendRequest( - transportService.getLocalNode(), - TransportUnpromotableShardRefreshAction.NAME, - unpromotableReplicaRequest, - new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor) - ); - } } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java index 6c24ec2d17604..59b458a3e1a9e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -24,6 +24,9 @@ import java.util.List; +import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT; +import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; + public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction< UnpromotableShardRefreshRequest, ActionResponse.Empty> { @@ -73,6 +76,18 @@ protected void unpromotableShardOperation( return; } + // During an upgrade to FAST_REFRESH_ADAPT, we expect search shards to be first upgraded before the primary is upgraded. Thus, + // when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already. + // Note that the fast refresh setting is final. + // TODO: remove assertion (ES-9563) + assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false + || transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_ADAPT) + : "attempted to refresh a fast refresh search shard " + + shard + + " on transport version " + + transportService.getLocalNodeConnection().getTransportVersion() + + " (before FAST_REFRESH_ADAPT)"; + ActionListener.run(responseListener, listener -> { shard.waitForPrimaryTermAndGeneration( request.getPrimaryTerm(), diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 9e535344c9589..fb4b3907d2bfd 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -126,12 +126,10 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (indexShard.routingEntry().isPromotableToPrimary() == false) { - // TODO: Re-evaluate assertion (ES-8227) - // assert indexShard.indexSettings().isFastRefresh() == false - // : "a search shard should not receive a TransportGetAction for an index with fast refresh"; handleGetOnUnpromotableShard(request, indexShard, listener); return; } + // TODO: adapt assertion to assert only that it is not stateless (ES-9563) assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh() : "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting"; if (request.realtime()) { // we are not tied to a refresh cycle here anyway diff --git a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java index 34b3ae50e0b51..633e7ef6793ab 100644 --- a/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java @@ -124,12 +124,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); if (indexShard.routingEntry().isPromotableToPrimary() == false) { - // TODO: Re-evaluate assertion (ES-8227) - // assert indexShard.indexSettings().isFastRefresh() == false - // : "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh"; handleMultiGetOnUnpromotableShard(request, indexShard, listener); return; } + // TODO: adapt assertion to assert only that it is not stateless (ES-9563) assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh() : "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has " + "the fast refresh setting"; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java index 683c3589c893d..7414aeeb2c405 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; @@ -53,9 +52,7 @@ public void refreshShard( case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() { @Override public void onResponse(Boolean forced) { - // Fast refresh indices do not depend on the unpromotables being refreshed - boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings()); - if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) { + if (location != null && indexShard.routingEntry().isSearchable() == false) { refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout); } else { listener.onResponse(forced); @@ -68,9 +65,7 @@ public void onFailure(Exception e) { } }); case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> { - // Fast refresh indices do not depend on the unpromotables being refreshed - boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings()); - if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) { + if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) { sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout); } else { l.onResponse(true); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index f7812d284f2af..ea6b93666ffd5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT; import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; public class OperationRouting { @@ -305,8 +306,14 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null } public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) { + // TODO: remove if and always return isSearchable (ES-9563) if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) { - return shardRouting.isPromotableToPrimary(); + // Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally. + if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_ADAPT)) { + return shardRouting.isSearchable(); + } else { + return shardRouting.isPromotableToPrimary(); + } } else { return shardRouting.isSearchable(); } diff --git a/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 5277999271984..5792cafb91b77 100644 --- a/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -58,8 +58,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; - /** * This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time. *

@@ -105,10 +103,7 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) { boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING); boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings()); if (isStateless) { - return loadFiltersEagerlySetting - && (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE) - || (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE) - && INDEX_FAST_REFRESH_SETTING.get(settings.getSettings()))); + return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE); } else { return loadFiltersEagerlySetting; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java index 21b30557cafea..b949bf9c2bb03 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -19,6 +20,7 @@ import java.util.List; +import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT; import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -27,16 +29,22 @@ public class IndexRoutingTableTests extends ESTestCase { public void testReadyForSearch() { - innerReadyForSearch(false); - innerReadyForSearch(true); + innerReadyForSearch(false, false); + innerReadyForSearch(false, true); + innerReadyForSearch(true, false); + innerReadyForSearch(true, true); } - private void innerReadyForSearch(boolean fastRefresh) { + // TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563) + private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) { Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID()); ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS); when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn( Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build() ); + when(clusterState.getMinTransportVersion()).thenReturn( + beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_ADAPT.id() - 1_00_0) : TransportVersion.current() + ); // 2 primaries that are search and index ShardId p1 = new ShardId(index, 0); IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable( @@ -55,7 +63,7 @@ private void innerReadyForSearch(boolean fastRefresh) { shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY))); shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY))); indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 }); - if (fastRefresh) { + if (fastRefresh && beforeFastRefreshRCO) { assertTrue(indexRoutingTable.readyForSearch(clusterState)); } else { assertFalse(indexRoutingTable.readyForSearch(clusterState)); @@ -91,7 +99,7 @@ private void innerReadyForSearch(boolean fastRefresh) { ) ); indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 }); - if (fastRefresh) { + if (fastRefresh && beforeFastRefreshRCO) { assertTrue(indexRoutingTable.readyForSearch(clusterState)); } else { assertFalse(indexRoutingTable.readyForSearch(clusterState)); @@ -118,8 +126,6 @@ private void innerReadyForSearch(boolean fastRefresh) { assertTrue(indexRoutingTable.readyForSearch(clusterState)); // 2 unassigned primaries that are index only with some replicas that are all available - // Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure - // that readyForSearch allows for searching replicas when the index shard is not available. shardTable1 = new IndexShardRoutingTable( p1, List.of( @@ -137,8 +143,8 @@ private void innerReadyForSearch(boolean fastRefresh) { ) ); indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 }); - if (fastRefresh) { - assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change + if (fastRefresh && beforeFastRefreshRCO) { + assertFalse(indexRoutingTable.readyForSearch(clusterState)); } else { assertTrue(indexRoutingTable.readyForSearch(clusterState)); } diff --git a/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java b/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java index d7d5c886e0741..b12cd256eebcc 100644 --- a/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java +++ b/server/src/test/java/org/elasticsearch/index/cache/bitset/BitSetFilterCacheTests.java @@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; -import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -273,35 +272,21 @@ public void testShouldLoadRandomAccessFiltersEagerly() { for (var hasIndexRole : values) { for (var loadFiltersEagerly : values) { for (var isStateless : values) { - for (var fastRefresh : values) { - if (isStateless == false && fastRefresh) { - // fast refresh is only relevant for stateless indices - continue; - } - - boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly( - bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh) - ); - if (isStateless) { - assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result); - } else { - assertEquals(loadFiltersEagerly, result); - } + boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly( + bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly) + ); + if (isStateless) { + assertEquals(loadFiltersEagerly && hasIndexRole == false, result); + } else { + assertEquals(loadFiltersEagerly, result); } } } } } - private IndexSettings bitsetFilterCacheSettings( - boolean isStateless, - boolean hasIndexRole, - boolean loadFiltersEagerly, - boolean fastRefresh - ) { - var indexSettingsBuilder = Settings.builder() - .put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly) - .put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh); + private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) { + var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly); var nodeSettingsBuilder = Settings.builder() .putList( From 46f330e54f1c233206d1e1718d5f233fb0c80073 Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Thu, 14 Nov 2024 11:03:56 +0200 Subject: [PATCH 2/2] FAST_REFRESH_RCO_2 --- .../main/java/org/elasticsearch/TransportVersions.java | 2 +- .../refresh/TransportUnpromotableShardRefreshAction.java | 8 ++++---- .../elasticsearch/cluster/routing/OperationRouting.java | 4 ++-- .../cluster/routing/IndexRoutingTableTests.java | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 94e33ac5deb1a..41c6e80829dce 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -197,7 +197,7 @@ static TransportVersion def(int id) { public static final TransportVersion VERTEX_AI_INPUT_TYPE_ADDED = def(8_790_00_0); public static final TransportVersion SKIP_INNER_HITS_SEARCH_SOURCE = def(8_791_00_0); public static final TransportVersion QUERY_RULES_LIST_INCLUDES_TYPES = def(8_792_00_0); - public static final TransportVersion FAST_REFRESH_ADAPT = def(8_793_00_0); + public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_793_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java index 59b458a3e1a9e..4458c008babcd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -24,7 +24,7 @@ import java.util.List; -import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT; +import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2; import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction< @@ -76,17 +76,17 @@ protected void unpromotableShardOperation( return; } - // During an upgrade to FAST_REFRESH_ADAPT, we expect search shards to be first upgraded before the primary is upgraded. Thus, + // During an upgrade to FAST_REFRESH_RCO_2, we expect search shards to be first upgraded before the primary is upgraded. Thus, // when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already. // Note that the fast refresh setting is final. // TODO: remove assertion (ES-9563) assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false - || transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_ADAPT) + || transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO_2) : "attempted to refresh a fast refresh search shard " + shard + " on transport version " + transportService.getLocalNodeConnection().getTransportVersion() - + " (before FAST_REFRESH_ADAPT)"; + + " (before FAST_REFRESH_RCO_2)"; ActionListener.run(responseListener, listener -> { shard.waitForPrimaryTermAndGeneration( diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index ea6b93666ffd5..13fc874f52e9f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -32,7 +32,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT; +import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2; import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; public class OperationRouting { @@ -309,7 +309,7 @@ public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clu // TODO: remove if and always return isSearchable (ES-9563) if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) { // Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally. - if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_ADAPT)) { + if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO_2)) { return shardRouting.isSearchable(); } else { return shardRouting.isPromotableToPrimary(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java index b949bf9c2bb03..e5786b1b3449e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTableTests.java @@ -20,7 +20,7 @@ import java.util.List; -import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT; +import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO_2; import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -43,7 +43,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build() ); when(clusterState.getMinTransportVersion()).thenReturn( - beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_ADAPT.id() - 1_00_0) : TransportVersion.current() + beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO_2.id() - 1_00_0) : TransportVersion.current() ); // 2 primaries that are search and index ShardId p1 = new ShardId(index, 0);