Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.BiFunction;

import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
Expand All @@ -57,7 +59,6 @@
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.expressions.Expressions.alwaysFalse;
import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.greaterThan;
import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
Expand All @@ -80,13 +81,13 @@ public static Expression toIcebergExpression(TupleDomain<IcebergColumnHandle> tu
return alwaysFalse();
}
Map<IcebergColumnHandle, Domain> domainMap = tupleDomain.getDomains().get();
Expression expression = alwaysTrue();
List<Expression> conjuncts = new ArrayList<>();
for (Map.Entry<IcebergColumnHandle, Domain> entry : domainMap.entrySet()) {
IcebergColumnHandle columnHandle = entry.getKey();
Domain domain = entry.getValue();
expression = and(expression, toIcebergExpression(columnHandle.getQualifiedName(), columnHandle.getType(), domain));
conjuncts.add(toIcebergExpression(columnHandle.getQualifiedName(), columnHandle.getType(), domain));
}
return expression;
return and(conjuncts);
}

private static Expression toIcebergExpression(String columnName, Type type, Domain domain)
Expand Down Expand Up @@ -138,35 +139,32 @@ private static Expression toIcebergExpression(String columnName, Range range)
return equal(columnName, icebergValue);
}

Expression lowBound;
if (range.isLowUnbounded()) {
lowBound = alwaysTrue();
}
else {
List<Expression> conjuncts = new ArrayList<>(2);
if (!range.isLowUnbounded()) {
Object icebergLow = getIcebergLiteralValue(type, range.getLowBoundedValue());
Expression lowBound;
if (range.isLowInclusive()) {
lowBound = greaterThanOrEqual(columnName, icebergLow);
}
else {
lowBound = greaterThan(columnName, icebergLow);
}
conjuncts.add(lowBound);
}

Expression highBound;
if (range.isHighUnbounded()) {
highBound = alwaysTrue();
}
else {
if (!range.isHighUnbounded()) {
Object icebergHigh = getIcebergLiteralValue(type, range.getHighBoundedValue());
Expression highBound;
if (range.isHighInclusive()) {
highBound = lessThanOrEqual(columnName, icebergHigh);
}
else {
highBound = lessThan(columnName, icebergHigh);
}
conjuncts.add(highBound);
}

return and(lowBound, highBound);
return and(conjuncts);
}

private static Object getIcebergLiteralValue(Type type, Object trinoNativeValue)
Expand Down Expand Up @@ -232,6 +230,14 @@ private static Object getIcebergLiteralValue(Type type, Object trinoNativeValue)
throw new UnsupportedOperationException("Unsupported type: " + type);
}

private static Expression and(List<Expression> expressions)
{
if (expressions.isEmpty()) {
return alwaysTrue();
}
return combine(expressions, Expressions::and);
}

private static Expression or(Expression left, Expression right)
{
return Expressions.or(left, right);
Expand All @@ -242,10 +248,57 @@ private static Expression or(List<Expression> expressions)
if (expressions.isEmpty()) {
return alwaysFalse();
}
if (expressions.size() == 1) {
return getOnlyElement(expressions);
return combine(expressions, Expressions::or);
}

private static Expression combine(List<Expression> expressions, BiFunction<Expression, Expression, Expression> combiner)
{
// Build balanced tree that preserves the evaluation order of the input expressions.
//
// The tree is built bottom up by combining pairs of elements into binary expressions.
//
// Example:
//
// Initial state:
// a b c d e
//
// First iteration:
//
// /\ /\ e
// a b c d
//
// Second iteration:
//
// / \ e
// /\ /\
// a b c d
//
//
// Last iteration:
//
// / \
// / \ e
// /\ /\
// a b c d

Queue<Expression> queue = new ArrayDeque<>(expressions);
while (queue.size() > 1) {
Queue<Expression> buffer = new ArrayDeque<>();

// combine pairs of elements
while (queue.size() >= 2) {
buffer.add(combiner.apply(queue.remove(), queue.remove()));
}

// if there's and odd number of elements, just append the last one
if (!queue.isEmpty()) {
buffer.add(queue.remove());
}

// continue processing the pairs that were just built
queue = buffer;
}
int mid = expressions.size() / 2;
return or(or(expressions.subList(0, mid)), or(expressions.subList(mid, expressions.size())));

return queue.remove();
}
}