Skip to content

Commit

Permalink
Add changes for Create PIT and Delete PIT rest layer and rest high le…
Browse files Browse the repository at this point in the history
…vel client (#4064)

* Create and delete PIT search rest layer changes

Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored Sep 1, 2022
1 parent 4a6e937 commit 689a2c4
Show file tree
Hide file tree
Showing 29 changed files with 1,152 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## [Unreleased]
### Added
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.opensearch.index.reindex.ReindexRequest;
import org.opensearch.index.reindex.UpdateByQueryRequest;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.mustache.MultiSearchTemplateRequest;
import org.opensearch.script.mustache.SearchTemplateRequest;
Expand Down Expand Up @@ -433,9 +436,19 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(searchRequest.routing());
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
if (searchRequest.pointInTimeBuilder() == null) {
params.withIndicesOptions(searchRequest.indicesOptions());
}
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
/**
* Merging search responses as part of CCS flow to reduce roundtrips is not supported for point in time -
* refer to org.opensearch.action.search.SearchResponseMerger
*/
if (searchRequest.pointInTimeBuilder() != null) {
params.putParam("ccs_minimize_roundtrips", "false");
} else {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
Expand Down Expand Up @@ -464,6 +477,27 @@ static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOExcep
return request;
}

static Request createPit(CreatePitRequest createPitRequest) throws IOException {
Params params = new Params();
params.putParam(RestCreatePitAction.ALLOW_PARTIAL_PIT_CREATION, Boolean.toString(createPitRequest.shouldAllowPartialPitCreation()));
params.putParam(RestCreatePitAction.KEEP_ALIVE, createPitRequest.getKeepAlive());
params.withIndicesOptions(createPitRequest.indicesOptions());
Request request = new Request(HttpPost.METHOD_NAME, endpoint(createPitRequest.indices(), "_search/point_in_time"));
request.addParameters(params.asMap());
request.setEntity(createEntity(createPitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePit(DeletePitRequest deletePitRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time");
request.setEntity(createEntity(deletePitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteAllPits() {
return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1250,6 +1254,120 @@ public final Cancellable scrollAsync(
);
}

/**
* Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final CreatePitResponse createPit(CreatePitRequest createPitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable createPitAsync(
CreatePitRequest createPitRequest,
RequestOptions options,
ActionListener<CreatePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deletePit(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deletePitAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete all point in time searches using delete all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deleteAllPits(RequestOptions options) throws IOException {
return performRequestAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.deleteAllPits(),
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete all point in time searches using delete all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListener<DeletePitResponse> listener) {
return performRequestAsyncAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.deleteAllPits(),
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
102 changes: 102 additions & 0 deletions client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.junit.Before;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Tests point in time API with rest high level client
*/
public class PitIT extends OpenSearchRestHighLevelClientTestCase {

@Before
public void indexDocuments() throws IOException {
Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1");
doc1.setJsonEntity("{\"type\":\"type1\", \"id\":1, \"num\":10, \"num2\":50}");
client().performRequest(doc1);
Request doc2 = new Request(HttpPut.METHOD_NAME, "/index/_doc/2");
doc2.setJsonEntity("{\"type\":\"type1\", \"id\":2, \"num\":20, \"num2\":40}");
client().performRequest(doc2);
Request doc3 = new Request(HttpPut.METHOD_NAME, "/index/_doc/3");
doc3.setJsonEntity("{\"type\":\"type1\", \"id\":3, \"num\":50, \"num2\":35}");
client().performRequest(doc3);
Request doc4 = new Request(HttpPut.METHOD_NAME, "/index/_doc/4");
doc4.setJsonEntity("{\"type\":\"type2\", \"id\":4, \"num\":100, \"num2\":10}");
client().performRequest(doc4);
Request doc5 = new Request(HttpPut.METHOD_NAME, "/index/_doc/5");
doc5.setJsonEntity("{\"type\":\"type2\", \"id\":5, \"num\":100, \"num2\":10}");
client().performRequest(doc5);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh"));
}

public void testCreateAndDeletePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertEquals(1, pitResponse.getTotalShards());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
}

public void testDeleteAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
DeletePitResponse deletePitResponse = highLevelClient().deleteAllPits(RequestOptions.DEFAULT);
for (DeletePitInfo deletePitInfo : deletePitResponse.getDeletePitResults()) {
assertTrue(deletePitInfo.isSuccessful());
}
pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
ActionListener<DeletePitResponse> deletePitListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) {
assertTrue(deletePitInfo.isSuccessful());
}
}

@Override
public void onFailure(Exception e) {
if (!(e instanceof OpenSearchStatusException)) {
throw new AssertionError("Delete all failed");
}
}
};
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
// validate no pits case
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -131,6 +133,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -1303,6 +1306,47 @@ public void testClearScroll() throws IOException {
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testCreatePit() throws IOException {
String[] indices = randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("keep_alive", "1d");
expectedParams.put("allow_partial_pit_creation", "true");
CreatePitRequest createPitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, indices);
setRandomIndicesOptions(createPitRequest::indicesOptions, createPitRequest::indicesOptions, expectedParams);
Request request = RequestConverters.createPit(createPitRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
endpoint.add("_search/point_in_time");
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(createPitRequest, request.getEntity());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testDeletePit() throws IOException {
List<String> pitIdsList = new ArrayList<>();
pitIdsList.add("pitId1");
pitIdsList.add("pitId2");
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIdsList);
Request request = RequestConverters.deletePit(deletePitRequest);
String endpoint = "/_search/point_in_time";
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals(endpoint, request.getEndpoint());
assertToXContentBody(deletePitRequest, request.getEntity());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testDeleteAllPits() {
Request request = RequestConverters.deleteAllPits();
String endpoint = "/_search/point_in_time/_all";
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals(endpoint, request.getEndpoint());
}

public void testSearchTemplate() throws Exception {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase {
// core
"ping",
"info",
"delete_all_pits",
// security
"security.get_ssl_certificates",
"security.authenticate",
Expand Down
Loading

0 comments on commit 689a2c4

Please sign in to comment.