Skip to content

Commit 07edfb5

Browse files
committed
Rename GET_FILES action to GET_SEGMENT_FILES and add a test for PeerReplicationSource.
Signed-off-by: Marc Handalian <[email protected]>
1 parent bef2ee6 commit 07edfb5

File tree

3 files changed

+130
-9
lines changed

3 files changed

+130
-9
lines changed

server/src/main/java/org/opensearch/indices/replication/PeerReplicationSource.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.List;
2222

2323
import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO;
24-
import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_FILES;
24+
import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES;
2525

2626
/**
2727
* Implementation of {@link SegmentReplicationSource} where the source is another Node.
@@ -30,19 +30,19 @@
3030
*/
3131
public class PeerReplicationSource implements SegmentReplicationSource {
3232

33-
private DiscoveryNode targetNode;
33+
private DiscoveryNode localNode;
3434
private String allocationId;
3535
private RetryableTransportClient transportClient;
3636

3737
public PeerReplicationSource(
3838
TransportService transportService,
3939
RecoverySettings recoverySettings,
40-
DiscoveryNode sourceNode,
4140
DiscoveryNode targetNode,
41+
DiscoveryNode localNode,
4242
String allocationId
4343
) {
44-
transportClient = new RetryableTransportClient(transportService, sourceNode, recoverySettings.internalActionLongTimeout());
45-
this.targetNode = targetNode;
44+
transportClient = new RetryableTransportClient(transportService, targetNode, recoverySettings.internalActionLongTimeout());
45+
this.localNode = localNode;
4646
this.allocationId = allocationId;
4747
}
4848

@@ -54,7 +54,7 @@ public void getCheckpointMetadata(
5454
) {
5555
final Writeable.Reader<CheckpointInfoResponse> reader = CheckpointInfoResponse::new;
5656
final ActionListener<CheckpointInfoResponse> responseListener = ActionListener.map(listener, r -> r);
57-
CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, allocationId, targetNode, checkpoint);
57+
CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, allocationId, localNode, checkpoint);
5858
transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader);
5959
}
6060

@@ -69,7 +69,7 @@ public void getSegmentFiles(
6969
final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
7070
final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);
7171

72-
GetSegmentFilesRequest request = new GetSegmentFilesRequest(replicationId, allocationId, targetNode, filesToFetch, checkpoint);
73-
transportClient.executeRetryableAction(GET_FILES, request, responseListener, reader);
72+
GetSegmentFilesRequest request = new GetSegmentFilesRequest(replicationId, allocationId, localNode, filesToFetch, checkpoint);
73+
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
7474
}
7575
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class SegmentReplicationSourceService {
2323
*/
2424
public static class Actions {
2525
public static final String GET_CHECKPOINT_INFO = "internal:index/shard/segrep/checkpoint_info";
26-
public static final String GET_FILES = "internal:index/shard/segrep/get_files";
26+
public static final String GET_SEGMENT_FILES = "internal:index/shard/segrep/get_segment_files";
2727
}
2828

2929
public SegmentReplicationSourceService() {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.action.ActionListener;
13+
import org.opensearch.cluster.node.DiscoveryNode;
14+
import org.opensearch.cluster.node.DiscoveryNodeRole;
15+
import org.opensearch.cluster.service.ClusterService;
16+
import org.opensearch.common.settings.ClusterSettings;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.core.internal.io.IOUtils;
19+
import org.opensearch.index.shard.IndexShard;
20+
import org.opensearch.index.shard.IndexShardTestCase;
21+
import org.opensearch.index.store.Store;
22+
import org.opensearch.indices.recovery.RecoverySettings;
23+
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
24+
import org.opensearch.test.transport.CapturingTransport;
25+
import org.opensearch.transport.TransportService;
26+
27+
import java.util.Collections;
28+
29+
import static java.util.Collections.emptyMap;
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.mockito.Mockito.mock;
32+
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
33+
34+
public class PeerReplicationSourceTests extends IndexShardTestCase {
35+
36+
private CapturingTransport transport;
37+
private ClusterService clusterService;
38+
private TransportService transportService;
39+
private PeerReplicationSource peerReplicationSource;
40+
private IndexShard indexShard;
41+
private DiscoveryNode localNode;
42+
private DiscoveryNode sourceNode;
43+
44+
@Override
45+
public void setUp() throws Exception {
46+
super.setUp();
47+
final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
48+
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
49+
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
50+
transport = new CapturingTransport();
51+
localNode = newDiscoveryNode("localNode");
52+
sourceNode = newDiscoveryNode("sourceNode");
53+
clusterService = createClusterService(threadPool, localNode);
54+
transportService = transport.createTransportService(
55+
clusterService.getSettings(),
56+
threadPool,
57+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
58+
boundAddress -> clusterService.localNode(),
59+
null,
60+
Collections.emptySet()
61+
);
62+
transportService.start();
63+
transportService.acceptIncomingRequests();
64+
65+
indexShard = newStartedShard(true);
66+
67+
peerReplicationSource = new PeerReplicationSource(
68+
transportService,
69+
recoverySettings,
70+
sourceNode,
71+
localNode,
72+
indexShard.routingEntry().allocationId().toString()
73+
);
74+
}
75+
76+
@Override
77+
public void tearDown() throws Exception {
78+
IOUtils.close(transportService, clusterService, transport);
79+
closeShards(indexShard);
80+
super.tearDown();
81+
}
82+
83+
public void testGetCheckpointMetadata_invokesTransportService() {
84+
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
85+
final long replicationId = 1L;
86+
peerReplicationSource.getCheckpointMetadata(replicationId, checkpoint, mock(ActionListener.class));
87+
CapturingTransport.CapturedRequest[] capturedRequests1 = transport.getCapturedRequestsAndClear();
88+
assertThat(capturedRequests1.length, equalTo(1));
89+
CapturingTransport.CapturedRequest request = capturedRequests1[0];
90+
assertEquals(sourceNode, request.node);
91+
assertEquals(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, request.action);
92+
}
93+
94+
public void testGetFiles_invokesTransportService() {
95+
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
96+
final long replicationId = 1L;
97+
peerReplicationSource.getSegmentFiles(
98+
replicationId,
99+
checkpoint,
100+
Collections.emptyList(),
101+
mock(Store.class),
102+
mock(ActionListener.class)
103+
);
104+
CapturingTransport.CapturedRequest[] capturedRequests1 = transport.getCapturedRequestsAndClear();
105+
assertThat(capturedRequests1.length, equalTo(1));
106+
CapturingTransport.CapturedRequest request = capturedRequests1[0];
107+
assertEquals(sourceNode, request.node);
108+
assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, request.action);
109+
}
110+
111+
private DiscoveryNode newDiscoveryNode(String nodeName) {
112+
return new DiscoveryNode(
113+
nodeName,
114+
randomAlphaOfLength(10),
115+
buildNewFakeTransportAddress(),
116+
emptyMap(),
117+
Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
118+
Version.CURRENT
119+
);
120+
}
121+
}

0 commit comments

Comments
 (0)