diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java index 52b6d404a1469..990b441a1df57 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java @@ -168,20 +168,24 @@ private InputStream processAggs(Aggregations aggs) throws IOException { )); aggregationToJsonProcessor.process(aggs); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - final boolean hasAfterKey = afterKey != null && (afterKey.get(context.compositeAggDateHistogramGroupSourceName) instanceof Long); + final Long afterKeyTimeBucket = afterKey != null ? (Long)afterKey.get(context.compositeAggDateHistogramGroupSourceName) : null ; boolean cancellable = aggregationToJsonProcessor.writeAllDocsCancellable( timestamp -> { if (isCancelled) { // If we have not processed a single composite agg page yet and we are cancelled // We should not process anything - if (hasAfterKey == false) { + if (afterKeyTimeBucket == null) { return true; } + // We want to stop processing once a timestamp enters the next time bucket. + // This could occur in any page. One benefit we have is that even though the paging order is not sorted + // by max timestamp, our iteration of the page results is. So, once we cross over to the next bucket within + // a given page, we know the previous bucket has been exhausted. 
if (nextBucketOnCancel == 0L) { - // If we have been cancelled, record the bucket above our latest timestamp - // This indicates when we have completed the current bucket of this timestamp and thus will move to the next - // date_histogram bucket - nextBucketOnCancel = Intervals.alignToCeil(timestamp, interval); + // This simple equation handles two unique scenarios: + // If the timestamp is the current floor, this means we need to keep processing until the next time bucket + // If we are not matching the current bucket floor, then this simply aligns to the next bucket + nextBucketOnCancel = Intervals.alignToFloor(timestamp + interval, interval); LOGGER.debug(() -> new ParameterizedMessage( "[{}] set future timestamp cancel to [{}] via timestamp [{}]", context.jobId, @@ -200,7 +204,7 @@ private InputStream processAggs(Aggregations aggs) throws IOException { "[{}] cancelled before bucket [{}] on date_histogram page [{}]", context.jobId, nextBucketOnCancel, - hasAfterKey ? afterKey.get(context.compositeAggDateHistogramGroupSourceName) : "__null__" + afterKeyTimeBucket != null ? 
afterKeyTimeBucket : "__null__" ) ); hasNext = false; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java index a260466dc199e..910bce059cb77 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java @@ -267,7 +267,6 @@ public void testExtractionCancelOnFirstPage() throws IOException { assertThat(extractor.hasNext(), is(false)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/71212") public void testExtractionGivenCancelHalfWay() throws IOException { int numBuckets = 10; List buckets = new ArrayList<>(numBuckets);