
Commit 59f9784

Prepare to make send translog of recovery non-blocking (#37458)

This commit prepares the required infrastructure to make sending a translog snapshot from the recovery source non-blocking. A follow-up will make the send-snapshot method itself non-blocking. Relates #37291

1 parent 0a2d0cd commit 59f9784
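
The conversion throughout this commit follows the usual listener-callback pattern: a method that used to return its result directly now accepts a callback and reports the result (or failure) through it. The following is a minimal, self-contained sketch of that before/after shape; the ActionListener interface here is a simplified stand-in for org.elasticsearch.action.ActionListener, and the method bodies and checkpoint value are illustrative only.

import java.util.List;

class ListenerConversionSketch {

    // Simplified stand-in for org.elasticsearch.action.ActionListener (assumption, not the real class).
    interface ActionListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Before: the caller blocks until the local checkpoint comes back.
    long indexTranslogOperationsBlocking(List<String> operations) {
        // ... replay the operations on the target shard ...
        return 42L; // hypothetical local checkpoint
    }

    // After: the caller supplies a listener and the result is delivered through it,
    // so the calling (transport) thread does not have to wait for completion.
    void indexTranslogOperationsAsync(List<String> operations, ActionListener<Long> listener) {
        try {
            // ... replay the operations on the target shard ...
            listener.onResponse(42L); // hypothetical local checkpoint
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }
}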

File tree: 10 files changed, +162 -158 lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 1 deletion
@@ -2966,7 +2966,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
      * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
      *
      * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
-     * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
+     * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
      */
     public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
         assert seqNo != UNASSIGNED_SEQ_NO

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 22 additions & 19 deletions
@@ -481,14 +481,12 @@ class TranslogOperationsRequestHandler implements TransportRequestHandler<Recove
         @Override
         public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws IOException {
             try (RecoveryRef recoveryRef =
-                     onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                         onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                 final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
                 final RecoveryTarget recoveryTarget = recoveryRef.target();
-                try {
-                    recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
-                        request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary());
-                    channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
-                } catch (MapperException exception) {
+                final ActionListener<RecoveryTranslogOperationsResponse> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
+                final Consumer<Exception> retryOnMappingException = exception -> {
                     // in very rare cases a translog replay from primary is processed before a mapping update on this node
                     // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
                     logger.debug("delaying recovery due to missing mapping changes", exception);
@@ -500,31 +498,36 @@ public void onNewClusterState(ClusterState state) {
                             try {
                                 messageReceived(request, channel);
                             } catch (Exception e) {
-                                onFailure(e);
-                            }
-                        }
-
-                        protected void onFailure(Exception e) {
-                            try {
-                                channel.sendResponse(e);
-                            } catch (IOException e1) {
-                                logger.warn("failed to send error back to recovery source", e1);
+                                listener.onFailure(e);
                             }
                         }
 
                         @Override
                         public void onClusterServiceClose() {
-                            onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
+                            listener.onFailure(new ElasticsearchException(
+                                "cluster service was closed while waiting for mapping updates"));
                         }
 
                         @Override
                         public void onTimeout(TimeValue timeout) {
                             // note that we do not use a timeout (see comment above)
-                            onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout +
-                                "])"));
+                            listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
+                                "(timeout [" + timeout + "])"));
                         }
                     });
-                }
+                };
+                recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
+                    request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
+                    ActionListener.wrap(
+                        checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
+                        e -> {
+                            if (e instanceof MapperException) {
+                                retryOnMappingException.accept(e);
+                            } else {
+                                listener.onFailure(e);
+                            }
+                        })
+                );
             }
         }
     }
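
The handler above splits failure handling into two paths: a MapperException re-runs the request once the mapping update arrives, while any other exception is returned to the recovery source over the channel. A rough sketch of that dispatch, using simplified stand-in types rather than the real Elasticsearch transport and listener classes:

import java.util.function.Consumer;

class RetryOnMappingSketch {

    // Hypothetical stand-ins; the real code uses Elasticsearch's ActionListener and MapperException.
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    static class MapperException extends RuntimeException {
        MapperException(String message) { super(message); }
    }

    // Builds the listener handed to indexTranslogOperations: success forwards the checkpoint,
    // a MapperException triggers the retry hook, and any other failure goes back to the channel.
    static Listener<Long> wrap(Listener<Long> channelListener, Consumer<Exception> retryOnMappingException) {
        return new Listener<Long>() {
            @Override
            public void onResponse(Long checkpoint) {
                channelListener.onResponse(checkpoint);
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof MapperException) {
                    retryOnMappingException.accept(e);
                } else {
                    channelListener.onFailure(e);
                }
            }
        };
    }
}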

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 33 additions & 21 deletions
@@ -33,6 +33,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.StopWatch;
@@ -226,25 +227,27 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                 logger.trace("snapshot translog for recovery; current size is [{}]",
                     shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
             }
-            final SendSnapshotResult sendSnapshotResult;
-            try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
-                // we can release the retention lock here because the snapshot itself will retain the required operations.
-                IOUtils.close(retentionLock, () -> resources.remove(retentionLock));
-                // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
-                // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
-                final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
-                final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
-                sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
-                    maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
-            } catch (Exception e) {
-                throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
-            }
 
+            final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
+            resources.add(phase2Snapshot);
+            // we can release the retention lock here because the snapshot itself will retain the required operations.
+            IOUtils.close(retentionLock);
+            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
+            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
+            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
+            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
+            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
+                maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
+            sendSnapshotStep.whenComplete(
+                r -> IOUtils.close(phase2Snapshot),
+                e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
             final StepListener<Void> finalizeStep = new StepListener<>();
-            finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
+            sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
+
             finalizeStep.whenComplete(r -> {
-                assert resources.isEmpty() : "not every resource is released [" + resources + "]";
                 final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
+                final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
                 final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                     sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
@@ -507,10 +510,17 @@ TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int to
      * @param snapshot a snapshot of the translog
      * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
      * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
-     * @return the send snapshot result
+     * @param listener a listener which will be notified with the local checkpoint on the target.
      */
-    SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
-                              long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
+    void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
+                long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
+        ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
+            startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes));
+    }
+
+    private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
                                                       Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
+                                                      long maxSeqNoOfUpdatesOrDeletes) throws IOException {
         assert requiredSeqNoRangeStart <= endingSeqNo + 1:
             "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
         assert startingSeqNo <= requiredSeqNoRangeStart :
@@ -538,9 +548,11 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
         }
 
         final CancellableThreads.IOInterruptible sendBatch = () -> {
-            final long targetCheckpoint = recoveryTarget.indexTranslogOperations(
-                operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
-            targetLocalCheckpoint.set(targetCheckpoint);
+            // TODO: Make this non-blocking
+            final PlainActionFuture<Long> future = new PlainActionFuture<>();
+            recoveryTarget.indexTranslogOperations(
+                operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);
+            targetLocalCheckpoint.set(future.actionGet());
        };
 
         // send operations in batches
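
Two patterns in this file carry the change: phase2 is now driven by a StepListener chain in recoverToTarget, and sendSnapshotBlockingly temporarily bridges the new asynchronous target API back to blocking code with a future (the TODO marks where the follow-up is expected to remove the wait). A minimal sketch of that bridge, using java.util.concurrent.CompletableFuture as a stand-in for PlainActionFuture and a made-up AsyncTarget interface:

import java.util.List;
import java.util.concurrent.CompletableFuture;

class SendBatchBridgeSketch {

    // Hypothetical interface shaped like the new listener-based indexTranslogOperations call.
    interface AsyncTarget {
        void indexTranslogOperations(List<String> operations, CompletableFuture<Long> future);
    }

    // Interim blocking bridge: invoke the asynchronous method, then wait for the checkpoint.
    // This mirrors the "TODO: Make this non-blocking" spot in the diff above.
    static long sendBatchBlocking(AsyncTarget target, List<String> operations) {
        final CompletableFuture<Long> future = new CompletableFuture<>();
        target.indexTranslogOperations(operations, future);
        return future.join(); // blocks until the target completes the future with its local checkpoint
    }
}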

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 35 additions & 33 deletions
@@ -397,40 +397,42 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
     }
 
     @Override
-    public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
-                                        long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException {
-        final RecoveryState.Translog translog = state().getTranslog();
-        translog.totalOperations(totalTranslogOps);
-        assert indexShard().recoveryState() == state();
-        if (indexShard().state() != IndexShardState.RECOVERING) {
-            throw new IndexShardNotRecoveringException(shardId, indexShard().state());
-        }
-        /*
-         * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
-         * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
-         * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
-         * replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
-         */
-        indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
-        /*
-         * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
-         * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on.
-         */
-        indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
-        for (Translog.Operation operation : operations) {
-            Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
-            if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
-                throw new MapperException("mapping updates are not allowed [" + operation + "]");
+    public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
+                                        long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
+        ActionListener.completeWith(listener, () -> {
+            final RecoveryState.Translog translog = state().getTranslog();
+            translog.totalOperations(totalTranslogOps);
+            assert indexShard().recoveryState() == state();
+            if (indexShard().state() != IndexShardState.RECOVERING) {
+                throw new IndexShardNotRecoveringException(shardId, indexShard().state());
             }
-            assert result.getFailure() == null: "unexpected failure while replicating translog entry: " + result.getFailure();
-            ExceptionsHelper.reThrowIfNotNull(result.getFailure());
-        }
-        // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
-        translog.incrementRecoveredOperations(operations.size());
-        indexShard().sync();
-        // roll over / flush / trim if needed
-        indexShard().afterWriteOperation();
-        return indexShard().getLocalCheckpoint();
+            /*
+             * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
+             * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
+             * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
+             * replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
+             */
+            indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
+            /*
+             * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
+             * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
+             */
+            indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
+            for (Translog.Operation operation : operations) {
+                Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
+                if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
+                    throw new MapperException("mapping updates are not allowed [" + operation + "]");
+                }
+                assert result.getFailure() == null : "unexpected failure while replicating translog entry: " + result.getFailure();
+                ExceptionsHelper.reThrowIfNotNull(result.getFailure());
+            }
+            // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
+            translog.incrementRecoveredOperations(operations.size());
+            indexShard().sync();
+            // roll over / flush / trim if needed
+            indexShard().afterWriteOperation();
+            return indexShard().getLocalCheckpoint();
+        });
     }
 
     @Override
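
On the target side the body still runs synchronously; ActionListener.completeWith simply executes it and routes the returned checkpoint, or any thrown exception, into the listener. A compact sketch of such a helper with a simplified listener type (assumed shape, not the real implementation):

import java.util.concurrent.Callable;

class CompleteWithSketch {

    // Simplified stand-in for org.elasticsearch.action.ActionListener (assumption).
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    // Runs a synchronous body and delivers its result, or its exception, through the listener.
    static <T> void completeWith(Listener<T> listener, Callable<T> body) {
        final T result;
        try {
            result = body.call();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        // Invoked outside the try so a failure thrown by the listener itself is not swallowed.
        listener.onResponse(result);
    }
}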

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 4 additions & 3 deletions
@@ -68,10 +68,11 @@ public interface RecoveryTargetHandler {
      * @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
      *                                            the primary shard when capturing these operations. This value is at least as high as the
      *                                            max_seq_no_of_updates on the primary was when any of these ops were processed on it.
-     * @return the local checkpoint on the target shard
+     * @param listener                            a listener which will be notified with the local checkpoint on the target
+     *                                            after these operations are successfully indexed on the target.
      */
-    long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
-                                 long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException;
+    void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
+                                 long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);
 
     /**
      * Notifies the target of the files it is going to receive
