Skip to content

Commit f7ff0af

Browse files
committed
Execute actions under permit in primary mode only (#42241)
Today when executing an action on a primary shard under permit, we do not enforce that the shard is in primary mode before executing the action. This commit addresses this by wrapping actions to be executed under permit in a check that the shard is in primary mode before executing the action.
1 parent 685a206 commit f7ff0af

File tree

8 files changed

+235
-47
lines changed

8 files changed

+235
-47
lines changed

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1022,7 +1022,12 @@ private enum ElasticsearchExceptionHandle {
10221022
org.elasticsearch.index.seqno.RetentionLeaseNotFoundException.class,
10231023
org.elasticsearch.index.seqno.RetentionLeaseNotFoundException::new,
10241024
154,
1025-
Version.V_6_7_0);
1025+
Version.V_6_7_0),
1026+
SHARD_NOT_IN_PRIMARY_MODE_EXCEPTION(
1027+
org.elasticsearch.index.shard.ShardNotInPrimaryModeException.class,
1028+
org.elasticsearch.index.shard.ShardNotInPrimaryModeException::new,
1029+
155,
1030+
Version.V_6_8_1);
10261031

10271032
final Class<? extends ElasticsearchException> exceptionClass;
10281033
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.elasticsearch.index.shard.ReplicationGroup;
6565
import org.elasticsearch.index.shard.ShardId;
6666
import org.elasticsearch.index.shard.ShardNotFoundException;
67+
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
6768
import org.elasticsearch.indices.IndexClosedException;
6869
import org.elasticsearch.indices.IndicesService;
6970
import org.elasticsearch.node.NodeClosedException;
@@ -308,10 +309,18 @@ protected void doRun() throws Exception {
308309
primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
309310
}
310311

311-
acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap(
312-
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
313-
this::onFailure
314-
));
312+
acquirePrimaryOperationPermit(
313+
indexShard,
314+
primaryRequest.getRequest(),
315+
ActionListener.wrap(
316+
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
317+
e -> {
318+
if (e instanceof ShardNotInPrimaryModeException) {
319+
onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
320+
} else {
321+
onFailure(e);
322+
}
323+
}));
315324
}
316325

317326
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
2929
import org.elasticsearch.cluster.ClusterState;
3030
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
31-
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
32-
import org.elasticsearch.cluster.routing.PlainShardIterator;
3331
import org.elasticsearch.cluster.routing.ShardsIterator;
3432
import org.elasticsearch.cluster.service.ClusterService;
3533
import org.elasticsearch.common.inject.Inject;
@@ -45,7 +43,6 @@
4543
import org.elasticsearch.transport.TransportService;
4644

4745
import java.io.IOException;
48-
import java.util.Collections;
4946
import java.util.Objects;
5047
import java.util.function.Supplier;
5148

@@ -88,14 +85,10 @@ abstract static class TransportRetentionLeaseAction<T extends Request<T>> extend
8885

8986
@Override
9087
protected ShardsIterator shards(final ClusterState state, final InternalRequest request) {
91-
final IndexShardRoutingTable shardRoutingTable = state
88+
return state
9289
.routingTable()
93-
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id());
94-
if (shardRoutingTable.primaryShard().active()) {
95-
return shardRoutingTable.primaryShardIt();
96-
} else {
97-
return new PlainShardIterator(request.request().getShardId(), Collections.emptyList());
98-
}
90+
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
91+
.primaryShardIt();
9992
}
10093

10194
@Override
@@ -174,6 +167,7 @@ void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest reques
174167
protected Writeable.Reader<Response> getResponseReader() {
175168
return Response::new;
176169
}
170+
177171
}
178172

179173
@Override
@@ -400,9 +394,10 @@ public static class Response extends ActionResponse {
400394
public Response() {
401395
}
402396

403-
Response(StreamInput in) throws IOException {
397+
Response(final StreamInput in) throws IOException {
404398
super(in);
405399
}
400+
406401
}
407402

408403
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2496,7 +2496,7 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
24962496
verifyNotClosed();
24972497
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
24982498

2499-
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
2499+
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo);
25002500
}
25012501

25022502
/**
@@ -2507,7 +2507,27 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
25072507
verifyNotClosed();
25082508
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
25092509

2510-
asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
2510+
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
2511+
}
2512+
2513+
/**
2514+
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
2515+
* executing the action.
2516+
*
2517+
* @param listener the listener to wrap
2518+
* @return the wrapped listener
2519+
*/
2520+
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
2521+
return ActionListener.delegateFailure(
2522+
listener,
2523+
(l, r) -> {
2524+
if (replicationTracker.isPrimaryMode()) {
2525+
l.onResponse(r);
2526+
} else {
2527+
r.close();
2528+
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
2529+
}
2530+
});
25112531
}
25122532

