Skip to content

Commit 233df6b

Browse files
Make Transport Shard Bulk Action Async (#39793) (#41112)
This is a dependency of #39504. Motivation: By refactoring `TransportShardBulkAction#shardOperationOnPrimary` to be async, we enable `DeterministicTaskQueue`-based tests to run indexing operations. This was previously impossible because we were blocking on the `write` thread until the `update` thread finished the mapping update. With this change, the mapping update instead triggers a new task in the `write` queue. This change significantly increases the coverage we get from `SnapshotResiliencyTests` (and other potential future tests) when it comes to tracking down concurrency issues with distributed state machines. The logical change is effectively all in `TransportShardBulkAction`; the rest of the changes simply mechanically move the caller code and tests to being async and pass the `ActionListener` down. Since the move to async would have added more parameters to the `private static` steps in this logic, I decided to inline and DRY up (between delete and update) the logic as much as I could instead of passing the listener + wait-consumer down through all of them.
1 parent 93d6766 commit 233df6b

File tree

30 files changed

+692
-518
lines changed

30 files changed

+692
-518
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,16 @@ protected void acquireReplicaOperationPermit(final IndexShard replica,
8585
}
8686

8787
@Override
88-
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
89-
final IndexShard primary) throws Exception {
90-
executeShardOperation(shardRequest, primary);
91-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
88+
protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary,
89+
ActionListener<PrimaryResult<ShardRequest, ReplicationResponse>> listener) {
90+
ActionListener.completeWith(listener, () -> {
91+
executeShardOperation(shardRequest, primary);
92+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
93+
});
9294
}
9395

9496
@Override
95-
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
97+
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) {
9698
executeShardOperation(shardRequest, replica);
9799
return new ReplicaResult();
98100
}

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.flush;
2121

22+
import org.elasticsearch.action.ActionListener;
2223
import org.elasticsearch.action.support.ActionFilters;
2324
import org.elasticsearch.action.support.replication.ReplicationResponse;
2425
import org.elasticsearch.action.support.replication.TransportReplicationAction;
@@ -51,11 +52,13 @@ protected ReplicationResponse newResponseInstance() {
5152
}
5253

5354
@Override
54-
protected PrimaryResult<ShardFlushRequest, ReplicationResponse> shardOperationOnPrimary(ShardFlushRequest shardRequest,
55-
IndexShard primary) {
56-
primary.flush(shardRequest.getRequest());
57-
logger.trace("{} flush request executed on primary", primary.shardId());
58-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
55+
protected void shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShard primary,
56+
ActionListener<PrimaryResult<ShardFlushRequest, ReplicationResponse>> listener) {
57+
ActionListener.completeWith(listener, () -> {
58+
primary.flush(shardRequest.getRequest());
59+
logger.trace("{} flush request executed on primary", primary.shardId());
60+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
61+
});
5962
}
6063

6164
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.refresh;
2121

22+
import org.elasticsearch.action.ActionListener;
2223
import org.elasticsearch.action.support.ActionFilters;
2324
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
2425
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -53,11 +54,13 @@ protected ReplicationResponse newResponseInstance() {
5354
}
5455

5556
@Override
56-
protected PrimaryResult<BasicReplicationRequest, ReplicationResponse> shardOperationOnPrimary(
57-
BasicReplicationRequest shardRequest, IndexShard primary) {
58-
primary.refresh("api");
59-
logger.trace("{} refresh request executed on primary", primary.shardId());
60-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
57+
protected void shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary,
58+
ActionListener<PrimaryResult<BasicReplicationRequest, ReplicationResponse>> listener) {
59+
ActionListener.completeWith(listener, () -> {
60+
primary.refresh("api");
61+
logger.trace("{} refresh request executed on primary", primary.shardId());
62+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
63+
});
6164
}
6265

6366
@Override

