Skip to content

Commit 4b0f36d

Browse files
authored
Execute actions under permit in primary mode only (#42241)
Today, when executing an action on a primary shard under a permit, we do not enforce that the shard is in primary mode before executing the action. This commit addresses this by wrapping actions to be executed under a permit in a check that the shard is in primary mode before executing the action.
1 parent 1dcaf4f commit 4b0f36d

File tree

8 files changed

+235
-48
lines changed

8 files changed

+235
-48
lines changed

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1022,7 +1022,12 @@ private enum ElasticsearchExceptionHandle {
10221022
org.elasticsearch.index.seqno.RetentionLeaseNotFoundException.class,
10231023
org.elasticsearch.index.seqno.RetentionLeaseNotFoundException::new,
10241024
154,
1025-
Version.V_6_7_0);
1025+
Version.V_6_7_0),
1026+
SHARD_NOT_IN_PRIMARY_MODE_EXCEPTION(
1027+
org.elasticsearch.index.shard.ShardNotInPrimaryModeException.class,
1028+
org.elasticsearch.index.shard.ShardNotInPrimaryModeException::new,
1029+
155,
1030+
Version.V_6_8_1);
10261031

10271032
final Class<? extends ElasticsearchException> exceptionClass;
10281033
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.elasticsearch.index.shard.ReplicationGroup;
6464
import org.elasticsearch.index.shard.ShardId;
6565
import org.elasticsearch.index.shard.ShardNotFoundException;
66+
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
6667
import org.elasticsearch.indices.IndexClosedException;
6768
import org.elasticsearch.indices.IndicesService;
6869
import org.elasticsearch.node.NodeClosedException;
@@ -307,10 +308,18 @@ protected void doRun() throws Exception {
307308
primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
308309
}
309310

310-
acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap(
311-
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
312-
this::onFailure
313-
));
311+
acquirePrimaryOperationPermit(
312+
indexShard,
313+
primaryRequest.getRequest(),
314+
ActionListener.wrap(
315+
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
316+
e -> {
317+
if (e instanceof ShardNotInPrimaryModeException) {
318+
onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
319+
} else {
320+
onFailure(e);
321+
}
322+
}));
314323
}
315324

316325
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
2929
import org.elasticsearch.cluster.ClusterState;
3030
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
31-
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
32-
import org.elasticsearch.cluster.routing.PlainShardIterator;
3331
import org.elasticsearch.cluster.routing.ShardsIterator;
3432
import org.elasticsearch.cluster.service.ClusterService;
3533
import org.elasticsearch.common.inject.Inject;
@@ -45,7 +43,6 @@
4543
import org.elasticsearch.transport.TransportService;
4644

4745
import java.io.IOException;
48-
import java.util.Collections;
4946
import java.util.Objects;
5047
import java.util.function.Supplier;
5148

@@ -88,14 +85,10 @@ abstract static class TransportRetentionLeaseAction<T extends Request<T>> extend
8885

8986
@Override
9087
protected ShardsIterator shards(final ClusterState state, final InternalRequest request) {
91-
final IndexShardRoutingTable shardRoutingTable = state
88+
return state
9289
.routingTable()
93-
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id());
94-
if (shardRoutingTable.primaryShard().active()) {
95-
return shardRoutingTable.primaryShardIt();
96-
} else {
97-
return new PlainShardIterator(request.request().getShardId(), Collections.emptyList());
98-
}
90+
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
91+
.primaryShardIt();
9992
}
10093

10194
@Override
@@ -174,6 +167,7 @@ void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest reques
174167
protected Writeable.Reader<Response> getResponseReader() {
175168
return Response::new;
176169
}
170+
177171
}
178172

179173
@Override
@@ -400,9 +394,10 @@ public static class Response extends ActionResponse {
400394
public Response() {
401395
}
402396

403-
Response(StreamInput in) throws IOException {
397+
Response(final StreamInput in) throws IOException {
404398
super(in);
405399
}
400+
406401
}
407402

408403
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.index.shard;
2121

2222
import com.carrotsearch.hppc.ObjectLongMap;
23-
2423
import org.apache.logging.log4j.Logger;
2524
import org.apache.logging.log4j.message.ParameterizedMessage;
2625
import org.apache.lucene.index.CheckIndex;
@@ -2496,7 +2495,7 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
24962495
verifyNotClosed();
24972496
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
24982497

2499-
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
2498+
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo);
25002499
}
25012500

25022501
/**
@@ -2507,7 +2506,27 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
25072506
verifyNotClosed();
25082507
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
25092508

2510-
asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
2509+
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
2510+
}
2511+
2512+
/**
2513+
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
2514+
* executing the action.
2515+
*
2516+
* @param listener the listener to wrap
2517+
* @return the wrapped listener
2518+
*/
2519+
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
2520+
return ActionListener.delegateFailure(
2521+
listener,
2522+
(l, r) -> {
2523+
if (replicationTracker.isPrimaryMode()) {
2524+
l.onResponse(r);
2525+
} else {
2526+
r.close();
2527+
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
2528+
}
2529+
});
25112530
}
25122531

