Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequestBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
Expand Down Expand Up @@ -80,15 +82,17 @@ public void clearAllowedOperations() {
* Executes the cancellation test
*/
private void testCancel(
String action,
ActionType<BulkByScrollResponse> action,
AbstractBulkByScrollRequestBuilder<?, ?> builder,
CancelAssertion assertion,
Matcher<String> taskDescriptionMatcher
) throws Exception {
createIndex(INDEX);

// Scroll by 1 so that cancellation is easier to control
builder.source().setSize(1);
AbstractBulkByScrollRequest<?> request = builder.request();
// Total number of documents created for this test (~10 per primary shard per slice)
int numDocs = getNumShards(INDEX).numPrimaries * 10 * builder.request().getSlices();
int numDocs = getNumShards(INDEX).numPrimaries * 10 * request.getSlices();
ALLOWED_OPERATIONS.release(numDocs);

logger.debug("setting up [{}] docs", numDocs);
Expand All @@ -105,18 +109,15 @@ private void testCancel(
assertHitCount(prepareSearch(INDEX).setSize(0), numDocs);
assertThat(ALLOWED_OPERATIONS.drainPermits(), equalTo(0));

// Scroll by 1 so that cancellation is easier to control
builder.source().setSize(1);

/* Allow a random number of the documents less the number of workers
* to be modified by the reindex action. That way at least one worker
* is blocked. */
int numModifiedDocs = randomIntBetween(builder.request().getSlices() * 2, numDocs);
int numModifiedDocs = randomIntBetween(request.getSlices() * 2, numDocs);
logger.debug("chose to modify [{}] out of [{}] docs", numModifiedDocs, numDocs);
ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices());
ALLOWED_OPERATIONS.release(numModifiedDocs - request.getSlices());

// Now execute the reindex action...
ActionFuture<? extends BulkByScrollResponse> future = builder.execute();
ActionFuture<? extends BulkByScrollResponse> future = client().execute(action, request);

/* ... and wait for the indexing operation listeners to block. It
* is important to realize that some of the workers might have
Expand All @@ -130,7 +131,7 @@ private void testCancel(
); // 10 seconds is usually fine but on heavily loaded machines this can take a while

// Status should show the task running
TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices());
TaskInfo mainTask = findTaskToCancel(action.name(), request.getSlices());
BulkByScrollTask.Status status = (BulkByScrollTask.Status) mainTask.status();
assertNull(status.getReasonCancelled());

Expand All @@ -150,7 +151,7 @@ private void testCancel(
logger.debug("asserting that parent is marked canceled {}", status);
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());

if (builder.request().getSlices() > 1) {
if (request.getSlices() > 1) {
boolean foundCancelled = false;
ListTasksResponse sliceList = clusterAdmin().prepareListTasks()
.setTargetParentTaskId(mainTask.taskId())
Expand All @@ -168,11 +169,11 @@ private void testCancel(
}

logger.debug("unblocking the blocked update");
ALLOWED_OPERATIONS.release(builder.request().getSlices());
ALLOWED_OPERATIONS.release(request.getSlices());

// Checks that no more operations are executed
assertBusy(() -> {
if (builder.request().getSlices() == 1) {
if (request.getSlices() == 1) {
/* We can only be sure that we've drained all the permits if we only use a single worker. Otherwise some worker may have
* exhausted all of its documents before we blocked. */
assertEquals(0, ALLOWED_OPERATIONS.availablePermits());
Expand All @@ -191,7 +192,7 @@ private void testCancel(
String tasks = clusterAdmin().prepareListTasks().setTargetParentTaskId(mainTask.taskId()).setDetailed(true).get().toString();
throw new RuntimeException("Exception while waiting for the response. Running tasks: " + tasks, e);
} finally {
if (builder.request().getSlices() >= 1) {
if (request.getSlices() >= 1) {
// If we have more than one worker we might not have made all the modifications
numModifiedDocs -= ALLOWED_OPERATIONS.availablePermits();
}
Expand Down Expand Up @@ -221,7 +222,7 @@ public static TaskInfo findTaskToCancel(String actionName, int workerCount) {
}

public void testReindexCancel() throws Exception {
testCancel(ReindexAction.NAME, reindex().source(INDEX).destination("dest"), (response, total, modified) -> {
testCancel(ReindexAction.INSTANCE, reindex().source(INDEX).destination("dest"), (response, total, modified) -> {
assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request")));

refresh("dest");
Expand All @@ -239,17 +240,22 @@ public void testUpdateByQueryCancel() throws Exception {
}""");
assertAcked(clusterAdmin().preparePutPipeline("set-processed", pipeline, XContentType.JSON).get());

testCancel(UpdateByQueryAction.NAME, updateByQuery().setPipeline("set-processed").source(INDEX), (response, total, modified) -> {
assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request")));
assertHitCount(prepareSearch(INDEX).setSize(0).setQuery(termQuery("processed", true)), modified);
}, equalTo("update-by-query [" + INDEX + "]"));
testCancel(
UpdateByQueryAction.INSTANCE,
updateByQuery().setPipeline("set-processed").source(INDEX),
(response, total, modified) -> {
assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request")));
assertHitCount(prepareSearch(INDEX).setSize(0).setQuery(termQuery("processed", true)), modified);
},
equalTo("update-by-query [" + INDEX + "]")
);

assertAcked(clusterAdmin().deletePipeline(new DeletePipelineRequest("set-processed")).get());
}

public void testDeleteByQueryCancel() throws Exception {
testCancel(
DeleteByQueryAction.NAME,
DeleteByQueryAction.INSTANCE,
deleteByQuery().source(INDEX).filter(QueryBuilders.matchAllQuery()),
(response, total, modified) -> {
assertThat(response, matcher().deleted(modified).reasonCancelled(equalTo("by user request")));
Expand All @@ -261,7 +267,7 @@ public void testDeleteByQueryCancel() throws Exception {

public void testReindexCancelWithWorkers() throws Exception {
testCancel(
ReindexAction.NAME,
ReindexAction.INSTANCE,
reindex().source(INDEX).filter(QueryBuilders.matchAllQuery()).destination("dest").setSlices(5),
(response, total, modified) -> {
assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
Expand All @@ -283,7 +289,7 @@ public void testUpdateByQueryCancelWithWorkers() throws Exception {
assertAcked(clusterAdmin().preparePutPipeline("set-processed", pipeline, XContentType.JSON).get());

testCancel(
UpdateByQueryAction.NAME,
UpdateByQueryAction.INSTANCE,
updateByQuery().setPipeline("set-processed").source(INDEX).setSlices(5),
(response, total, modified) -> {
assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
Expand All @@ -297,7 +303,7 @@ public void testUpdateByQueryCancelWithWorkers() throws Exception {

public void testDeleteByQueryCancelWithWorkers() throws Exception {
testCancel(
DeleteByQueryAction.NAME,
DeleteByQueryAction.INSTANCE,
deleteByQuery().source(INDEX).filter(QueryBuilders.matchAllQuery()).setSlices(5),
(response, total, modified) -> {
assertThat(response, matcher().deleted(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testDeprecatedSort() {
int subsetSize = randomIntBetween(1, max - 1);
ReindexRequestBuilder copy = new ReindexRequestBuilder(client()).source("source").destination("dest").refresh(true);
copy.maxDocs(subsetSize);
copy.request().addSortField("foo", SortOrder.DESC);
copy.source().addSort("foo", SortOrder.DESC);
assertThat(copy.get(), matcher().created(subsetSize));

assertHitCount(client().prepareSearch("dest").setSize(0), subsetSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ public void testFilterDocFreq() throws ExecutionException, InterruptedException,
List<String> tags = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
tags.add("tag_" + i);
builders.add(prepareIndex("test").setId(i + "").setSource("tags", tags));
builders.add(prepareIndex("test").setId(i + "").setSource("tags", List.copyOf(tags)));
}
indexRandom(true, builders);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,20 @@ public void testSetSource() throws Exception {
indexRequestBuilder.setSource(source);
assertEquals(EXPECTED_SOURCE, XContentHelper.convertToJson(indexRequestBuilder.request().source(), true));

indexRequestBuilder = new IndexRequestBuilder(this.testClient);
indexRequestBuilder.setSource(source, XContentType.JSON);
assertEquals(EXPECTED_SOURCE, XContentHelper.convertToJson(indexRequestBuilder.request().source(), true));

indexRequestBuilder = new IndexRequestBuilder(this.testClient);
indexRequestBuilder.setSource("SomeKey", "SomeValue");
assertEquals(EXPECTED_SOURCE, XContentHelper.convertToJson(indexRequestBuilder.request().source(), true));

// force the Object... setter
indexRequestBuilder = new IndexRequestBuilder(this.testClient);
indexRequestBuilder.setSource((Object) "SomeKey", "SomeValue");
assertEquals(EXPECTED_SOURCE, XContentHelper.convertToJson(indexRequestBuilder.request().source(), true));

indexRequestBuilder = new IndexRequestBuilder(this.testClient);
ByteArrayOutputStream docOut = new ByteArrayOutputStream();
XContentBuilder doc = XContentFactory.jsonBuilder(docOut).startObject().field("SomeKey", "SomeValue").endObject();
doc.close();
Expand All @@ -72,6 +76,7 @@ public void testSetSource() throws Exception {
XContentHelper.convertToJson(indexRequestBuilder.request().source(), true, indexRequestBuilder.request().getContentType())
);

indexRequestBuilder = new IndexRequestBuilder(this.testClient);
doc = XContentFactory.jsonBuilder().startObject().field("SomeKey", "SomeValue").endObject();
doc.close();
indexRequestBuilder.setSource(doc);
Expand Down