25132533
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.shard;
21+
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
24+
import java.io.IOException;
25+
26+
public class ShardNotInPrimaryModeException extends IllegalIndexShardStateException {
27+
28+
public ShardNotInPrimaryModeException(final ShardId shardId, final IndexShardState currentState) {
29+
super(shardId, currentState, "shard is not in primary mode");
30+
}
31+
32+
public ShardNotInPrimaryModeException(final StreamInput in) throws IOException {
33+
super(in);
34+
}
35+
36+
}

server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
6767
import org.elasticsearch.index.shard.IndexShardState;
6868
import org.elasticsearch.index.shard.ShardId;
69+
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
6970
import org.elasticsearch.indices.IndexTemplateMissingException;
7071
import org.elasticsearch.indices.InvalidIndexTemplateException;
7172
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
@@ -816,6 +817,7 @@ public void testIds() {
816817
ids.put(152, NoSuchRemoteClusterException.class);
817818
ids.put(153, RetentionLeaseAlreadyExistsException.class);
818819
ids.put(154, RetentionLeaseNotFoundException.class);
820+
ids.put(155, ShardNotInPrimaryModeException.class);
819821

820822
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
821823
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,11 @@
6464
import org.elasticsearch.index.IndexService;
6565
import org.elasticsearch.index.shard.IndexShard;
6666
import org.elasticsearch.index.shard.IndexShardClosedException;
67+
import org.elasticsearch.index.shard.IndexShardState;
6768
import org.elasticsearch.index.shard.ReplicationGroup;
6869
import org.elasticsearch.index.shard.ShardId;
6970
import org.elasticsearch.index.shard.ShardNotFoundException;
71+
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
7072
import org.elasticsearch.indices.IndexClosedException;
7173
import org.elasticsearch.indices.IndicesService;
7274
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -390,6 +392,43 @@ public void testNotStartedPrimary() {
390392
assertIndexShardCounter(0);
391393
}
392394

395+
public void testShardNotInPrimaryMode() {
396+
final String index = "test";
397+
final ShardId shardId = new ShardId(index, "_na_", 0);
398+
final ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
399+
setState(clusterService, state);
400+
final ReplicationTask task = maybeTask();
401+
final Request request = new Request(shardId);
402+
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
403+
final AtomicBoolean executed = new AtomicBoolean();
404+
405+
final ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
406+
final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id());
407+
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
408+
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm);
409+
410+
isPrimaryMode.set(false);
411+
412+
new TestAction(Settings.EMPTY, "internal:test-action", transportService, clusterService, shardStateAction, threadPool) {
413+
@Override
414+
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
415+
ActionListener<PrimaryResult<Request, TestResponse>> listener) {
416+
assertPhase(task, "primary");
417+
assertFalse(executed.getAndSet(true));
418+
super.shardOperationOnPrimary(shardRequest, primary, listener);
419+
}
420+
}.new AsyncPrimaryAction(primaryRequest, listener, task).run();
421+
422+
assertFalse(executed.get());
423+
assertIndexShardCounter(0); // no permit should be held
424+
425+
final ExecutionException e = expectThrows(ExecutionException.class, listener::get);
426+
assertThat(e.getCause(), instanceOf(ReplicationOperation.RetryOnPrimaryException.class));
427+
assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));
428+
assertThat(e.getCause().getCause(), instanceOf(ShardNotInPrimaryModeException.class));
429+
assertThat(e.getCause().getCause(), hasToString(containsString("shard is not in primary mode")));
430+
}
431+
393432
/**
394433
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
395434
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
@@ -1126,6 +1165,8 @@ private void assertIndexShardCounter(int expected) {
11261165

11271166
private final AtomicBoolean isRelocated = new AtomicBoolean(false);
11281167

1168+
private final AtomicBoolean isPrimaryMode = new AtomicBoolean(true);
1169+
11291170
/**
11301171
* Sometimes build a ReplicationTask for tracking the phase of the
11311172
* TransportReplicationAction. Since TransportReplicationAction has to work
@@ -1271,10 +1312,16 @@ private IndexService mockIndexService(final IndexMetaData indexMetaData, Cluster
12711312
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
12721313
final IndexShard indexShard = mock(IndexShard.class);
12731314
when(indexShard.shardId()).thenReturn(shardId);
1315+
when(indexShard.state()).thenReturn(IndexShardState.STARTED);
12741316
doAnswer(invocation -> {
12751317
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
1276-
count.incrementAndGet();
1277-
callback.onResponse(count::decrementAndGet);
1318+
if (isPrimaryMode.get()) {
1319+
count.incrementAndGet();
1320+
callback.onResponse(count::decrementAndGet);
1321+
1322+
} else {
1323+
callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED));
1324+
}
12781325
return null;
12791326
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
12801327
doAnswer(invocation -> {

0 commit comments

Comments
 (0)