Skip to content

Commit cf9953f

Browse files
committed
Defer reroute when starting shards
Today we reroute the cluster as part of the process of starting a shard, which runs at `URGENT` priority. In large clusters, rerouting may take some time to complete, and this means that a mere trickle of shard-started events can cause starvation for other, lower-priority, tasks that are pending on the master. However, it isn't really necessary to perform a reroute when starting a shard, as long as one occurs eventually. This commit removes the inline reroute from the process of starting a shard and replaces it with a deferred one that runs at `NORMAL` priority, avoiding starvation of higher-priority tasks. This may improve some of the situations related to elastic#42738 and elastic#42105.
1 parent 61b6a9b commit cf9953f

File tree

61 files changed

+462
-515
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+462
-515
lines changed

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.inject.Inject;
4848
import org.elasticsearch.common.io.stream.StreamInput;
4949
import org.elasticsearch.common.io.stream.StreamOutput;
50+
import org.elasticsearch.common.settings.Setting;
5051
import org.elasticsearch.common.unit.TimeValue;
5152
import org.elasticsearch.index.shard.ShardId;
5253
import org.elasticsearch.node.NodeClosedException;
@@ -71,6 +72,7 @@
7172
import java.util.Objects;
7273
import java.util.Set;
7374
import java.util.function.Predicate;
75+
import java.util.function.Supplier;
7476

7577
public class ShardStateAction {
7678

@@ -79,10 +81,23 @@ public class ShardStateAction {
7981
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
8082
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
8183

84+
/**
85+
* Adjusts the priority of the followup reroute task. NORMAL is right for reasonable clusters, but in a badly configured cluster it may
86+
* be necessary to raise this higher to recover the older behaviour of rerouting after processing every shard-started task. Deliberately
87+
* undocumented, since this is a last-resort escape hatch for experts rather than something we want to expose to anyone, and deprecated
88+
* since we will remove it once we have confirmed from experience that this priority is appropriate in all cases.
89+
*/
90+
public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING
91+
= new Setting<>("cluster.routing.allocation.shard_state.reroute.priority", Priority.NORMAL.toString(),
92+
name -> Priority.valueOf(name.toUpperCase(Locale.ROOT)),
93+
Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated);
94+
8295
private final TransportService transportService;
8396
private final ClusterService clusterService;
8497
private final ThreadPool threadPool;
8598

99+
private volatile Priority followUpRerouteTaskPriority;
100+
86101
// a list of shards that failed during replication
87102
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
88103
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
@@ -94,11 +109,18 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
94109
this.clusterService = clusterService;
95110
this.threadPool = threadPool;
96111

112+
followUpRerouteTaskPriority = FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(clusterService.getSettings());
113+
clusterService.getClusterSettings().addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING,
114+
this::setFollowUpRerouteTaskPriority);
115+
97116
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new,
98-
new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
117+
new ShardStartedTransportHandler(clusterService,
118+
new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
119+
logger));
99120
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new,
100121
new ShardFailedTransportHandler(clusterService,
101-
new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger), logger));
122+
new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
123+
logger));
102124
}
103125

104126
private void sendShardAction(final String actionName, final ClusterState currentState,
@@ -215,6 +237,10 @@ public void onTimeout(TimeValue timeout) {
215237
}, changePredicate);
216238
}
217239

240+
private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
241+
this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
242+
}
243+
218244
private static class ShardFailedTransportHandler implements TransportRequestHandler<FailedShardEntry> {
219245
private final ClusterService clusterService;
220246
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
@@ -282,11 +308,14 @@ public static class ShardFailedClusterStateTaskExecutor implements ClusterStateT
282308
private final AllocationService allocationService;
283309
private final RerouteService rerouteService;
284310
private final Logger logger;
311+
private final Supplier<Priority> prioritySupplier;
285312

286-
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
313+
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
314+
Supplier<Priority> prioritySupplier, Logger logger) {
287315
this.allocationService = allocationService;
288316
this.rerouteService = rerouteService;
289317
this.logger = logger;
318+
this.prioritySupplier = prioritySupplier;
290319
}
291320

