Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,12 @@ public void testLocalResumeReindexFromScroll_slicedN() {
final int totalDocs = randomIntBetween(200, 300);
final int numSlices = randomIntBetween(2, 5);
final int batchSize = randomIntBetween(5, 10);
// the first manual search batch creates the scroll, and is not indexed into destination
final long expectedDocsDest = totalDocs - numSlices * batchSize;

createIndex(sourceIndex);
indexRandom(true, sourceIndex, totalDocs);

Map<Integer, SliceStatus> sliceStatus = new HashMap<>();
Map<Integer, Long> sliceFirstBatchDocs = new HashMap<>();
final long startTime = System.nanoTime() - randomTimeValue(2, 10, TimeUnit.HOURS).nanos();

// Manually create scroll slices and pass their scroll IDs in resume info
Expand All @@ -197,13 +196,18 @@ public void testLocalResumeReindexFromScroll_slicedN() {
try {
String scrollId = searchResponse.getScrollId();
assertNotNull(scrollId);
assertEquals(batchSize, searchResponse.getHits().getHits().length);
// the actual search hits may be less than batch size if the slice has few docs, since doc are randomly sliced
long firstBatchDocs = searchResponse.getHits().getHits().length;
assertTrue(firstBatchDocs <= batchSize);
sliceFirstBatchDocs.put(sliceId, firstBatchDocs);
BulkByScrollTask.Status sliceStats = randomStats(sliceId, searchResponse.getHits().getTotalHits().value());
sliceStatus.put(sliceId, new SliceStatus(sliceId, new ScrollWorkerResumeInfo(scrollId, startTime, sliceStats, null), null));
} finally {
searchResponse.decRef();
}
}
// the first manual search batch creates the scroll, and is not indexed into destination
final long expectedDocsDest = totalDocs - sliceFirstBatchDocs.values().stream().mapToLong(Long::longValue).sum();

// Resume reindexing from the manual scroll search slices
ReindexRequest request = new ReindexRequest().setSourceIndices(sourceIndex)
Expand All @@ -222,7 +226,7 @@ public void testLocalResumeReindexFromScroll_slicedN() {
.get();

assertHitCount(expectedDocsDest, prepareSearch(destIndex));
assertSlicedResponse(getTaskResponse.getTask(), sliceStatus, totalDocs, batchSize);
assertSlicedResponse(getTaskResponse.getTask(), sliceStatus, sliceFirstBatchDocs, totalDocs, batchSize);
}

public void testLocalResumeReindexFromScroll_slicedN_partialCompleted() {
Expand All @@ -232,14 +236,12 @@ public void testLocalResumeReindexFromScroll_slicedN_partialCompleted() {
final int numSlices = randomIntBetween(2, 5);
final int batchSize = randomIntBetween(5, 10);
final int numCompletedSlices = randomIntBetween(1, numSlices - 1);
final int numPendingSlices = numSlices - numCompletedSlices;
// num docs in dest = manually completed slices + resumed slices (exclude first batch)
final long expectedDocsDest = totalDocs - numPendingSlices * batchSize;

createIndex(sourceIndex);
indexRandom(true, sourceIndex, totalDocs);

Map<Integer, SliceStatus> sliceStatus = new HashMap<>();
Map<Integer, Long> sliceFirstBatchDocs = new HashMap<>();
// Complete some slices with manual slicing and pass their results as completed slices in resume info
for (int sliceId = 0; sliceId < numCompletedSlices; sliceId++) {
ReindexRequest sliceRequest = new ReindexRequest().setSourceIndices(sourceIndex)
Expand All @@ -263,13 +265,18 @@ public void testLocalResumeReindexFromScroll_slicedN_partialCompleted() {
try {
String scrollId = searchResponse.getScrollId();
assertNotNull(scrollId);
assertEquals(batchSize, searchResponse.getHits().getHits().length);
// the actual search hits may be less than batch size if the slice has few docs, since doc are randomly sliced
long firstBatchDocs = searchResponse.getHits().getHits().length;
assertTrue(firstBatchDocs <= batchSize);
sliceFirstBatchDocs.put(sliceId, firstBatchDocs);
BulkByScrollTask.Status sliceStats = randomStats(sliceId, searchResponse.getHits().getTotalHits().value());
sliceStatus.put(sliceId, new SliceStatus(sliceId, new ScrollWorkerResumeInfo(scrollId, startTime, sliceStats, null), null));
} finally {
searchResponse.decRef();
}
}
// the first manual search batch creates the scroll, and is not indexed into destination
final long expectedDocsDest = totalDocs - sliceFirstBatchDocs.values().stream().mapToLong(Long::longValue).sum();

ReindexRequest request = new ReindexRequest().setSourceIndices(sourceIndex)
.setShouldStoreResult(true)
Expand All @@ -289,7 +296,7 @@ public void testLocalResumeReindexFromScroll_slicedN_partialCompleted() {

assertHitCount(expectedDocsDest, prepareSearch(destIndex));
assertEquals(0, currentNumberOfScrollContexts());
assertSlicedResponse(getTaskResponse.getTask(), sliceStatus, totalDocs, batchSize);
assertSlicedResponse(getTaskResponse.getTask(), sliceStatus, sliceFirstBatchDocs, totalDocs, batchSize);
}

public void testLocalResumeReindexFromScroll_slicedAuto() {
Expand All @@ -301,13 +308,12 @@ public void testLocalResumeReindexFromScroll_slicedAuto() {
int numSourceShards = randomIntBetween(2, 10);
// slice count differs from shard count to ensure slicing is from resume info
int numSlices = numSourceShards + 1;
// the first manual search batch creates the scroll, and is not indexed into destination
final long expectedDocsDest = totalDocs - numSlices * batchSize;

createIndex(sourceIndex, numSourceShards, 0);
indexRandom(true, sourceIndex, totalDocs);

Map<Integer, SliceStatus> sliceStatus = new HashMap<>();
Map<Integer, Long> sliceFirstBatchDocs = new HashMap<>();
final long startTime = System.nanoTime() - randomTimeValue(2, 10, TimeUnit.HOURS).nanos();

for (int sliceId = 0; sliceId < numSlices; sliceId++) {
Expand All @@ -318,13 +324,18 @@ public void testLocalResumeReindexFromScroll_slicedAuto() {
try {
String scrollId = searchResponse.getScrollId();
assertNotNull(scrollId);
assertEquals(batchSize, searchResponse.getHits().getHits().length);
// the actual search hits may be less than batch size if the slice has few docs, since doc are randomly sliced
long firstBatchDocs = searchResponse.getHits().getHits().length;
assertTrue(firstBatchDocs <= batchSize);
sliceFirstBatchDocs.put(sliceId, firstBatchDocs);
BulkByScrollTask.Status sliceStats = randomStats(sliceId, searchResponse.getHits().getTotalHits().value());
sliceStatus.put(sliceId, new SliceStatus(sliceId, new ScrollWorkerResumeInfo(scrollId, startTime, sliceStats, null), null));
} finally {
searchResponse.decRef();
}
}
// the first manual search batch per slice creates the scroll and is not indexed; resume indexes the rest
final long expectedDocsDest = totalDocs - sliceFirstBatchDocs.values().stream().mapToLong(Long::longValue).sum();

ReindexRequest request = new ReindexRequest().setSourceIndices(sourceIndex)
.setShouldStoreResult(true)
Expand All @@ -344,7 +355,7 @@ public void testLocalResumeReindexFromScroll_slicedAuto() {

assertHitCount(expectedDocsDest, prepareSearch(destIndex));
assertEquals(0, currentNumberOfScrollContexts());
assertSlicedResponse(getTaskResponse.getTask(), sliceStatus, totalDocs, batchSize);
assertSlicedResponse(getTaskResponse.getTask(), sliceStatus, sliceFirstBatchDocs, totalDocs, batchSize);

// response must have same number of slices as resume info, not auto-resolved from shard count
Map<String, Object> response = getTaskResponse.getTask().getResponseAsMap();
Expand All @@ -365,8 +376,7 @@ public void testLocalResumeReindexFromScroll_slicedManual() {
indexRandom(true, sourceIndex, totalDocs);

final long startTime = System.nanoTime() - randomTimeValue(2, 10, TimeUnit.HOURS).nanos();
// the first manual search batch per slice creates the scroll and is not indexed; resume indexes the rest
final long expectedDocsDest = totalDocs - numSlices * batchSize;
long firstBatchDocsTotal = 0;

for (int sliceId = 0; sliceId < numSlices; sliceId++) {
SearchRequest searchRequest = new SearchRequest(sourceIndex).source(
Expand All @@ -379,14 +389,17 @@ public void testLocalResumeReindexFromScroll_slicedManual() {
try {
scrollId = searchResponse.getScrollId();
assertNotNull(scrollId);
assertEquals(batchSize, searchResponse.getHits().getHits().length);
// the actual search hits may be less than batch size if the slice has few docs, since doc are randomly sliced
int firstBatchDocs = searchResponse.getHits().getHits().length;
assertTrue(firstBatchDocs <= batchSize);
firstBatchDocsTotal += firstBatchDocs;
sliceStats = randomStats(sliceId, searchResponse.getHits().getTotalHits().value());
totalHits = searchResponse.getHits().getTotalHits().value();
} finally {
searchResponse.decRef();
}

final long remainingDocs = totalHits - batchSize;
final long remainingDocs = totalHits - Math.min(totalHits, batchSize);

ReindexRequest request = new ReindexRequest().setSourceIndices(sourceIndex)
.setShouldStoreResult(true)
Expand All @@ -407,9 +420,11 @@ public void testLocalResumeReindexFromScroll_slicedManual() {
.setTimeout(TimeValue.timeValueSeconds(30))
.get();
assertTrue(getTaskResponse.getTask().isCompleted());
assertStatus(getTaskResponse.getTask(), sliceStats, totalHits, batchSize, (int) remainingDocs);
assertStatus(getTaskResponse.getTask(), sliceStats, totalHits, batchSize, remainingDocs);
}

// the first manual search batch per slice creates the scroll and is not indexed; resume indexes the rest
final long expectedDocsDest = totalDocs - firstBatchDocsTotal;
assertHitCount(expectedDocsDest, prepareSearch(destIndex));
assertEquals(0, currentNumberOfScrollContexts());
}
Expand Down Expand Up @@ -548,7 +563,13 @@ private long currentNumberOfScrollContexts() {
return total;
}

private static void assertSlicedResponse(TaskResult taskResult, Map<Integer, SliceStatus> resumeStatus, long totalDocs, int batchSize) {
private static void assertSlicedResponse(
TaskResult taskResult,
Map<Integer, SliceStatus> resumeStatus,
Map<Integer, Long> firstBatchDocsBySlice,
long totalDocs,
int batchSize
) {
assertTrue(taskResult.isCompleted());
Map<String, Object> response = taskResult.getResponseAsMap();
assertNotNull(response);
Expand All @@ -567,7 +588,9 @@ private static void assertSlicedResponse(TaskResult taskResult, Map<Integer, Sli
if (sliceStatus.resumeInfo() != null) {
status = sliceStatus.resumeInfo().status();
// ensure each resumed slice's status is updated compared to the resume info
assertSliceStatus(slice, status, batchSize);
Long firstBatchDocs = firstBatchDocsBySlice.get(sliceId);
assertNotNull(firstBatchDocs);
assertSliceStatus(slice, status, batchSize, firstBatchDocs);
} else {
assertNotNull(sliceStatus.result());
status = sliceStatus.result().getResponse().orElseThrow().getStatus();
Expand Down Expand Up @@ -609,7 +632,7 @@ private static void assertStatus(
BulkByScrollTask.Status resumeStatus,
long totalDocs,
int batchSize,
int remainingDocs
long remainingDocs
) {
assertTrue(task.isCompleted());
Map<String, Object> response = task.getResponseAsMap();
Expand All @@ -619,7 +642,7 @@ private static void assertStatus(
assertEquals(totalDocs, longFromMap(response, "total"));
// stats are updated
assertEquals(remainingDocs + resumeStatus.getCreated(), longFromMap(response, "created"));
int remainingBatches = remainingDocs / batchSize + (remainingDocs % batchSize == 0 ? 0 : 1);
long remainingBatches = remainingDocs / batchSize + (remainingDocs % batchSize == 0 ? 0 : 1);
assertEquals(remainingBatches + resumeStatus.getBatches(), intFromMap(response, "batches"));
assertTrue(longFromMap(response, "took") > TimeValue.ONE_HOUR.millis());
// other stats should be retained
Expand All @@ -635,9 +658,14 @@ private static void assertStatus(
assertEquals(resumeStatus.getRequestsPerSecond(), floatFromMap(response, "requests_per_second"), 0);
}

private static void assertSliceStatus(Map<String, Object> response, BulkByScrollTask.Status resumeStatus, int batchSize) {
private static void assertSliceStatus(
Map<String, Object> response,
BulkByScrollTask.Status resumeStatus,
int batchSize,
long firstBatchDocs
) {
assertEquals((int) resumeStatus.getSliceId(), intFromMap(response, "slice_id"));
long remainingDocs = resumeStatus.getTotal() - batchSize;
long remainingDocs = resumeStatus.getTotal() - Math.min(resumeStatus.getTotal(), firstBatchDocs);
assertEquals(resumeStatus.getCreated() + remainingDocs, longFromMap(response, "created"));
long remainingBatches = remainingDocs / batchSize + (remainingDocs % batchSize == 0 ? 0 : 1);
assertEquals(resumeStatus.getBatches() + remainingBatches, intFromMap(response, "batches"));
Expand Down
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,6 @@ tests:
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeMetricsIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/142739
- class: org.elasticsearch.index.reindex.ReindexResumeIT
method: testLocalResumeReindexFromScroll_slicedAuto
issue: https://github.com/elastic/elasticsearch/issues/142749
- class: org.elasticsearch.xpack.esql.analysis.AnalyzerTests
method: testResolveExternalRelationPassesFileSet
issue: https://github.com/elastic/elasticsearch/issues/142795
Expand Down