Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/85850.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85850
summary: "Aggregation Execution Context add timestamp provider"
area: TSDB
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
public class AggregationExecutionContext {

private final CheckedSupplier<BytesRef, IOException> tsidProvider;
private final CheckedSupplier<Long, IOException> timestampProvider;
private final LeafReaderContext leafReaderContext;

public AggregationExecutionContext(LeafReaderContext leafReaderContext, CheckedSupplier<BytesRef, IOException> tsidProvider) {
public AggregationExecutionContext(
LeafReaderContext leafReaderContext,
CheckedSupplier<BytesRef, IOException> tsidProvider,
CheckedSupplier<Long, IOException> timestampProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a checked supplier? Under what condition can it ever throw IOException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, except I don't think we need IOException there. I suspect we can convert CheckedSupplier to Supplier in case of tsid as well. I think it is CheckedSupplier for purely historical reasons.

Yeah, I also think Supplier is better, I have converted CheckedSupplier to Supplier for timestamp and tsid.

) {
this.leafReaderContext = leafReaderContext;
this.tsidProvider = tsidProvider;
this.timestampProvider = timestampProvider;
}

public LeafReaderContext getLeafReaderContext() {
Expand All @@ -37,4 +43,8 @@ public LeafReaderContext getLeafReaderContext() {
public BytesRef getTsid() throws IOException {
return tsidProvider != null ? tsidProvider.get() : null;
}

public Long getTimestamp() throws IOException {
return timestampProvider.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ScoreMode scoreMode() {
// TODO: will remove it in a follow up PR
@Override
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
return getLeafCollector(new AggregationExecutionContext(ctx, null));
return getLeafCollector(new AggregationExecutionContext(ctx, null, null));
}

public abstract LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void search(Query query, BucketCollector bucketCollector) throws IOExcept
// this is needed to trigger actions in some bucketCollectors that bypass the normal iteration logic
// for example, global aggregator triggers a separate iterator that ignores the query but still needs
// to know all leaves
bucketCollector.getLeafCollector(new AggregationExecutionContext(leaf, null));
bucketCollector.getLeafCollector(new AggregationExecutionContext(leaf, null, null));
}
}

Expand Down Expand Up @@ -179,7 +179,7 @@ private static class LeafWalker {
long timestamp;

LeafWalker(LeafReaderContext context, Scorer scorer, BucketCollector bucketCollector, LeafReaderContext leaf) throws IOException {
AggregationExecutionContext aggCtx = new AggregationExecutionContext(leaf, scratch::get);
AggregationExecutionContext aggCtx = new AggregationExecutionContext(leaf, scratch::get, () -> timestamp);
this.collector = bucketCollector.getLeafCollector(aggCtx);
liveDocs = context.reader().getLiveDocs();
this.collector.setScorer(scorer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
assertTrue(timestamp.advanceExact(doc));
BytesRef latestTSID = tsid.lookupOrd(tsid.ordValue());
long latestTimestamp = timestamp.longValue();
assertEquals(latestTSID, aggCtx.getTsid());
assertEquals(latestTimestamp, aggCtx.getTimestamp().longValue());

if (currentTSID != null) {
assertTrue(
currentTSID + "->" + latestTSID.utf8ToString(),
Expand Down