Commit c508939

Add Orchestration components for SegmentReplication.

This change introduces target and source services for Segment Replication. It also refactors PeerRecoveryTargetService and RemoteRecoveryTargetHandler to reuse a FileChunkRequestHandler and a transport client that issues retryable requests.

Signed-off-by: Marc Handalian <[email protected]>

1 parent: a023ad9

28 files changed: +1258 −229 lines
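
Note: the "retryable requests" transport client itself does not appear in the excerpts below. As a rough sketch of the general pattern only (the class and method names here are hypothetical, not this commit's API), a retryable client re-issues an asynchronous action with exponential backoff while the failure still looks transient:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    // Hypothetical sketch of retry-with-backoff; not the commit's actual client.
    final class RetrySketch {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Run the action; while failures are retryable and attempts remain,
        // re-run it after a delay that doubles on each attempt.
        <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> action,
            Predicate<Throwable> retryable,
            long delayMillis,
            int attemptsLeft
        ) {
            CompletableFuture<T> result = new CompletableFuture<>();
            action.get().whenComplete((value, failure) -> {
                if (failure == null) {
                    result.complete(value);
                } else if (attemptsLeft > 1 && retryable.test(failure)) {
                    Runnable retry = () -> withRetry(action, retryable, delayMillis * 2, attemptsLeft - 1)
                        .whenComplete((v, f) -> {
                            if (f == null) {
                                result.complete(v);
                            } else {
                                result.completeExceptionally(f);
                            }
                        });
                    scheduler.schedule(retry, delayMillis, TimeUnit.MILLISECONDS);
                } else {
                    result.completeExceptionally(failure);
                }
            });
            return result;
        }
    }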

server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java

Lines changed: 4 additions & 4 deletions
@@ -77,7 +77,7 @@
 import org.opensearch.index.shard.IndexShardState;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
-import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
+import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.monitor.fs.FsInfo;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.snapshots.SnapshotState;
@@ -397,7 +397,7 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc
     internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
     (connection, requestId, action, request, options) -> {
         if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
-            RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
+            FileChunkRequest req = (FileChunkRequest) request;
             byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes;
             int i = randomIntBetween(0, req.content().length() - 1);
             array[i] = (byte) ~array[i]; // flip one byte in the content
@@ -474,11 +474,11 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
     internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
     (connection, requestId, action, request, options) -> {
         if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
-            RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
+            FileChunkRequest req = (FileChunkRequest) request;
             if (truncate && req.length() > 1) {
                 BytesRef bytesRef = req.content().toBytesRef();
                 BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
-                request = new RecoveryFileChunkRequest(
+                request = new FileChunkRequest(
                     req.recoveryId(),
                     req.requestSeqNo(),
                     req.shardId(),
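
All three integration-test changes in this commit go through the same hook: a MockTransportService send behavior that inspects outbound FILE_CHUNK traffic before forwarding it. A minimal sketch of the shared pattern, with hypothetical node names (primaryNodeName, replicaNodeName) standing in for the tests' own variables:

    // Sketch of the shared test pattern: intercept outbound transport messages,
    // inspect FILE_CHUNK requests, then forward them unchanged.
    MockTransportService primaryTransport = (MockTransportService) internalCluster()
        .getInstance(TransportService.class, primaryNodeName); // hypothetical name
    primaryTransport.addSendBehavior(
        internalCluster().getInstance(TransportService.class, replicaNodeName), // hypothetical name
        (connection, requestId, action, request, options) -> {
            if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
                FileChunkRequest req = (FileChunkRequest) request;
                logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
            }
            connection.sendRequest(requestId, action, request, options);
        }
    );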

server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java

Lines changed: 2 additions & 2 deletions
@@ -67,7 +67,7 @@
 import org.opensearch.index.shard.IndexShardState;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
-import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
+import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.search.SearchHit;
 import org.opensearch.search.SearchHits;
@@ -809,7 +809,7 @@ public void sendRequest(
     TransportRequestOptions options
 ) throws IOException {
     if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
-        RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
+        FileChunkRequest chunkRequest = (FileChunkRequest) request;
         if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
             // corrupting the segments_N files in order to make sure future recovery re-send files
             logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name());

server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java

Lines changed: 2 additions & 2 deletions
@@ -43,7 +43,7 @@
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
-import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
+import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.node.RecoverySettingsChunkSizePlugin;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.test.OpenSearchIntegTestCase;
@@ -146,7 +146,7 @@ public void testCancelRecoveryAndResume() throws Exception {
     internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
     (connection, requestId, action, request, options) -> {
         if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
-            RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
+            FileChunkRequest req = (FileChunkRequest) request;
             logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
             if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
                 latch.countDown();

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 5 additions & 0 deletions
@@ -1504,6 +1504,11 @@ static Engine.Searcher wrapSearcher(
         }
     }

+    public IndexCommit getLatestSegmentInfos() {
+        // TODO: Fix to create metadata from seginfos.
+        return null;
+    }
+
     /**
      * Wrapper for a non-closing reader
      *
server/src/main/java/org/opensearch/indices/recovery/FileChunkRequest.java (renamed from RecoveryFileChunkRequest.java)

Lines changed: 4 additions & 4 deletions
@@ -43,11 +43,11 @@
 import java.io.IOException;

 /**
- * Request for a recovery file chunk
+ * Request containing a file chunk.
  *
  * @opensearch.internal
  */
-public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {
+public final class FileChunkRequest extends RecoveryTransportRequest {
     private final boolean lastChunk;
     private final long recoveryId;
     private final ShardId shardId;
@@ -58,7 +58,7 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {

     private final int totalTranslogOps;

-    public RecoveryFileChunkRequest(StreamInput in) throws IOException {
+    public FileChunkRequest(StreamInput in) throws IOException {
         super(in);
         recoveryId = in.readLong();
         shardId = new ShardId(in);
@@ -75,7 +75,7 @@ public RecoveryFileChunkRequest(StreamInput in) throws IOException {
         sourceThrottleTimeInNanos = in.readLong();
     }

-    public RecoveryFileChunkRequest(
+    public FileChunkRequest(
         long recoveryId,
         final long requestSeqNo,
         ShardId shardId,
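
The rename keeps the wire fields and their order, so the usual OpenSearch serialization round trip still holds. A small illustrative sketch (the `original` variable and the assertions are hypothetical):

    // Illustrative round trip: fields written by writeTo(StreamOutput) are read
    // back in the same order by the FileChunkRequest(StreamInput) constructor.
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        original.writeTo(out);
        try (StreamInput in = out.bytes().streamInput()) {
            FileChunkRequest copy = new FileChunkRequest(in);
            assert copy.recoveryId() == original.recoveryId();
            assert copy.lastChunk() == original.lastChunk();
        }
    }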
server/src/main/java/org/opensearch/indices/recovery/FileChunkRequestHandler.java

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.recovery;
+
+import org.apache.lucene.store.RateLimiter;
+import org.opensearch.action.ActionListener;
+import org.opensearch.indices.replication.common.ReplicationCollection;
+import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
+import org.opensearch.indices.replication.common.ReplicationTarget;
+import org.opensearch.tasks.Task;
+import org.opensearch.transport.TransportChannel;
+import org.opensearch.transport.TransportRequestHandler;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A request handler for processing file-chunk requests.
+ * @param <T> a {@link ReplicationTarget} implementation that can write incoming file chunks.
+ *
+ * @opensearch.internal
+ */
+public class FileChunkRequestHandler<T extends ReplicationTarget> implements TransportRequestHandler<FileChunkRequest> {
+
+    // How many bytes we've copied since we last called RateLimiter.pause
+    private final AtomicLong bytesSinceLastPause = new AtomicLong();
+    private final RecoverySettings recoverySettings;
+    private final ReplicationCollection<T> onGoingTransfers;
+
+    public FileChunkRequestHandler(ReplicationCollection<T> onGoingTransfers, RecoverySettings recoverySettings) {
+        this.onGoingTransfers = onGoingTransfers;
+        this.recoverySettings = recoverySettings;
+    }
+
+    @Override
+    public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
+        try (ReplicationCollection.ReplicationRef<T> recoveryRef = onGoingTransfers.getSafe(request.recoveryId(), request.shardId())) {
+            final ReplicationTarget replicationTarget = recoveryRef.get();
+            final ActionListener<Void> listener = replicationTarget.createOrFinishListener(
+                channel,
+                PeerRecoveryTargetService.Actions.FILE_CHUNK,
+                request
+            );
+            if (listener == null) {
+                return;
+            }
+
+            final ReplicationLuceneIndex indexState = replicationTarget.state().getIndex();
+            if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
+                indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
+            }
+
+            RateLimiter rateLimiter = recoverySettings.rateLimiter();
+            if (rateLimiter != null) {
+                long bytes = bytesSinceLastPause.addAndGet(request.content().length());
+                if (bytes > rateLimiter.getMinPauseCheckBytes()) {
+                    // Time to pause
+                    bytesSinceLastPause.addAndGet(-bytes);
+                    long throttleTimeInNanos = rateLimiter.pause(bytes);
+                    indexState.addTargetThrottling(throttleTimeInNanos);
+                    replicationTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
+                }
+            }
+
+            replicationTarget.writeFileChunk(
+                request.metadata(),
+                request.position(),
+                request.content(),
+                request.lastChunk(),
+                request.totalTranslogOps(),
+                listener
+            );
+        }
+    }
+}
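
The handler is generic over the ReplicationTarget that consumes the chunks, which is what lets peer recovery and segment replication share it. The registration itself is not shown in this excerpt; a hedged sketch of what the wiring could look like (the onGoingRecoveries collection and the executor choice are assumptions, not the commit's actual code):

    // Hypothetical wiring: register the shared handler for the file-chunk action.
    transportService.registerRequestHandler(
        PeerRecoveryTargetService.Actions.FILE_CHUNK,
        ThreadPool.Names.GENERIC,
        FileChunkRequest::new,
        new FileChunkRequestHandler<>(onGoingRecoveries, recoverySettings)
    );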
