Skip to content

Commit 3a344fb

Browse files
bharath-techieVishalks
authored andcommitted
Added rest layer changes for List all PITs and PIT segments (opensearch-project#4388)
* Changes for list all and pit segments Signed-off-by: Bharathwaj G <[email protected]>
1 parent 9a1ed6f commit 3a344fb

28 files changed

+645
-221
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
66
- Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001))
77
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
88
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
9+
- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388))
910
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
1011
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
1112
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))

client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java

+4
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,10 @@ static Request deleteAllPits() {
498498
return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
499499
}
500500

501+
static Request getAllPits() {
502+
return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all");
503+
}
504+
501505
static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
502506
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");
503507

client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java

+35
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.opensearch.action.search.CreatePitResponse;
6464
import org.opensearch.action.search.DeletePitRequest;
6565
import org.opensearch.action.search.DeletePitResponse;
66+
import org.opensearch.action.search.GetAllPitNodesResponse;
6667
import org.opensearch.action.search.MultiSearchRequest;
6768
import org.opensearch.action.search.MultiSearchResponse;
6869
import org.opensearch.action.search.SearchRequest;
@@ -1368,6 +1369,40 @@ public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListen
13681369
);
13691370
}
13701371

1372+
/**
1373+
* Get all point in time searches using list all PITs API
1374+
*
1375+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
1376+
* @return the response
1377+
*/
1378+
public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException {
1379+
return performRequestAndParseEntity(
1380+
new MainRequest(),
1381+
(request) -> RequestConverters.getAllPits(),
1382+
options,
1383+
GetAllPitNodesResponse::fromXContent,
1384+
emptySet()
1385+
);
1386+
}
1387+
1388+
/**
1389+
* Asynchronously get all point in time searches using list all PITs API
1390+
*
1391+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
1392+
* @param listener the listener to be notified upon request completion
1393+
* @return the response
1394+
*/
1395+
public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener<GetAllPitNodesResponse> listener) {
1396+
return performRequestAsyncAndParseEntity(
1397+
new MainRequest(),
1398+
(request) -> RequestConverters.getAllPits(),
1399+
options,
1400+
GetAllPitNodesResponse::fromXContent,
1401+
listener,
1402+
emptySet()
1403+
);
1404+
}
1405+
13711406
/**
13721407
* Clears one or more scroll ids using the Clear Scroll API.
13731408
*

client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java

+38-9
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import org.opensearch.action.search.DeletePitInfo;
1919
import org.opensearch.action.search.DeletePitRequest;
2020
import org.opensearch.action.search.DeletePitResponse;
21+
import org.opensearch.action.search.GetAllPitNodesResponse;
2122
import org.opensearch.common.unit.TimeValue;
2223

2324
import java.io.IOException;
2425
import java.util.ArrayList;
2526
import java.util.List;
2627
import java.util.concurrent.TimeUnit;
28+
import java.util.stream.Collectors;
2729

2830
/**
2931
* Tests point in time API with rest high level client
@@ -52,21 +54,24 @@ public void indexDocuments() throws IOException {
5254

5355
public void testCreateAndDeletePit() throws IOException {
5456
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
55-
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
56-
assertTrue(pitResponse.getId() != null);
57-
assertEquals(1, pitResponse.getTotalShards());
58-
assertEquals(1, pitResponse.getSuccessfulShards());
59-
assertEquals(0, pitResponse.getFailedShards());
60-
assertEquals(0, pitResponse.getSkippedShards());
57+
CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
58+
assertTrue(createPitResponse.getId() != null);
59+
assertEquals(1, createPitResponse.getTotalShards());
60+
assertEquals(1, createPitResponse.getSuccessfulShards());
61+
assertEquals(0, createPitResponse.getFailedShards());
62+
assertEquals(0, createPitResponse.getSkippedShards());
63+
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
64+
List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
65+
assertTrue(pits.contains(createPitResponse.getId()));
6166
List<String> pitIds = new ArrayList<>();
62-
pitIds.add(pitResponse.getId());
67+
pitIds.add(createPitResponse.getId());
6368
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
6469
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
6570
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
66-
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
71+
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId()));
6772
}
6873

69-
public void testDeleteAllPits() throws IOException {
74+
public void testDeleteAllAndListAllPits() throws IOException {
7075
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
7176
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
7277
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
@@ -80,6 +85,11 @@ public void testDeleteAllPits() throws IOException {
8085
pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
8186
assertTrue(pitResponse.getId() != null);
8287
assertTrue(pitResponse1.getId() != null);
88+
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
89+
90+
List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
91+
assertTrue(pits.contains(pitResponse.getId()));
92+
assertTrue(pits.contains(pitResponse1.getId()));
8393
ActionListener<DeletePitResponse> deletePitListener = new ActionListener<>() {
8494
@Override
8595
public void onResponse(DeletePitResponse response) {
@@ -95,8 +105,27 @@ public void onFailure(Exception e) {
95105
}
96106
}
97107
};
108+
final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
109+
110+
ActionListener<GetAllPitNodesResponse> getPitsListener = new ActionListener<GetAllPitNodesResponse>() {
111+
@Override
112+
public void onResponse(GetAllPitNodesResponse response) {
113+
List<String> pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
114+
assertTrue(pits.contains(pitResponse3.getId()));
115+
}
116+
117+
@Override
118+
public void onFailure(Exception e) {
119+
if (!(e instanceof OpenSearchStatusException)) {
120+
throw new AssertionError("List all PITs failed", e);
121+
}
122+
}
123+
};
124+
highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener);
98125
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
99126
// validate no pits case
127+
getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
128+
assertTrue(getAllPitResponse.getPitInfos().size() == 0);
100129
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
101130
}
102131
}

client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase {
135135
"ping",
136136
"info",
137137
"delete_all_pits",
138+
"get_all_pits",
138139
// security
139140
"security.get_ssl_certificates",
140141
"security.authenticate",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"get_all_pits":{
3+
"documentation":{
4+
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
5+
"description":"Lists all active point in time searches."
6+
},
7+
"stability":"stable",
8+
"url":{
9+
"paths":[
10+
{
11+
"path":"/_search/point_in_time/_all",
12+
"methods":[
13+
"GET"
14+
]
15+
}
16+
]
17+
}
18+
}
19+
}

rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml

+12
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@
7979
- match: {hits.total: 3 }
8080
- length: {hits.hits: 1 }
8181

82+
- do:
83+
get_all_pits: {}
84+
85+
- match: {pits.0.pit_id: $pit_id}
86+
- match: {pits.0.keep_alive: 82800000 }
87+
8288
- do:
8389
delete_pit:
8490
body:
@@ -119,6 +125,12 @@
119125
- set: {pit_id: pit_id}
120126
- match: { _shards.failed: 0}
121127

128+
- do:
129+
get_all_pits: {}
130+
131+
- match: {pits.0.pit_id: $pit_id}
132+
- match: {pits.0.keep_alive: 82800000 }
133+
122134
- do:
123135
delete_all_pits: {}
124136

server/src/main/java/org/opensearch/action/ActionModule.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -238,16 +238,14 @@
238238
import org.opensearch.action.search.ClearScrollAction;
239239
import org.opensearch.action.search.CreatePitAction;
240240
import org.opensearch.action.search.DeletePitAction;
241-
import org.opensearch.action.search.GetAllPitsAction;
242241
import org.opensearch.action.search.MultiSearchAction;
243-
import org.opensearch.action.search.NodesGetAllPitsAction;
242+
import org.opensearch.action.search.GetAllPitsAction;
244243
import org.opensearch.action.search.SearchAction;
245244
import org.opensearch.action.search.SearchScrollAction;
246245
import org.opensearch.action.search.TransportClearScrollAction;
247246
import org.opensearch.action.search.TransportCreatePitAction;
248247
import org.opensearch.action.search.TransportDeletePitAction;
249248
import org.opensearch.action.search.TransportGetAllPitsAction;
250-
import org.opensearch.action.search.TransportNodesGetAllPitsAction;
251249
import org.opensearch.action.search.TransportMultiSearchAction;
252250
import org.opensearch.action.search.TransportSearchAction;
253251
import org.opensearch.action.search.TransportSearchScrollAction;
@@ -385,6 +383,7 @@
385383
import org.opensearch.rest.action.cat.RestClusterManagerAction;
386384
import org.opensearch.rest.action.cat.RestNodeAttrsAction;
387385
import org.opensearch.rest.action.cat.RestNodesAction;
386+
import org.opensearch.rest.action.cat.RestPitSegmentsAction;
388387
import org.opensearch.rest.action.cat.RestPluginsAction;
389388
import org.opensearch.rest.action.cat.RestRepositoriesAction;
390389
import org.opensearch.rest.action.cat.RestSegmentsAction;
@@ -413,6 +412,7 @@
413412
import org.opensearch.rest.action.search.RestCreatePitAction;
414413
import org.opensearch.rest.action.search.RestDeletePitAction;
415414
import org.opensearch.rest.action.search.RestExplainAction;
415+
import org.opensearch.rest.action.search.RestGetAllPitsAction;
416416
import org.opensearch.rest.action.search.RestMultiSearchAction;
417417
import org.opensearch.rest.action.search.RestSearchAction;
418418
import org.opensearch.rest.action.search.RestSearchScrollAction;
@@ -675,10 +675,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
675675

676676
// point in time actions
677677
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
678-
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
679678
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
680679
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
681-
actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class);
680+
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
682681

683682
// Remote Store
684683
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);
@@ -858,6 +857,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
858857
// Point in time API
859858
registerHandler.accept(new RestCreatePitAction());
860859
registerHandler.accept(new RestDeletePitAction());
860+
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
861+
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));
861862

862863
for (ActionPlugin plugin : actionPlugins) {
863864
for (RestHandler handler : plugin.getRestHandlers(

server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java

+34
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.Strings;
1414
import org.opensearch.common.io.stream.StreamInput;
1515
import org.opensearch.common.io.stream.StreamOutput;
16+
import org.opensearch.common.xcontent.XContentParser;
1617

1718
import java.io.IOException;
1819
import java.util.ArrayList;
@@ -84,4 +85,37 @@ public ActionRequestValidationException validate() {
8485
}
8586
return validationException;
8687
}
88+
89+
public void fromXContent(XContentParser parser) throws IOException {
90+
pitIds.clear();
91+
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
92+
throw new IllegalArgumentException("Malformed content, must start with an object");
93+
} else {
94+
XContentParser.Token token;
95+
String currentFieldName = null;
96+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
97+
if (token == XContentParser.Token.FIELD_NAME) {
98+
currentFieldName = parser.currentName();
99+
} else if ("pit_id".equals(currentFieldName)) {
100+
if (token == XContentParser.Token.START_ARRAY) {
101+
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
102+
if (token.isValue() == false) {
103+
throw new IllegalArgumentException("pit_id array element should only contain PIT identifier");
104+
}
105+
pitIds.add(parser.text());
106+
}
107+
} else {
108+
if (token.isValue() == false) {
109+
throw new IllegalArgumentException("pit_id element should only contain PIT identifier");
110+
}
111+
pitIds.add(parser.text());
112+
}
113+
} else {
114+
throw new IllegalArgumentException(
115+
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
116+
);
117+
}
118+
}
119+
}
120+
}
87121
}

server/src/main/java/org/opensearch/action/search/CreatePitController.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ public void executeCreatePit(
9898
task.getParentTaskId(),
9999
Collections.emptyMap()
100100
);
101+
/**
102+
* This is needed for cross cluster functionality to work with PITs and current ccsMinimizeRoundTrips is
103+
* not supported for point in time
104+
*/
105+
searchRequest.setCcsMinimizeRoundtrips(false);
101106
/**
102107
* Phase 1 of create PIT
103108
*/
@@ -193,6 +198,29 @@ void executeUpdatePitId(
193198
);
194199
for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
195200
DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
201+
if (node == null) {
202+
node = this.clusterService.state().getNodes().get(entry.getValue().getNode());
203+
}
204+
if (node == null) {
205+
logger.error(
206+
() -> new ParameterizedMessage(
207+
"Create pit update phase for PIT ID [{}] failed " + "because node [{}] not found",
208+
searchResponse.pointInTimeId(),
209+
entry.getValue().getNode()
210+
)
211+
);
212+
groupedActionListener.onFailure(
213+
new OpenSearchException(
214+
"Create pit update phase for PIT ID ["
215+
+ searchResponse.pointInTimeId()
216+
+ "] failed because node["
217+
+ entry.getValue().getNode()
218+
+ "] "
219+
+ "not found"
220+
)
221+
);
222+
return;
223+
}
196224
try {
197225
final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node);
198226
searchTransportService.updatePitContext(
@@ -206,11 +234,12 @@ void executeUpdatePitId(
206234
groupedActionListener
207235
);
208236
} catch (Exception e) {
237+
String nodeName = node.getName();
209238
logger.error(
210239
() -> new ParameterizedMessage(
211240
"Create pit update phase failed for PIT ID [{}] on node [{}]",
212241
searchResponse.pointInTimeId(),
213-
node
242+
nodeName
214243
),
215244
e
216245
);

server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java

-11
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,11 @@
2121
*/
2222
public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesRequest> {
2323

24-
// Security plugin intercepts and sets the response with permitted PIT contexts
25-
private GetAllPitNodesResponse getAllPitNodesResponse;
26-
2724
@Inject
2825
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
2926
super(concreteNodes);
3027
}
3128

32-
public void setGetAllPitNodesResponse(GetAllPitNodesResponse getAllPitNodesResponse) {
33-
this.getAllPitNodesResponse = getAllPitNodesResponse;
34-
}
35-
36-
public GetAllPitNodesResponse getGetAllPitNodesResponse() {
37-
return getAllPitNodesResponse;
38-
}
39-
4029
public GetAllPitNodesRequest(StreamInput in) throws IOException {
4130
super(in);
4231
}

0 commit comments

Comments
 (0)