@@ -168,20 +168,24 @@ private InputStream processAggs(Aggregations aggs) throws IOException {
         ));
         aggregationToJsonProcessor.process(aggs);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        final boolean hasAfterKey = afterKey != null && (afterKey.get(context.compositeAggDateHistogramGroupSourceName) instanceof Long);
+        final Long afterKeyTimeBucket = afterKey != null ? (Long)afterKey.get(context.compositeAggDateHistogramGroupSourceName) : null;
         boolean cancellable = aggregationToJsonProcessor.writeAllDocsCancellable(
             timestamp -> {
                 if (isCancelled) {
                     // If we have not processed a single composite agg page yet and we are cancelled
                     // We should not process anything
-                    if (hasAfterKey == false) {
+                    if (afterKeyTimeBucket == null) {
                         return true;
                     }
+                    // We want to stop processing once a timestamp enters the next time bucket.
+                    // This could occur in any page. One benefit we have is that even though the paging order is not sorted
+                    // by max timestamp, our iteration of the page results is. So, once we cross over to the next bucket within
+                    // a given page, we know the previous bucket has been exhausted.
                     if (nextBucketOnCancel == 0L) {
-                        // If we have been cancelled, record the bucket above our latest timestamp
-                        // This indicates when we have completed the current bucket of this timestamp and thus will move to the next
-                        // date_histogram bucket
-                        nextBucketOnCancel = Intervals.alignToCeil(timestamp, interval);
+                        // This simple equation handles two unique scenarios:
+                        // If the timestamp is the current floor, this means we need to keep processing until the next timebucket
+                        // If we are not matching the current bucket floor, then this simply aligns to the next bucket
+                        nextBucketOnCancel = Intervals.alignToFloor(timestamp + interval, interval);
                         LOGGER.debug(() -> new ParameterizedMessage(
                             "[{}] set future timestamp cancel to [{}] via timestamp [{}]",
                             context.jobId,
@@ -200,7 +204,7 @@ private InputStream processAggs(Aggregations aggs) throws IOException {
                         "[{}] cancelled before bucket [{}] on date_histogram page [{}]",
                         context.jobId,
                         nextBucketOnCancel,
-                        hasAfterKey ? afterKey.get(context.compositeAggDateHistogramGroupSourceName) : "__null__"
+                        afterKeyTimeBucket != null ? afterKeyTimeBucket : "__null__"
                     )
                 );
                 hasNext = false;
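
For context, the behavioural difference between the removed Intervals.alignToCeil(timestamp, interval) and the new Intervals.alignToFloor(timestamp + interval, interval) shows up only when a timestamp falls exactly on a bucket floor. Below is a minimal, self-contained sketch of that arithmetic; the alignToFloor/alignToCeil helpers are local stand-ins (assuming non-negative timestamps), not the Elasticsearch Intervals implementations.

// Minimal sketch: why alignToFloor(timestamp + interval, interval) differs from
// alignToCeil(timestamp, interval) only on exact bucket boundaries.
public class CancelBucketAlignmentDemo {

    // Local stand-ins for Intervals.alignToFloor / Intervals.alignToCeil (non-negative values assumed).
    static long alignToFloor(long value, long interval) {
        return (value / interval) * interval;
    }

    static long alignToCeil(long value, long interval) {
        long floor = alignToFloor(value, interval);
        return floor == value ? value : floor + interval;
    }

    public static void main(String[] args) {
        long interval = 1_000L;

        // Timestamp exactly on the bucket floor: the old ceil-based value stays in the
        // current bucket, so cancellation would trigger before that bucket is exhausted.
        long onBoundary = 3_000L;
        System.out.println(alignToCeil(onBoundary, interval));              // 3000 (old logic)
        System.out.println(alignToFloor(onBoundary + interval, interval));  // 4000 (new logic)

        // Timestamp inside a bucket: both formulas agree on the start of the next bucket.
        long insideBucket = 3_500L;
        System.out.println(alignToCeil(insideBucket, interval));               // 4000
        System.out.println(alignToFloor(insideBucket + interval, interval));   // 4000
    }
}

On a boundary timestamp the old formula never advances past the current bucket, which is why cancellation could cut that bucket short; the new formula always points at the start of the next bucket.
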
@@ -267,7 +267,6 @@ public void testExtractionCancelOnFirstPage() throws IOException {
         assertThat(extractor.hasNext(), is(false));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/71212")
     public void testExtractionGivenCancelHalfWay() throws IOException {
         int numBuckets = 10;
         List<CompositeAggregation.Bucket> buckets = new ArrayList<>(numBuckets);