25132532
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {

server/src/main/java/org/elasticsearch/index/shard/ShardNotInPrimaryModeException.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.shard;
21+
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
24+
import java.io.IOException;
25+
26+
public class ShardNotInPrimaryModeException extends IllegalIndexShardStateException {
27+
28+
public ShardNotInPrimaryModeException(final ShardId shardId, final IndexShardState currentState) {
29+
super(shardId, currentState, "shard is not in primary mode");
30+
}
31+
32+
public ShardNotInPrimaryModeException(final StreamInput in) throws IOException {
33+
super(in);
34+
}
35+
36+
}

server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
6767
import org.elasticsearch.index.shard.IndexShardState;
6868
import org.elasticsearch.index.shard.ShardId;
69+
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
6970
import org.elasticsearch.indices.IndexTemplateMissingException;
7071
import org.elasticsearch.indices.InvalidIndexTemplateException;
7172
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
@@ -816,6 +817,7 @@ public void testIds() {
816817
ids.put(152, NoSuchRemoteClusterException.class);
817818
ids.put(153, RetentionLeaseAlreadyExistsException.class);
818819
ids.put(154, RetentionLeaseNotFoundException.class);
820+
ids.put(155, ShardNotInPrimaryModeException.class);
819821

820822
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
821823
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,11 @@
6464
import org.elasticsearch.index.IndexService;
6565
import org.elasticsearch.index.shard.IndexShard;
6666
import org.elasticsearch.index.shard.IndexShardClosedException;
67+
import org.elasticsearch.index.shard.IndexShardState;
6768
import org.elasticsearch.index.shard.ReplicationGroup;
6869
import org.elasticsearch.index.shard.ShardId;
6970
import org.elasticsearch.index.shard.ShardNotFoundException;
71+
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
7072
import org.elasticsearch.indices.IndexClosedException;
7173
import org.elasticsearch.indices.IndicesService;
7274
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -390,6 +392,43 @@ public void testNotStartedPrimary() {
390392
assertIndexShardCounter(0);
391393
}
392394

395+
public void testShardNotInPrimaryMode() {
396+
final String index = "test";
397+
final ShardId shardId = new ShardId(index, "_na_", 0);
398+
final ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
399+
setState(clusterService, state);
400+
final ReplicationTask task = maybeTask();
401+
final Request request = new Request(shardId);
402+
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
403+
final AtomicBoolean executed = new AtomicBoolean();
404+
405+
final ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
406+
final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id());
407+
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
408+
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm);
409+
410+
isPrimaryMode.set(false);
411+
412+
new TestAction(Settings.EMPTY, "internal:test-action", transportService, clusterService, shardStateAction, threadPool) {
413+
@Override
414+
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
415+
ActionListener<PrimaryResult<Request, TestResponse>> listener) {
416+
assertPhase(task, "primary");
417+
assertFalse(executed.getAndSet(true));
418+
super.shardOperationOnPrimary(shardRequest, primary, listener);
419+
}
420+
}.new AsyncPrimaryAction(primaryRequest, listener, task).run();
421+
422+
assertFalse(executed.get());
423+
assertIndexShardCounter(0); // no permit should be held
424+
425+
final ExecutionException e = expectThrows(ExecutionException.class, listener::get);
426+
assertThat(e.getCause(), instanceOf(ReplicationOperation.RetryOnPrimaryException.class));
427+
assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));
428+
assertThat(e.getCause().getCause(), instanceOf(ShardNotInPrimaryModeException.class));
429+
assertThat(e.getCause().getCause(), hasToString(containsString("shard is not in primary mode")));
430+
}
431+
393432
/**
394433
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
395434
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
@@ -1126,6 +1165,8 @@ private void assertIndexShardCounter(int expected) {
11261165

11271166
private final AtomicBoolean isRelocated = new AtomicBoolean(false);
11281167

1168+
private final AtomicBoolean isPrimaryMode = new AtomicBoolean(true);
1169+
11291170
/**
11301171
* Sometimes build a ReplicationTask for tracking the phase of the
11311172
* TransportReplicationAction. Since TransportReplicationAction has to work
@@ -1271,10 +1312,16 @@ private IndexService mockIndexService(final IndexMetaData indexMetaData, Cluster
12711312
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
12721313
final IndexShard indexShard = mock(IndexShard.class);
12731314
when(indexShard.shardId()).thenReturn(shardId);
1315+
when(indexShard.state()).thenReturn(IndexShardState.STARTED);
12741316
doAnswer(invocation -> {
12751317
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
1276-
count.incrementAndGet();
1277-
callback.onResponse(count::decrementAndGet);
1318+
if (isPrimaryMode.get()) {
1319+
count.incrementAndGet();
1320+
callback.onResponse(count::decrementAndGet);
1321+
1322+
} else {
1323+
callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED));
1324+
}
12781325
return null;
12791326
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
12801327
doAnswer(invocation -> {

0 commit comments

Comments
 (0)