server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.action.ActionListener;
2223
import org.elasticsearch.index.mapper.Mapping;
2324
import org.elasticsearch.index.shard.ShardId;
2425

@@ -27,6 +28,6 @@ public interface MappingUpdatePerformer {
2728
/**
2829
* Update the mappings on the master.
2930
*/
30-
void updateMappings(Mapping update, ShardId shardId, String type);
31+
void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener);
3132

3233
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 151 additions & 150 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@ public ClusterBlockLevel indexBlockLevel() {
8585
}
8686

8787
@Override
88-
protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
89-
ResyncReplicationRequest request, IndexShard primary) throws Exception {
90-
final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
91-
return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
88+
protected void shardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary,
89+
ActionListener<PrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse>> listener) {
90+
ActionListener.completeWith(listener,
91+
() -> new WritePrimaryResult<>(performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger));
9292
}
9393

94-
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
94+
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {
9595
return request;
9696
}
9797

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,17 @@ public void execute() throws Exception {
101101

102102
totalShards.incrementAndGet();
103103
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
104-
primaryResult = primary.perform(request);
105-
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
104+
primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
105+
}
106+
107+
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
108+
this.primaryResult = primaryResult;
109+
primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
106110
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
107111
if (replicaRequest != null) {
108112
if (logger.isTraceEnabled()) {
109-
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
113+
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
110114
}
111-
112115
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
113116
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
114117
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
@@ -118,14 +121,14 @@ public void execute() throws Exception {
118121
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
119122
final long globalCheckpoint = primary.globalCheckpoint();
120123
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
121-
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on.
124+
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
125+
// on.
122126
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
123127
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
124128
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
125129
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
126130
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
127131
}
128-
129132
successfulShards.incrementAndGet(); // mark primary as successful
130133
decPendingAndFinishIfNeeded();
131134
}
@@ -310,9 +313,9 @@ public interface Primary<
310313
* also complete after. Deal with it.
311314
*
312315
* @param request the request to perform
313-
* @return the request to send to the replicas
316+
* @param listener result listener
314317
*/
315-
PrimaryResultT perform(RequestT request) throws Exception;
318+
void perform(RequestT request, ActionListener<PrimaryResultT> listener);
316319

317320
/**
318321
* Notifies the primary of a local checkpoint for the given allocation.

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 49 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.lucene.store.AlreadyClosedException;
24+
import org.elasticsearch.Assertions;
2425
import org.elasticsearch.ElasticsearchException;
2526
import org.elasticsearch.ExceptionsHelper;
2627
import org.elasticsearch.Version;
@@ -190,8 +191,8 @@ protected void resolveRequest(final IndexMetaData indexMetaData, final Request r
190191
* @param shardRequest the request to the primary shard
191192
* @param primary the primary shard to perform the operation on
192193
*/
193-
protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
194-
Request shardRequest, IndexShard primary) throws Exception;
194+
protected abstract void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
195+
ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
195196

