Skip to content

Commit 58a9792

Browse files
committed
Fix max boundary for rollups job that use a delay (#42158)
Rollup jobs can define how long they should wait before rolling up new documents. However if the delay is smaller or if it's not a multiple of the rollup interval the job can create incomplete buckets because the max boundary for a job is computed from the time when the job started rounded to the interval minus the delay. This change fixes this computation by applying the delay substraction before the rounding in order to ensure that we never create a boundary that falls in a middle of a bucket.
1 parent c9414bc commit 58a9792

File tree

2 files changed

+57
-8
lines changed

2 files changed

+57
-8
lines changed

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,12 @@ protected String getJobId() {
9494

9595
@Override
9696
protected void onStartJob(long now) {
97-
// this is needed to exclude buckets that can still receive new documents.
97+
// this is needed to exclude buckets that can still receive new documents
9898
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
99-
long rounded = dateHisto.createRounding().round(now);
100-
if (dateHisto.getDelay() != null) {
101-
// if the job has a delay we filter all documents that appear before it.
102-
maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
103-
} else {
104-
maxBoundary = rounded;
105-
}
99+
// if the job has a delay we filter all documents that appear before it
100+
long delay = dateHisto.getDelay() != null ?
101+
TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis() : 0;
102+
maxBoundary = dateHisto.createRounding().round(now - delay);
106103
}
107104

108105
@Override

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,58 @@ public void testSimpleDateHistoWithDelay() throws Exception {
324324
});
325325
}
326326

327+
public void testSimpleDateHistoWithOverlappingDelay() throws Exception {
328+
String rollupIndex = randomAlphaOfLengthBetween(5, 10);
329+
String field = "the_histo";
330+
DateHistogramGroupConfig dateHistoConfig =
331+
new DateHistogramGroupConfig(field, new DateHistogramInterval("1h"), new DateHistogramInterval("15m"), null);
332+
RollupJobConfig job = createJob(rollupIndex, new GroupConfig(dateHistoConfig), Collections.emptyList());
333+
final List<Map<String, Object>> dataset = new ArrayList<>();
334+
long now = asLong("2015-04-01T10:30:00.000Z");
335+
dataset.addAll(
336+
Arrays.asList(
337+
asMap("the_histo", now - TimeValue.timeValueMinutes(135).getMillis()),
338+
asMap("the_histo", now - TimeValue.timeValueMinutes(120).getMillis()),
339+
asMap("the_histo", now - TimeValue.timeValueMinutes(105).getMillis()),
340+
asMap("the_histo", now - TimeValue.timeValueMinutes(90).getMillis()),
341+
asMap("the_histo", now - TimeValue.timeValueMinutes(75).getMillis()),
342+
asMap("the_histo", now - TimeValue.timeValueHours(1).getMillis()),
343+
asMap("the_histo", now - TimeValue.timeValueMinutes(45).getMillis()),
344+
asMap("the_histo", now - TimeValue.timeValueMinutes(30).getMillis()),
345+
asMap("the_histo", now - TimeValue.timeValueMinutes(15).getMillis()),
346+
asMap("the_histo", now)
347+
)
348+
);
349+
final Rounding rounding = dateHistoConfig.createRounding();
350+
executeTestCase(dataset, job, now, (resp) -> {
351+
assertThat(resp.size(), equalTo(2));
352+
IndexRequest request = resp.get(0);
353+
assertThat(request.index(), equalTo(rollupIndex));
354+
assertThat(request.sourceAsMap(), equalTo(
355+
asMap(
356+
"_rollup.version", newIDScheme ? 2 : 1,
357+
"the_histo.date_histogram.timestamp", rounding.round(now - TimeValue.timeValueHours(2).getMillis()),
358+
"the_histo.date_histogram.interval", "1h",
359+
"the_histo.date_histogram._count", 3,
360+
"the_histo.date_histogram.time_zone", DateTimeZone.UTC.toString(),
361+
"_rollup.id", job.getId()
362+
)
363+
));
364+
request = resp.get(1);
365+
assertThat(request.index(), equalTo(rollupIndex));
366+
assertThat(request.sourceAsMap(), equalTo(
367+
asMap(
368+
"_rollup.version", newIDScheme ? 2 : 1,
369+
"the_histo.date_histogram.timestamp", rounding.round(now - TimeValue.timeValueHours(1).getMillis()),
370+
"the_histo.date_histogram.interval", "1h",
371+
"the_histo.date_histogram._count", 4,
372+
"the_histo.date_histogram.time_zone", DateTimeZone.UTC.toString(),
373+
"_rollup.id", job.getId()
374+
)
375+
));
376+
});
377+
}
378+
327379
public void testSimpleDateHistoWithTimeZone() throws Exception {
328380
final List<Map<String, Object>> dataset = new ArrayList<>();
329381
long now = asLong("2015-04-01T10:00:00");

0 commit comments

Comments
 (0)