292321
@Override
@@ -380,7 +409,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
380409
// assign it again, even if that means putting it back on the node on which it previously failed:
381410
final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
382411
logger.trace("{}, scheduling a reroute", reason);
383-
rerouteService.reroute(reason, Priority.HIGH, ActionListener.wrap(
412+
rerouteService.reroute(reason, prioritySupplier.get(), ActionListener.wrap(
384413
r -> logger.trace("{}, reroute completed", reason),
385414
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
386415
}
@@ -511,10 +540,15 @@ public static class ShardStartedClusterStateTaskExecutor
511540
implements ClusterStateTaskExecutor<StartedShardEntry>, ClusterStateTaskListener {
512541
private final AllocationService allocationService;
513542
private final Logger logger;
543+
private final RerouteService rerouteService;
544+
private final Supplier<Priority> prioritySupplier;
514545

515-
public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, Logger logger) {
546+
public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
547+
Supplier<Priority> prioritySupplier, Logger logger) {
516548
this.allocationService = allocationService;
517549
this.logger = logger;
550+
this.rerouteService = rerouteService;
551+
this.prioritySupplier = prioritySupplier;
518552
}
519553

520554
@Override
@@ -589,6 +623,15 @@ public void onFailure(String source, Exception e) {
589623
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
590624
}
591625
}
626+
627+
@Override
628+
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
629+
if (clusterChangedEvent.previousState() != clusterChangedEvent.state()) {
630+
rerouteService.reroute("reroute after starting shards", prioritySupplier.get(), ActionListener.wrap(
631+
r -> logger.trace("reroute after starting shards succeeded"),
632+
e -> logger.debug("reroute after starting shards failed", e)));
633+
}
634+
}
592635
}
593636