196197
/**
197198
* Synchronously execute the specified replica operation. This is done under a permit from
@@ -357,58 +358,54 @@ public void handleException(TransportException exp) {
357358
});
358359
} else {
359360
setPhase(replicationTask, "primary");
360-
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
361361
createReplicatedOperation(primaryRequest.getRequest(),
362-
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
363-
primaryShardReference)
364-
.execute();
362+
ActionListener.wrap(result -> result.respond(
363+
new ActionListener<Response>() {
364+
@Override
365+
public void onResponse(Response response) {
366+
if (syncGlobalCheckpointAfterOperation) {
367+
final IndexShard shard = primaryShardReference.indexShard;
368+
try {
369+
shard.maybeSyncGlobalCheckpoint("post-operation");
370+
} catch (final Exception e) {
371+
// only log non-closed exceptions
372+
if (ExceptionsHelper.unwrap(
373+
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
374+
// intentionally swallow, a missed global checkpoint sync should not fail this operation
375+
logger.info(
376+
new ParameterizedMessage(
377+
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
378+
}
379+
}
380+
}
381+
primaryShardReference.close(); // release shard operation lock before responding to caller
382+
setPhase(replicationTask, "finished");
383+
onCompletionListener.onResponse(response);
384+
}
385+
386+
@Override
387+
public void onFailure(Exception e) {
388+
handleException(primaryShardReference, e);
389+
}
390+
}), e -> handleException(primaryShardReference, e)
391+
), primaryShardReference).execute();
365392
}
366393
} catch (Exception e) {
367-
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
368-
onFailure(e);
394+
handleException(primaryShardReference, e);
369395
}
370396
}
371397

398+
private void handleException(PrimaryShardReference primaryShardReference, Exception e) {
399+
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
400+
onFailure(e);
401+
}
402+
372403
@Override
373404
public void onFailure(Exception e) {
374405
setPhase(replicationTask, "finished");
375406
onCompletionListener.onFailure(e);
376407
}
377408

378-
private ActionListener<Response> createResponseListener(final PrimaryShardReference primaryShardReference) {
379-
return new ActionListener<Response>() {
380-
@Override
381-
public void onResponse(Response response) {
382-
if (syncGlobalCheckpointAfterOperation) {
383-
final IndexShard shard = primaryShardReference.indexShard;
384-
try {
385-
shard.maybeSyncGlobalCheckpoint("post-operation");
386-
} catch (final Exception e) {
387-
// only log non-closed exceptions
388-
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
389-
logger.info(
390-
new ParameterizedMessage(
391-
"{} failed to execute post-operation global checkpoint sync",
392-
shard.shardId()),
393-
e);
394-
// intentionally swallow, a missed global checkpoint sync should not fail this operation
395-
}
396-
}
397-
}
398-
primaryShardReference.close(); // release shard operation lock before responding to caller
399-
setPhase(replicationTask, "finished");
400-
onCompletionListener.onResponse(response);
401-
}
402-
403-
@Override
404-
public void onFailure(Exception e) {
405-
primaryShardReference.close(); // release shard operation lock before responding to caller
406-
setPhase(replicationTask, "finished");
407-
onCompletionListener.onFailure(e);
408-
}
409-
};
410-
}
411-
412409
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
413410
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
414411
PrimaryShardReference primaryShardReference) {
@@ -417,7 +414,7 @@ protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaReq
417414
}
418415
}
419416

420-
protected static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
417+
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
421418
Response extends ReplicationResponse>
422419
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
423420
final ReplicaRequest replicaRequest;
@@ -916,11 +913,15 @@ public void failShard(String reason, Exception e) {
916913
}
917914

918915
@Override
919-
public PrimaryResult<ReplicaRequest, Response> perform(Request request) throws Exception {
920-
PrimaryResult<ReplicaRequest, Response> result = shardOperationOnPrimary(request, indexShard);
921-
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
922-
+ "] with a primary failure [" + result.finalFailure + "]";
923-
return result;
916+
public void perform(Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
917+
if (Assertions.ENABLED) {
918+
listener = ActionListener.map(listener, result -> {
919+
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
920+
+ "] with a primary failure [" + result.finalFailure + "]";
921+
return result;
922+
});
923+
}
924+
shardOperationOnPrimary(request, indexShard, listener);
924925
}
925926

926927
@Override

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
103103
/**
104104
* Called on the primary with a reference to the primary {@linkplain IndexShard} to modify.
105105
*
106-
* @return the result of the operation on primary, including current translog location and operation response and failure
107-
* async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
106+
* @param listener listener for the result of the operation on primary, including current translog location and operation response
107+
* and failure async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
108108
*/
109109
@Override
110-
protected abstract WritePrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
111-
Request request, IndexShard primary) throws Exception;
110+
protected abstract void shardOperationOnPrimary(
111+
Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
112112

113113
/**
114114
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.

0 commit comments

Comments
 (0)