diff --git a/api/src/main/java/org/apache/iceberg/expressions/AggregateEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/AggregateEvaluator.java new file mode 100644 index 000000000000..6aef2cce1cbf --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/expressions/AggregateEvaluator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.expressions; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; + +/** Binds aggregate expressions to a schema, feeds rows (StructLike) or data files (DataFile) to one aggregator per expression, and exposes the accumulated values through result() as a struct with one position per aggregate, in input order. */ public class AggregateEvaluator { + public static AggregateEvaluator create(Schema schema, Expression... 
aggregates) { + return create(schema, ImmutableList.copyOf(aggregates)); /* copy the varargs array into a List so the call binds to the List overload; passing the array through unchanged re-selected this varargs overload and recursed until StackOverflowError */ + } + + public static AggregateEvaluator create(Schema schema, List aggregates) { + return create(schema.asStruct(), aggregates); + } + + private static AggregateEvaluator create(Types.StructType struct, List aggregates) { /* binds each expression to the struct and casts the bound result to BoundAggregate */ + List> boundAggregates = + aggregates.stream() + .map(expr -> Binder.bind(struct, expr)) + .map(bound -> (BoundAggregate) bound) + .collect(Collectors.toList()); + + return new AggregateEvaluator(boundAggregates); + } + + private final List> aggregators; + private final Types.StructType resultType; + + private AggregateEvaluator(List> aggregates) { /* creates one aggregator and one optional result field per aggregate; the field id is the aggregate's position and the field name is its describe() string */ + ImmutableList.Builder> aggregatorsBuilder = + ImmutableList.builder(); + List resultFields = Lists.newArrayList(); + for (int pos = 0; pos < aggregates.size(); pos += 1) { + BoundAggregate aggregate = aggregates.get(pos); + aggregatorsBuilder.add(aggregate.newAggregator()); + resultFields.add(Types.NestedField.optional(pos, aggregate.describe(), aggregate.type())); + } + + this.aggregators = aggregatorsBuilder.build(); + this.resultType = Types.StructType.of(resultFields); + } + + public void update(StructLike struct) { /* forwards the row to every aggregator */ + for (BoundAggregate.Aggregator aggregator : aggregators) { + aggregator.update(struct); + } + } + + public void update(DataFile file) { /* forwards the data file to every aggregator */ + for (BoundAggregate.Aggregator aggregator : aggregators) { + aggregator.update(file); + } + } + + public Types.StructType resultType() { + return resultType; + } + + public StructLike result() { /* collects each aggregator's result, in aggregator order, into an array-backed struct */ + Object[] results = + aggregators.stream().map(BoundAggregate.Aggregator::result).toArray(Object[]::new); + return new ArrayStructLike(results); + } + + private static class ArrayStructLike implements StructLike { /* StructLike view over the result array; the array is used directly, not copied */ + private final Object[] values; + + private ArrayStructLike(Object[] values) { + this.values = values; + } + + public int size() { + return values.length; + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(values[pos]); + } + + @Override + public 
void set(int pos, T value) { + values[pos] = value; + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java index 650271b3b78a..ada9e6ed7e63 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java +++ b/api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.expressions; +import java.util.Map; +import org.apache.iceberg.DataFile; import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -29,7 +31,18 @@ protected BoundAggregate(Operation op, BoundTerm term) { @Override public C eval(StructLike struct) { - throw new UnsupportedOperationException(this.getClass().getName() + " does not implement eval"); + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement eval(StructLike)"); + } + + C eval(DataFile file) { /* base implementation always throws; CountAggregate and ValueAggregate supply eval(DataFile) */ + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement eval(DataFile)"); + } + + Aggregator newAggregator() { /* base implementation always throws; CountAggregate, MaxAggregate and MinAggregate supply their own aggregators */ + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement newAggregator()"); } @Override @@ -44,4 +57,85 @@ public Type type() { return term().type(); } } + + /** Returns a display string for this aggregate: "count(*)", or the operation name wrapped around ExpressionUtil.describe(term()); throws UnsupportedOperationException for any other operation. */ public String describe() { + switch (op()) { + case COUNT_STAR: + return "count(*)"; + case COUNT: + return "count(" + ExpressionUtil.describe(term()) + ")"; + case MAX: + return "max(" + ExpressionUtil.describe(term()) + ")"; + case MIN: + return "min(" + ExpressionUtil.describe(term()) + ")"; + default: + throw new UnsupportedOperationException("Unsupported aggregate type: " + op()); + } + } + + V safeGet(Map map, int key) { /* null-tolerant lookup: null when the map is null or has no entry for key */ + return safeGet(map, key, null); + } + + V safeGet(Map map, int key, V defaultValue) { + if (map != null) { + return map.getOrDefault(key, defaultValue); + } + + return null; /* NOTE(review): a null map yields null rather than defaultValue — confirm this is intended (treats a missing map as "unknown") */ + } + + interface Aggregator { + void 
update(StructLike struct); + + void update(DataFile file); + + R result(); + } + + abstract static class NullSafeAggregator implements Aggregator { /* aggregator that turns permanently null once any evaluated value is null: later updates are skipped and result() returns null */ + private final BoundAggregate aggregate; + private boolean isNull = false; + + NullSafeAggregator(BoundAggregate aggregate) { + this.aggregate = aggregate; + } + + protected abstract void update(R value); /* called only with non-null values */ + + protected abstract R current(); /* the value accumulated so far by the subclass */ + + @Override + public void update(StructLike struct) { + if (!isNull) { + R value = aggregate.eval(struct); + if (value == null) { + this.isNull = true; + } else { + update(value); + } + } + } + + @Override + public void update(DataFile file) { + if (!isNull) { + R value = aggregate.eval(file); + if (value == null) { + this.isNull = true; + } else { + update(value); + } + } + } + + @Override + public R result() { + if (isNull) { + return null; + } + + return current(); /* return the subclass's accumulated value; calling result() here recursed forever */ + } + } } diff --git a/api/src/main/java/org/apache/iceberg/expressions/CountAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/CountAggregate.java new file mode 100644 index 000000000000..7af4eb5dc9d6 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/expressions/CountAggregate.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.expressions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.StructLike; + +/** Base class for count aggregates: both eval overloads delegate to countFor, which subclasses override. */ public class CountAggregate extends BoundAggregate { + protected CountAggregate(Operation op, BoundTerm term) { + super(op, term); + } + + @Override + public Long eval(StructLike struct) { + return countFor(struct); + } + + @Override + public Long eval(DataFile file) { + return countFor(file); + } + + protected Long countFor(StructLike row) { /* default throws; overridden by CountNonNull and CountStar */ + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement countFor(StructLike)"); + } + + protected Long countFor(DataFile file) { /* default throws; overridden by CountNonNull and CountStar */ + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement countFor(DataFile)"); + } + + @Override + public Aggregator newAggregator() { + return new CountAggregator<>(this); + } + + private static class CountAggregator extends NullSafeAggregator { /* sums the non-null counts produced by the aggregate's eval calls */ + private Long count = 0L; + + CountAggregator(BoundAggregate aggregate) { + super(aggregate); + } + + @Override + protected void update(Long value) { + count += value; + } + + @Override + protected Long current() { + return count; + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java b/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java new file mode 100644 index 000000000000..55535bbd26aa --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.expressions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; + +/** COUNT aggregate over a bound term: 1 for each row whose term value is non-null; for data files the count is derived from the file's per-field counts for the referenced field id. */ public class CountNonNull extends CountAggregate { + private final int fieldId; + + protected CountNonNull(BoundTerm term) { + super(Operation.COUNT, term); + Types.NestedField field = term.ref().field(); + this.fieldId = field.fieldId(); + } + + @Override + protected Long countFor(StructLike row) { + return term().eval(row) != null ? 1L : 0L; + } + + @Override + protected Long countFor(DataFile file) { /* NOTE(review): sums valueCounts and nanValueCounts and never consults null-value counts — confirm valueCounts excludes nulls, otherwise this over-counts */ + // NaN value counts were not required in v1 and were included in value counts + return safeAdd(safeGet(file.valueCounts(), fieldId), safeGet(file.nanValueCounts(), fieldId, 0L)); + } + + private Long safeAdd(Long left, Long right) { /* null when either operand is null */ + if (left != null && right != null) { + return left + right; + } + + return null; + } +} diff --git a/api/src/main/java/org/apache/iceberg/expressions/CountStar.java b/api/src/main/java/org/apache/iceberg/expressions/CountStar.java new file mode 100644 index 000000000000..0652dd19b971 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/expressions/CountStar.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.expressions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.StructLike; + +/** COUNT(*) aggregate: 1 for every row, or the file's record count for a data file. */ public class CountStar extends CountAggregate { + protected CountStar(BoundTerm term) { + super(Operation.COUNT_STAR, term); + } + + @Override + protected Long countFor(StructLike row) { + return 1L; + } + + @Override + protected Long countFor(DataFile file) { + long count = file.recordCount(); + if (count < 0) { /* a negative record count is reported as null */ + return null; + } + + return count; + } +} diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java index 9d94be163586..37e1c162110b 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java +++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java @@ -120,6 +120,26 @@ public static boolean selectsPartitions( caseSensitive); } + /** Returns a display string for a term: transform + "(" + described ref + ")" for bound or unbound transforms, the reference name for bound or named references. */ public static String describe(Term term) { + if (term instanceof UnboundTransform) { + return ((UnboundTransform) term).transform() + + "(" + + describe(((UnboundTransform) term).ref()) + + ")"; + } else if (term instanceof BoundTransform) { + return ((BoundTransform) term).transform() + + "(" + + describe(((BoundTransform) term).ref()) + + ")"; + } else if (term instanceof NamedReference) { + return ((NamedReference) term).name(); + } else if (term instanceof BoundReference) { + return 
((BoundReference) term).name(); + } else { + throw new UnsupportedOperationException("Unsupported term: " + term); /* any term type other than the four handled in this if/else chain is rejected */ + } + } + private static class ExpressionSanitizer extends ExpressionVisitors.ExpressionVisitor { private final long now; @@ -235,19 +255,9 @@ public String predicate(BoundPredicate pred) { throw new UnsupportedOperationException("Cannot sanitize bound predicate: " + pred); } - public String termToString(UnboundTerm term) { - if (term instanceof UnboundTransform) { - return ((UnboundTransform) term).transform() + "(" + termToString(term.ref()) + ")"; - } else if (term instanceof NamedReference) { - return ((NamedReference) term).name(); - } else { - throw new UnsupportedOperationException("Unsupported term: " + term); - } - } - @Override public String predicate(UnboundPredicate pred) { - String term = termToString(pred.term()); + String term = describe(pred.term()); /* uses the describe(Term) helper in place of the removed termToString */ switch (pred.op()) { case IS_NULL: return term + " IS NULL"; diff --git a/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java new file mode 100644 index 000000000000..afc56325414c --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.expressions; + +import java.util.Comparator; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types; + +/** MAX aggregate over a bound term; for data files the candidate value is taken from the file's upper bound for the referenced field. */ public class MaxAggregate extends ValueAggregate { + private final int fieldId; + private final PrimitiveType type; + private final Comparator comparator; + + protected MaxAggregate(BoundTerm term) { + super(Operation.MAX, term); + Types.NestedField field = term.ref().field(); + this.fieldId = field.fieldId(); + this.type = field.type().asPrimitiveType(); + this.comparator = Comparators.forType(type); + } + + @Override + protected Object evaluateRef(DataFile file) { /* converts the upper-bound buffer stored for this field id using the field's primitive type */ + return Conversions.fromByteBuffer(type, safeGet(file.upperBounds(), fieldId)); + } + + @Override + public Aggregator newAggregator() { + return new MaxAggregator<>(this, comparator); + } + + private static class MaxAggregator extends NullSafeAggregator { /* keeps the largest non-null value seen, as ordered by the comparator */ + private final Comparator comparator; + private T max = null; + + MaxAggregator(MaxAggregate aggregate, Comparator comparator) { + super(aggregate); + this.comparator = comparator; + } + + @Override + protected void update(T value) { + if (max == null || comparator.compare(value, max) > 0) { /* max is null until the first value arrives; accept that value without handing null to the comparator */ + this.max = value; + } + } + + @Override + protected T current() { + return max; + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java new file mode 100644 index 000000000000..598fd5ba981b --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.expressions; + +import java.util.Comparator; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types; + +/** MIN aggregate over a bound term; for data files the candidate value is taken from the file's lower bound for the referenced field. */ public class MinAggregate extends ValueAggregate { + private final int fieldId; + private final PrimitiveType type; + private final Comparator comparator; + + protected MinAggregate(BoundTerm term) { + super(Operation.MIN, term); + Types.NestedField field = term.ref().field(); + this.fieldId = field.fieldId(); + this.type = field.type().asPrimitiveType(); + this.comparator = Comparators.forType(type); + } + + @Override + protected Object evaluateRef(DataFile file) { /* converts the lower-bound buffer stored for this field id using the field's primitive type */ + return Conversions.fromByteBuffer(type, safeGet(file.lowerBounds(), fieldId)); + } + + @Override + public Aggregator newAggregator() { + return new MinAggregator<>(this, comparator); + } + + private static class MinAggregator extends NullSafeAggregator { + private final Comparator comparator; + private T min = null; /* null until the first value is accepted */ + + MinAggregator(MinAggregate aggregate, Comparator comparator) { + super(aggregate); + this.comparator = comparator; + } + + @Override + protected void update(T value) { + if 
(min == null || comparator.compare(value, min) < 0) { /* min is null until the first value arrives; accept that value without handing null to the comparator */ + this.min = value; + } + } + + @Override + protected T current() { + return min; + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/expressions/UnboundAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/UnboundAggregate.java index 5e4cce06c7e8..65e469a631b1 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/UnboundAggregate.java +++ b/api/src/main/java/org/apache/iceberg/expressions/UnboundAggregate.java @@ -46,12 +46,22 @@ public NamedReference ref() { */ @Override public Expression bind(Types.StructType struct, boolean caseSensitive) { - if (op() == Operation.COUNT_STAR) { - return new BoundAggregate<>(op(), null); - } else { - Preconditions.checkArgument(term() != null, "Invalid aggregate term: null"); - BoundTerm bound = term().bind(struct, caseSensitive); - return new BoundAggregate<>(op(), bound); + switch (op()) { /* each supported operation binds to its own BoundAggregate subclass */ + case COUNT_STAR: + return new CountStar<>(null); /* COUNT_STAR is bound without a term */ + case COUNT: + return new CountNonNull<>(boundTerm(struct, caseSensitive)); + case MAX: + return new MaxAggregate<>(boundTerm(struct, caseSensitive)); + case MIN: + return new MinAggregate<>(boundTerm(struct, caseSensitive)); + default: + throw new UnsupportedOperationException("Unsupported aggregate type: " + op()); } } + + private BoundTerm boundTerm(Types.StructType struct, boolean caseSensitive) { /* rejects a null term with IllegalArgumentException, then binds the term to the struct */ + Preconditions.checkArgument(term() != null, "Invalid aggregate term: null"); + return term().bind(struct, caseSensitive); + } } diff --git a/api/src/main/java/org/apache/iceberg/expressions/ValueAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/ValueAggregate.java new file mode 100644 index 000000000000..be45e8d0efc9 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/expressions/ValueAggregate.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.expressions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.StructLike; + +/** Base class for aggregates whose result is a value of the bound term (extended by MaxAggregate and MinAggregate). */ class ValueAggregate extends BoundAggregate { + private final SingleValueStruct valueStruct = new SingleValueStruct(); /* reused for every eval(DataFile) call. NOTE(review): shared mutable state, so concurrent calls on one instance would race — confirm single-threaded use */ + + protected ValueAggregate(Operation op, BoundTerm term) { + super(op, term); + } + + @Override + public T eval(StructLike struct) { + return term().eval(struct); + } + + public T eval(DataFile file) { /* stores the value from evaluateRef in the single-value struct, then evaluates the term against that struct */ + valueStruct.setValue(evaluateRef(file)); + return term().eval(valueStruct); + } + + protected Object evaluateRef(DataFile file) { /* default throws; MaxAggregate and MinAggregate override */ + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement eval(DataFile)"); + } + + /** Used to pass a referenced value through term evaluation. */ + private static class SingleValueStruct implements StructLike { + private Object value; + + private void setValue(Object value) { + this.value = value; + } + + @Override + public int size() { + return 1; + } + + @Override + @SuppressWarnings("unchecked") + public T get(int pos, Class javaClass) { /* ignores pos and javaClass; always returns the single stored value */ + return (T) value; + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot update a read-only struct"); + } + } +}