Skip to content

Commit

Permalink
Add support for clear scroll to high level REST client (#25038)
Browse files Browse the repository at this point in the history
  • Loading branch information
javanna authored Jun 6, 2017
1 parent fbf2e3d commit d47d479
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ActiveShardCount;
Expand Down Expand Up @@ -344,6 +345,11 @@ static Request searchScroll(SearchScrollRequest searchScrollRequest) throws IOEx
return new Request("GET", "/_search/scroll", Collections.emptyMap(), entity);
}

static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOException {
HttpEntity entity = createEntity(clearScrollRequest, REQUEST_BODY_CONTENT_TYPE);
return new Request("DELETE", "/_search/scroll", Collections.emptyMap(), entity);
}

private static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) throws IOException {
BytesRef source = XContentHelper.toXContent(toXContent, xContentType, false).toBytesRef();
return new ByteArrayEntity(source.bytes, source.offset, source.length, ContentType.create(xContentType.mediaType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -347,6 +349,28 @@ public void searchScrollAsync(SearchScrollRequest searchScrollRequest, ActionLis
listener, emptySet(), headers);
}

/**
* Clears one or more scroll ids using the Clear Scroll api
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#_clear_scroll_api">
* Clear Scroll API on elastic.co</a>
*/
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(clearScrollRequest, Request::clearScroll, ClearScrollResponse::fromXContent,
emptySet(), headers);
}

/**
* Asynchronously clears one or more scroll ids using the Clear Scroll api
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#_clear_scroll_api">
* Clear Scroll API on elastic.co</a>
*/
public void clearScrollAsync(ClearScrollRequest clearScrollRequest, ActionListener<ClearScrollResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(clearScrollRequest, Request::clearScroll, ClearScrollResponse::fromXContent,
listener, emptySet(), headers);
}

private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
Expand Down Expand Up @@ -731,6 +732,21 @@ public void testSearchScroll() throws IOException {
assertEquals("/_search/scroll", request.endpoint);
assertEquals(0, request.params.size());
assertToXContentBody(searchScrollRequest, request.entity);
assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaType(), request.entity.getContentType().getValue());
}

public void testClearScroll() throws IOException {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
int numScrolls = randomIntBetween(1, 10);
for (int i = 0; i < numScrolls; i++) {
clearScrollRequest.addScrollId(randomAlphaOfLengthBetween(5, 10));
}
Request request = Request.clearScroll(clearScrollRequest);
assertEquals("DELETE", request.method);
assertEquals("/_search/scroll", request.endpoint);
assertEquals(0, request.params.size());
assertToXContentBody(clearScrollRequest, request.entity);
assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaType(), request.entity.getContentType().getValue());
}

private static void assertToXContentBody(ToXContent expectedBody, HttpEntity actualEntity) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -161,6 +163,19 @@ public void testSearchScroll() throws IOException {
isNotNull(HttpEntity.class), argThat(new HeadersVarargMatcher(headers)));
}

public void testClearScroll() throws IOException {
Header[] headers = randomHeaders(random(), "Header");
ClearScrollResponse mockClearScrollResponse = new ClearScrollResponse(randomBoolean(), randomIntBetween(0, Integer.MAX_VALUE));
mockResponse(mockClearScrollResponse);
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(randomAlphaOfLengthBetween(5, 10));
ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, headers);
assertEquals(mockClearScrollResponse.isSucceeded(), clearScrollResponse.isSucceeded());
assertEquals(mockClearScrollResponse.getNumFreed(), clearScrollResponse.getNumFreed());
verify(restClient).performRequest(eq("DELETE"), eq("/_search/scroll"), eq(Collections.emptyMap()),
isNotNull(HttpEntity.class), argThat(new HeadersVarargMatcher(headers)));
}

private void mockResponse(ToXContent toXContent) throws IOException {
Response response = mock(Response.class);
ContentType contentType = ContentType.parse(Request.REQUEST_BODY_CONTENT_TYPE.mediaType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@

package org.elasticsearch.client;

import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.join.aggregations.Children;
import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder;
Expand All @@ -37,6 +45,7 @@
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder;
Expand All @@ -46,11 +55,14 @@
import java.util.Arrays;
import java.util.Collections;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;

public class SearchIT extends ESRestHighLevelClientTestCase {
Expand Down Expand Up @@ -386,6 +398,63 @@ public void testSearchWithSuggest() throws IOException {
}
}

public void testSearchScroll() throws Exception {

for (int i = 0; i < 100; i++) {
XContentBuilder builder = jsonBuilder().startObject().field("field", i).endObject();
HttpEntity entity = new NStringEntity(builder.string(), ContentType.APPLICATION_JSON);
client().performRequest("PUT", "test/type1/" + Integer.toString(i), Collections.emptyMap(), entity);
}
client().performRequest("POST", "/test/_refresh");

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(35).sort("field", SortOrder.ASC);
SearchRequest searchRequest = new SearchRequest("test").scroll(TimeValue.timeValueMinutes(2)).source(searchSourceBuilder);
SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);

try {
long counter = 0;
assertSearchHeader(searchResponse);
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100L));
assertThat(searchResponse.getHits().getHits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.getSortValues()[0]).longValue(), equalTo(counter++));
}

searchResponse = execute(new SearchScrollRequest(searchResponse.getScrollId()).scroll(TimeValue.timeValueMinutes(2)),
highLevelClient()::searchScroll, highLevelClient()::searchScrollAsync);

assertThat(searchResponse.getHits().getTotalHits(), equalTo(100L));
assertThat(searchResponse.getHits().getHits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertEquals(counter++, ((Number) hit.getSortValues()[0]).longValue());
}

searchResponse = execute(new SearchScrollRequest(searchResponse.getScrollId()).scroll(TimeValue.timeValueMinutes(2)),
highLevelClient()::searchScroll, highLevelClient()::searchScrollAsync);

assertThat(searchResponse.getHits().getTotalHits(), equalTo(100L));
assertThat(searchResponse.getHits().getHits().length, equalTo(30));
for (SearchHit hit : searchResponse.getHits()) {
assertEquals(counter++, ((Number) hit.getSortValues()[0]).longValue());
}
} finally {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(searchResponse.getScrollId());
ClearScrollResponse clearScrollResponse = execute(clearScrollRequest, highLevelClient()::clearScroll,
highLevelClient()::clearScrollAsync);
assertThat(clearScrollResponse.getNumFreed(), greaterThan(0));
assertTrue(clearScrollResponse.isSucceeded());

SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId()).scroll(TimeValue.timeValueMinutes(2));
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> execute(scrollRequest,
highLevelClient()::searchScroll, highLevelClient()::searchScrollAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
assertThat(exception.getRootCause(), instanceOf(ElasticsearchException.class));
ElasticsearchException rootCause = (ElasticsearchException) exception.getRootCause();
assertThat(rootCause.getMessage(), containsString("No search context found for"));
}
}

private static void assertSearchHeader(SearchResponse searchResponse) {
assertThat(searchResponse.getTook().nanos(), greaterThanOrEqualTo(0L));
assertEquals(0, searchResponse.getFailedShards());
Expand Down

0 comments on commit d47d479

Please sign in to comment.