594637
public static class StartedShardEntry extends TransportRequest {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
109109
Collections.sort(startedShards, Comparator.comparing(ShardRouting::primary));
110110
applyStartedShards(allocation, startedShards);
111111
gatewayAllocator.applyStartedShards(allocation, startedShards);
112-
reroute(allocation);
113112
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
114113
return buildResultAndLogHealthChange(clusterState, allocation, "shards started [" + startedShardsAsString + "] ...");
115114
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.InternalClusterInfoService;
3232
import org.elasticsearch.cluster.NodeConnectionsService;
3333
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
34+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3435
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
3536
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
3637
import org.elasticsearch.cluster.coordination.Coordinator;
@@ -213,6 +214,7 @@ public void apply(Settings value, Settings current, Settings previous) {
213214
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
214215
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
215216
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
217+
ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
216218
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
217219
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
218220
DestructiveOperations.REQUIRES_NAME_SETTING,

server/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeActionTests.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.support.ActiveShardCount;
2626
import org.elasticsearch.cluster.ClusterName;
2727
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.cluster.ESAllocationTestCase;
2829
import org.elasticsearch.cluster.EmptyClusterInfoService;
2930
import org.elasticsearch.cluster.block.ClusterBlocks;
3031
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -33,7 +34,6 @@
3334
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
3435
import org.elasticsearch.cluster.node.DiscoveryNodes;
3536
import org.elasticsearch.cluster.routing.RoutingTable;
36-
import org.elasticsearch.cluster.routing.ShardRoutingState;
3737
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3838
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
3939
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@@ -114,8 +114,7 @@ public void testErrorCondition() {
114114
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
115115
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
116116
// now we start the shard
117-
routingTable = service.applyStartedShards(clusterState,
118-
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
117+
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
119118
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
120119

121120
TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), clusterState,
@@ -133,8 +132,7 @@ public void testPassNumRoutingShards() {
133132
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
134133
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
135134
// now we start the shard
136-
routingTable = service.applyStartedShards(clusterState,
137-
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
135+
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
138136
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
139137

140138
ResizeRequest resizeRequest = new ResizeRequest("target", "source");
@@ -163,8 +161,7 @@ public void testPassNumRoutingShardsAndFail() {
163161
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
164162
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
165163
// now we start the shard
166-
routingTable = service.applyStartedShards(clusterState,
167-
routingTable.index("source").shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
164+
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
168165
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
169166

170167
ResizeRequest resizeRequest = new ResizeRequest("target", "source");
@@ -198,8 +195,7 @@ public void testShrinkIndexSettings() {
198195
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
199196
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
200197
// now we start the shard
201-
routingTable = service.applyStartedShards(clusterState,
202-
routingTable.index(indexName).shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
198+
routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, indexName).routingTable();
203199
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
204200
int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards();
205201
DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / numSourceShards), between(1, 1000), between(1, 10000));

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ public void testNoRerouteOnStaleClusterState() {
466466
ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId)
467467
.shardsWithState(ShardRoutingState.INITIALIZING).get(0);
468468
AllocationService allocationService = ESAllocationTestCase.createAllocationService();
469-
ClusterState updatedState = allocationService.applyStartedShards(state, Collections.singletonList(relocationTarget));
469+
ClusterState updatedState = ESAllocationTestCase.startShardsAndReroute(allocationService, state, relocationTarget);
470470

471471
setState(clusterService, updatedState);
472472
logger.debug("--> relocation complete state:\n{}", clusterService.state());

server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.cluster.node.DiscoveryNodes;
3333
import org.elasticsearch.cluster.routing.GroupShardsIterator;
3434
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
35-
import org.elasticsearch.cluster.routing.RoutingNodes;
3635
import org.elasticsearch.cluster.routing.RoutingTable;
3736
import org.elasticsearch.cluster.routing.ShardIterator;
3837
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -42,6 +41,7 @@
4241
import org.elasticsearch.cluster.routing.allocation.FailedShard;
4342
import org.elasticsearch.cluster.routing.allocation.StaleShard;
4443
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
44+
import org.elasticsearch.common.Priority;
4545
import org.elasticsearch.common.UUIDs;
4646
import org.elasticsearch.common.collect.Tuple;
4747
import org.elasticsearch.common.settings.Settings;
@@ -50,14 +50,12 @@
5050
import org.elasticsearch.index.shard.ShardId;
5151

5252
import java.util.ArrayList;
53-
import java.util.Arrays;
5453
import java.util.Collections;
5554
import java.util.List;
5655
import java.util.Set;
5756
import java.util.stream.Collectors;
5857
import java.util.stream.IntStream;
5958

60-
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
6159
import static org.hamcrest.CoreMatchers.equalTo;
6260
import static org.hamcrest.CoreMatchers.instanceOf;
6361
import static org.hamcrest.Matchers.contains;
@@ -89,7 +87,7 @@ public void setUp() throws Exception {
8987
.build();
9088
clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
9189
.metaData(metaData).routingTable(routingTable).build();
92-
executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger);
90+
executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger);
9391
}
9492

9593
public void testEmptyTaskListProducesSameClusterState() throws Exception {
@@ -121,7 +119,7 @@ public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Excepti
121119
List<FailedShardEntry> failingTasks = createExistingShards(currentState, reason);
122120
List<FailedShardEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
123121
ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor =
124-
new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) {
122+
new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, () -> Priority.NORMAL, logger) {
125123
@Override
126124
ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> failedShards, List<StaleShard> staleShards) {
127125
throw new RuntimeException("simulated applyFailedShards failure");
@@ -165,22 +163,22 @@ public void testIllegalShardFailureRequests() throws Exception {
165163
public void testMarkAsStaleWhenFailingShard() throws Exception {
166164
final MockAllocationService allocation = createAllocationService();
167165
ClusterState clusterState = createClusterStateWithStartedShards("test markAsStale");
168-
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
166+
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
169167
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(INDEX).shard(0);
170168
long primaryTerm = clusterState.metaData().index(INDEX).primaryTerm(0);
171169
final Set<String> oldInSync = clusterState.metaData().index(INDEX).inSyncAllocationIds(0);
172170
{
173171
ShardStateAction.FailedShardEntry failShardOnly = new ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(),
174172
randomFrom(oldInSync), primaryTerm, "dummy", null, false);
175-
ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failShardOnly)).resultingState;
173+
ClusterState appliedState = executor.execute(clusterState, Collections.singletonList(failShardOnly)).resultingState;
176174
Set<String> newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0);
177175
assertThat(newInSync, equalTo(oldInSync));
178176
}
179177
{
180178
final String failedAllocationId = randomFrom(oldInSync);
181179
ShardStateAction.FailedShardEntry failAndMarkAsStale = new ShardStateAction.FailedShardEntry(shardRoutingTable.shardId(),
182180
failedAllocationId, primaryTerm, "dummy", null, true);
183-
ClusterState appliedState = executor.execute(clusterState, Arrays.asList(failAndMarkAsStale)).resultingState;
181+
ClusterState appliedState = executor.execute(clusterState, Collections.singletonList(failAndMarkAsStale)).resultingState;
184182
Set<String> newInSync = appliedState.metaData().index(INDEX).inSyncAllocationIds(0);
185183
assertThat(Sets.difference(oldInSync, newInSync), contains(failedAllocationId));
186184
}
@@ -192,11 +190,9 @@ private ClusterState createClusterStateWithStartedShards(String reason) {
192190
IntStream.rangeClosed(1, numberOfNodes).mapToObj(node -> newNode("node" + node)).forEach(nodes::add);
193191
ClusterState stateAfterAddingNode =
194192
ClusterState.builder(clusterState).nodes(nodes).build();
195-
RoutingTable afterReroute =
196-
allocationService.reroute(stateAfterAddingNode, reason).routingTable();
193+
RoutingTable afterReroute = allocationService.reroute(stateAfterAddingNode, reason).routingTable();
197194
ClusterState stateAfterReroute = ClusterState.builder(stateAfterAddingNode).routingTable(afterReroute).build();
198-
RoutingNodes routingNodes = stateAfterReroute.getRoutingNodes();
199-
return allocationService.applyStartedShards(stateAfterReroute, routingNodes.shardsWithState(ShardRoutingState.INITIALIZING));
195+
return ESAllocationTestCase.startInitializingShardsAndReroute(allocationService, stateAfterReroute);
200196
}
201197

202198
private List<ShardStateAction.FailedShardEntry> createExistingShards(ClusterState currentState, String reason) {

0 commit comments

Comments
 (0)