Skip to content

Commit 5d26243

Browse files
Make Transport Shard Bulk Action Async (#39793)
This is a dependency of #39504 Motivation: By refactoring `TransportShardBulkAction#shardOperationOnPrimary` to async, we enable using `DeterministicTaskQueue` based tests to run indexing operations. This was previously impossible since we were blocking on the `write` thread until the `update` thread finished the mapping update. With this change, the mapping update will trigger a new task in the `write` queue instead. This change significantly enhances the amount of coverage we get from `SnapshotResiliencyTests` (and other potential future tests) when it comes to tracking down concurrency issues with distributed state machines. The logical change is effectively all in `TransportShardBulkAction`, the rest of the changes is then simply mechanically moving the caller code and tests to being async and passing the `ActionListener` down. Since the move to async would've added more parameters to the `private static` steps in this logic, I decided to inline and dry up (between delete and update) the logic as much as I could instead of passing the listener + wait-consumer down through all of them.
1 parent f92ebb2 commit 5d26243

29 files changed

+663
-465
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,16 @@ protected void acquireReplicaOperationPermit(final IndexShard replica,
8585
}
8686

8787
@Override
88-
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
89-
final IndexShard primary) throws Exception {
90-
executeShardOperation(shardRequest, primary);
91-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
88+
protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary,
89+
ActionListener<PrimaryResult<ShardRequest, ReplicationResponse>> listener) {
90+
ActionListener.completeWith(listener, () -> {
91+
executeShardOperation(shardRequest, primary);
92+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
93+
});
9294
}
9395

9496
@Override
95-
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
97+
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) {
9698
executeShardOperation(shardRequest, replica);
9799
return new ReplicaResult();
98100
}

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.flush;
2121

22+
import org.elasticsearch.action.ActionListener;
2223
import org.elasticsearch.action.support.ActionFilters;
2324
import org.elasticsearch.action.support.replication.ReplicationResponse;
2425
import org.elasticsearch.action.support.replication.TransportReplicationAction;
@@ -51,11 +52,13 @@ protected ReplicationResponse newResponseInstance() {
5152
}
5253

5354
@Override
54-
protected PrimaryResult<ShardFlushRequest, ReplicationResponse> shardOperationOnPrimary(ShardFlushRequest shardRequest,
55-
IndexShard primary) {
56-
primary.flush(shardRequest.getRequest());
57-
logger.trace("{} flush request executed on primary", primary.shardId());
58-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
55+
protected void shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShard primary,
56+
ActionListener<PrimaryResult<ShardFlushRequest, ReplicationResponse>> listener) {
57+
ActionListener.completeWith(listener, () -> {
58+
primary.flush(shardRequest.getRequest());
59+
logger.trace("{} flush request executed on primary", primary.shardId());
60+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
61+
});
5962
}
6063

6164
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.admin.indices.refresh;
2121

22+
import org.elasticsearch.action.ActionListener;
2223
import org.elasticsearch.action.support.ActionFilters;
2324
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
2425
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -53,11 +54,13 @@ protected ReplicationResponse newResponseInstance() {
5354
}
5455

5556
@Override
56-
protected PrimaryResult<BasicReplicationRequest, ReplicationResponse> shardOperationOnPrimary(
57-
BasicReplicationRequest shardRequest, IndexShard primary) {
58-
primary.refresh("api");
59-
logger.trace("{} refresh request executed on primary", primary.shardId());
60-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
57+
protected void shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary,
58+
ActionListener<PrimaryResult<BasicReplicationRequest, ReplicationResponse>> listener) {
59+
ActionListener.completeWith(listener, () -> {
60+
primary.refresh("api");
61+
logger.trace("{} refresh request executed on primary", primary.shardId());
62+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
63+
});
6164
}
6265

6366
@Override

server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.action.ActionListener;
2223
import org.elasticsearch.index.mapper.Mapping;
2324
import org.elasticsearch.index.shard.ShardId;
2425

@@ -27,6 +28,6 @@ public interface MappingUpdatePerformer {
2728
/**
2829
* Update the mappings on the master.
2930
*/
30-
void updateMappings(Mapping update, ShardId shardId, String type);
31+
void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener);
3132

3233
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 161 additions & 150 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ public ClusterBlockLevel indexBlockLevel() {
8585
}
8686

8787
@Override
88-
protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
89-
ResyncReplicationRequest request, IndexShard primary) {
90-
final ResyncReplicationRequest replicaRequest = performOnPrimary(request);
91-
return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
88+
protected void shardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary,
89+
ActionListener<PrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse>> listener) {
90+
ActionListener.completeWith(listener,
91+
() -> new WritePrimaryResult<>(performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger));
9292
}
9393

9494
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,17 @@ public void execute() throws Exception {
101101

102102
totalShards.incrementAndGet();
103103
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
104-
primaryResult = primary.perform(request);
105-
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
104+
primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure));
105+
}
106+
107+
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
108+
this.primaryResult = primaryResult;
109+
primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
106110
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
107111
if (replicaRequest != null) {
108112
if (logger.isTraceEnabled()) {
109-
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
113+
logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
110114
}
111-
112115
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
113116
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
114117
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
@@ -118,14 +121,14 @@ public void execute() throws Exception {
118121
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
119122
final long globalCheckpoint = primary.globalCheckpoint();
120123
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
121-
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on.
124+
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
125+
// on.
122126
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
123127
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
124128
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
125129
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
126130
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
127131
}
128-
129132
successfulShards.incrementAndGet(); // mark primary as successful
130133
decPendingAndFinishIfNeeded();
131134
}
@@ -310,9 +313,9 @@ public interface Primary<
310313
* also complete after. Deal with it.
311314
*
312315
* @param request the request to perform
313-
* @return the request to send to the replicas
316+
* @param listener result listener
314317
*/
315-
PrimaryResultT perform(RequestT request) throws Exception;
318+
void perform(RequestT request, ActionListener<PrimaryResultT> listener);
316319

