diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java index 7a94486427d8..b19ab38d7b25 100644 --- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java +++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java @@ -19,17 +19,30 @@ package com.netflix.iceberg; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.netflix.iceberg.exceptions.RuntimeIOException; +import com.netflix.iceberg.expressions.And; +import com.netflix.iceberg.expressions.Expression; +import com.netflix.iceberg.expressions.Expression.Operation; +import com.netflix.iceberg.expressions.Expressions; +import com.netflix.iceberg.expressions.Literal; +import com.netflix.iceberg.expressions.NamedReference; +import com.netflix.iceberg.expressions.UnboundPredicate; import com.netflix.iceberg.io.CloseableIterable; import com.netflix.iceberg.types.Comparators; +import com.netflix.iceberg.types.Types; +import com.netflix.iceberg.util.Pair; import java.io.IOException; import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.function.Function; @@ -51,15 +64,15 @@ public static ScanSummary.Builder of(TableScan scan) { } public static class Builder { + private static final Set TIMESTAMP_NAMES = Sets.newHashSet( + "dateCreated", "lastUpdated"); private final TableScan scan; private final Table table; private final TableOperations ops; private final Map snapshotTimestamps; private int limit = Integer.MAX_VALUE; private boolean throwIfLimited = false; - private boolean filterByTimestamp = false; - private long minTimestamp = 0L; - private long maxTimestamp = Long.MAX_VALUE; + private List> timeFilters = Lists.newArrayList(); public Builder(TableScan scan) { this.scan = scan; @@ -72,17 +85,18 @@ public Builder(TableScan scan) { this.snapshotTimestamps = builder.build(); } - public Builder after(long timestampMillis) { + private void addTimestampFilter(UnboundPredicate filter) { throwIfLimited(); // ensure all partitions can be returned - this.filterByTimestamp = true; - this.minTimestamp = timestampMillis; + timeFilters.add(filter); + } + + public Builder after(long timestampMillis) { + addTimestampFilter(Expressions.greaterThanOrEqual("timestamp_ms", timestampMillis)); return this; } public Builder before(long timestampMillis) { - throwIfLimited(); // ensure all partitions can be returned - this.filterByTimestamp = true; - this.maxTimestamp = timestampMillis; + addTimestampFilter(Expressions.lessThanOrEqual("timestamp_ms", timestampMillis)); return this; } @@ -96,6 +110,28 @@ public Builder limit(int numPartitions) { return this; } + private void removeTimeFilters(List expressions, Expression expression) { + if (expression.op() == Operation.AND) { + And and = (And) expression; + removeTimeFilters(expressions, and.left()); + removeTimeFilters(expressions, and.right()); + return; + + } else if (expression instanceof UnboundPredicate) { + UnboundPredicate pred = (UnboundPredicate) expression; + NamedReference ref = (NamedReference) pred.ref(); + Literal lit = pred.literal(); + if (TIMESTAMP_NAMES.contains(ref.name())) { + Literal tsLiteral = lit.to(Types.TimestampType.withoutZone()); + long millis = toMillis(tsLiteral.value()); + addTimestampFilter(Expressions.predicate(pred.op(), "timestamp_ms", millis)); + return; + } + } + + expressions.add(expression); + } + /** * Summarizes a table scan as a map of partition key to metrics for that partition. * @@ -105,9 +141,22 @@ public Map build() { TopN topN = new TopN<>( limit, throwIfLimited, Comparators.charSequences()); + List filters = Lists.newArrayList(); + removeTimeFilters(filters, Expressions.rewriteNot(scan.filter())); + Expression rowFilter = joinFilters(filters); + + long minTimestamp = Long.MIN_VALUE; + long maxTimestamp = Long.MAX_VALUE; + boolean filterByTimestamp = !timeFilters.isEmpty(); + if (filterByTimestamp) { + Pair range = timestampRange(timeFilters); + minTimestamp = range.first(); + maxTimestamp = range.second(); + } + try (CloseableIterable entries = new ManifestGroup(ops, table.currentSnapshot().manifests()) - .filterData(scan.filter()) + .filterData(rowFilter) .ignoreDeleted() .select(SCAN_SUMMARY_COLUMNS) .entries()) { @@ -217,4 +266,74 @@ public Map get() { return ImmutableMap.copyOf(map); } } + + static Expression joinFilters(List expressions) { + Expression result = Expressions.alwaysTrue(); + for (Expression expression : expressions) { + result = Expressions.and(result, expression); + } + return result; + } + + static long toMillis(long timestamp) { + if (timestamp < 10000000000L) { + // in seconds + return timestamp * 1000; + } else if (timestamp < 10000000000000L) { + // in millis + return timestamp; + } + // in micros + return timestamp / 1000; + } + + static Pair timestampRange(List> timeFilters) { + // evaluation is inclusive + long minTimestamp = Long.MIN_VALUE; + long maxTimestamp = Long.MAX_VALUE; + + for (UnboundPredicate pred : timeFilters) { + long value = pred.literal().value(); + switch (pred.op()) { + case LT: + if (value - 1 < maxTimestamp) { + maxTimestamp = value - 1; + } + break; + case LT_EQ: + if (value < maxTimestamp) { + maxTimestamp = value; + } + break; + case GT: + if (value + 1 > minTimestamp) { + minTimestamp = value + 1; + } + break; + case GT_EQ: + if (value > minTimestamp) { + minTimestamp = value; + } + break; + case EQ: + if (value < maxTimestamp) { + maxTimestamp = value; + } + if (value > minTimestamp) { + minTimestamp = value; + } + break; + default: + throw new UnsupportedOperationException( + "Cannot filter timestamps using predicate: " + pred); + } + } + + if (maxTimestamp < minTimestamp) { + throw new IllegalArgumentException( + "No timestamps can match filters: " + Joiner.on(", ").join(timeFilters)); + } + + return Pair.of(minTimestamp, maxTimestamp); + } } diff --git a/core/src/test/java/com/netflix/iceberg/TestScanSummary.java b/core/src/test/java/com/netflix/iceberg/TestScanSummary.java new file mode 100644 index 000000000000..b1b297345cff --- /dev/null +++ b/core/src/test/java/com/netflix/iceberg/TestScanSummary.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.netflix.iceberg; + +import com.google.common.collect.ImmutableList; +import com.netflix.iceberg.util.Pair; +import org.junit.Assert; +import org.junit.Test; + +import static com.netflix.iceberg.ScanSummary.timestampRange; +import static com.netflix.iceberg.ScanSummary.toMillis; +import static com.netflix.iceberg.expressions.Expressions.equal; +import static com.netflix.iceberg.expressions.Expressions.greaterThan; +import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual; +import static com.netflix.iceberg.expressions.Expressions.lessThan; +import static com.netflix.iceberg.expressions.Expressions.lessThanOrEqual; + +public class TestScanSummary { + @Test + public void testTimestampRanges() { + long lower = 1542750188523L; + long upper = 1542750695131L; + + Assert.assertEquals("Should use inclusive bound", + Pair.of(Long.MIN_VALUE, upper), + timestampRange(ImmutableList.of(lessThanOrEqual("ts_ms", upper)))); + + Assert.assertEquals("Should use lower value for upper bound", + Pair.of(Long.MIN_VALUE, upper), + timestampRange(ImmutableList.of( + lessThanOrEqual("ts_ms", upper + 918234), + lessThanOrEqual("ts_ms", upper)))); + + Assert.assertEquals("Should make upper bound inclusive", + Pair.of(Long.MIN_VALUE, upper - 1), + timestampRange(ImmutableList.of(lessThan("ts_ms", upper)))); + + Assert.assertEquals("Should use inclusive bound", + Pair.of(lower, Long.MAX_VALUE), + timestampRange(ImmutableList.of(greaterThanOrEqual("ts_ms", lower)))); + + Assert.assertEquals("Should use upper value for lower bound", + Pair.of(lower, Long.MAX_VALUE), + timestampRange(ImmutableList.of( + greaterThanOrEqual("ts_ms", lower - 918234), + greaterThanOrEqual("ts_ms", lower)))); + + Assert.assertEquals("Should make lower bound inclusive", + Pair.of(lower + 1, Long.MAX_VALUE), + timestampRange(ImmutableList.of(greaterThan("ts_ms", lower)))); + + Assert.assertEquals("Should set both bounds for equals", + Pair.of(lower, lower), + timestampRange(ImmutableList.of(equal("ts_ms", lower)))); + + Assert.assertEquals("Should set both bounds", + Pair.of(lower, upper - 1), + timestampRange(ImmutableList.of( + greaterThanOrEqual("ts_ms", lower), + lessThan("ts_ms", upper)))); + + // >= lower and < lower is an empty range + AssertHelpers.assertThrows("Should reject empty ranges", + IllegalArgumentException.class, "No timestamps can match filters", + () -> timestampRange(ImmutableList.of( + greaterThanOrEqual("ts_ms", lower), + lessThan("ts_ms", lower)))); + } + + @Test + public void testToMillis() { + long millis = 1542750947417L; + Assert.assertEquals(1542750947000L, toMillis(millis / 1000)); + Assert.assertEquals(1542750947417L, toMillis(millis)); + Assert.assertEquals(1542750947417L, toMillis(millis * 1000 + 918)); + } +}