317320
/**
318321
* Notifies the primary of a local checkpoint for the given allocation.

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.lucene.store.AlreadyClosedException;
24+
import org.elasticsearch.Assertions;
2425
import org.elasticsearch.ElasticsearchException;
2526
import org.elasticsearch.ExceptionsHelper;
2627
import org.elasticsearch.action.ActionListener;
@@ -189,8 +190,8 @@ protected void resolveRequest(final IndexMetaData indexMetaData, final Request r
189190
* @param shardRequest the request to the primary shard
190191
* @param primary the primary shard to perform the operation on
191192
*/
192-
protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
193-
Request shardRequest, IndexShard primary) throws Exception;
193+
protected abstract void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
194+
ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
194195

195196
/**
196197
* Synchronously execute the specified replica operation. This is done under a permit from
@@ -416,7 +417,7 @@ protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaReq
416417
}
417418
}
418419

419-
protected static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
420+
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
420421
Response extends ReplicationResponse>
421422
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
422423
final ReplicaRequest replicaRequest;
@@ -915,11 +916,15 @@ public void failShard(String reason, Exception e) {
915916
}
916917

917918
@Override
918-
public PrimaryResult<ReplicaRequest, Response> perform(Request request) throws Exception {
919-
PrimaryResult<ReplicaRequest, Response> result = shardOperationOnPrimary(request, indexShard);
920-
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
921-
+ "] with a primary failure [" + result.finalFailure + "]";
922-
return result;
919+
public void perform(Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
920+
if (Assertions.ENABLED) {
921+
listener = ActionListener.map(listener, result -> {
922+
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
923+
+ "] with a primary failure [" + result.finalFailure + "]";
924+
return result;
925+
});
926+
}
927+
shardOperationOnPrimary(request, indexShard, listener);
923928
}
924929

925930
@Override

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
103103
/**
104104
* Called on the primary with a reference to the primary {@linkplain IndexShard} to modify.
105105
*
106-
* @return the result of the operation on primary, including current translog location and operation response and failure
107-
* async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
106+
* @param listener listener for the result of the operation on primary, including current translog location and operation response
107+
* and failure async refresh is performed on the <code>primary</code> shard according to the <code>Request</code> refresh policy
108108
*/
109109
@Override
110-
protected abstract WritePrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
111-
Request request, IndexShard primary) throws Exception;
110+
protected abstract void shardOperationOnPrimary(
111+
Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
112112

113113
/**
114114
* Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.

server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.elasticsearch.cluster.action.index;
2121

22-
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2325
import org.elasticsearch.action.support.master.MasterNodeRequest;
2426
import org.elasticsearch.client.Client;
2527
import org.elasticsearch.client.IndicesAdminClient;
@@ -29,6 +31,7 @@
2931
import org.elasticsearch.common.settings.Setting.Property;
3032
import org.elasticsearch.common.settings.Settings;
3133
import org.elasticsearch.common.unit.TimeValue;
34+
import org.elasticsearch.common.util.concurrent.FutureUtils;
3235
import org.elasticsearch.common.xcontent.XContentType;
3336
import org.elasticsearch.index.Index;
3437
import org.elasticsearch.index.mapper.MapperService;
@@ -57,34 +60,36 @@ private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeou
5760
this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout;
5861
}
5962

60-
6163
public void setClient(Client client) {
6264
this.client = client.admin().indices();
6365
}
6466

65-
private PutMappingRequestBuilder updateMappingRequest(Index index, String type, Mapping mappingUpdate, final TimeValue timeout) {
66-
if (type.equals(MapperService.DEFAULT_MAPPING)) {
67-
throw new IllegalArgumentException("_default_ mapping should not be updated");
68-
}
69-
return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
70-
.setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO);
71-
}
72-
73-
/**
74-
* Same as {@link #updateMappingOnMaster(Index, String, Mapping, TimeValue)}
75-
* using the default timeout.
76-
*/
77-
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate) {
78-
updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout);
79-
}
80-
8167
/**
8268
* Update mappings on the master node, waiting for the change to be committed,
8369
* but not for the mapping update to be applied on all nodes. The timeout specified by
8470
* {@code timeout} is the master node timeout ({@link MasterNodeRequest#masterNodeTimeout()}),
8571
* potentially waiting for a master node to be available.
8672
*/
87-
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue masterNodeTimeout) {
88-
updateMappingRequest(index, type, mappingUpdate, masterNodeTimeout).get();
73+
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
74+
if (type.equals(MapperService.DEFAULT_MAPPING)) {
75+
throw new IllegalArgumentException("_default_ mapping should not be updated");
76+
}
77+
client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
78+
.setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
79+
.execute(new ActionListener<AcknowledgedResponse>() {
80+
@Override
81+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
82+
listener.onResponse(null);
83+
}
84+
85+
@Override
86+
public void onFailure(Exception e) {
87+
listener.onFailure(unwrapException(e));
88+
}
89+
});
90+
}
91+
92+
private static Exception unwrapException(Exception cause) {
93+
return cause instanceof ElasticsearchException ? FutureUtils.unwrapEsException((ElasticsearchException) cause) : cause;
8994
}
9095
}

0 commit comments

Comments
 (0)