diff --git a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
new file mode 100644
index 000000000000..c9efb3d41e67
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Function;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class InternalRecordWrapper implements StructLike {
+  private final Function<Object, Object>[] transforms;
+  private StructLike wrapped = null;
+
+  @SuppressWarnings("unchecked")
+  InternalRecordWrapper(Types.StructType struct) {
+    this.transforms = struct.fields().stream()
+        .map(field -> converter(field.type()))
+        .toArray(length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length));
+  }
+
+  private static Function<Object, Object> converter(Type type) {
+    switch (type.typeId()) {
+      case DATE:
+        return date -> DateTimeUtil.daysFromDate((LocalDate) date);
+      case TIME:
+        return time -> DateTimeUtil.microsFromTime((LocalTime) time);
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp);
+        } else {
+          return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp);
+        }
+      case FIXED:
+        return bytes -> ByteBuffer.wrap((byte[]) bytes);
+      case STRUCT:
+        InternalRecordWrapper wrapper = new InternalRecordWrapper(type.asStructType());
+        return struct -> wrapper.wrap((StructLike) struct);
+      default:
+    }
+    return null;
+  }
+
+  public InternalRecordWrapper wrap(StructLike record) {
+    this.wrapped = record;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return wrapped.size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    if (transforms[pos] != null) {
+      return javaClass.cast(transforms[pos].apply(wrapped.get(pos, Object.class)));
+    }
+    return wrapped.get(pos, javaClass);
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Cannot update InternalRecordWrapper");
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index a683c019844f..c957ab5cb3b8 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -133,11 +133,13 @@ private class ScanIterator implements Iterator<Record>, Closeable {
     private final boolean caseSensitive;
     private Closeable currentCloseable = null;
     private Iterator<Record> currentIterator = Collections.emptyIterator();
+    private final InternalRecordWrapper recordWrapper;
 
     private ScanIterator(CloseableIterable<CombinedScanTask> tasks, boolean caseSensitive) {
       this.tasks = Lists.newArrayList(Iterables.concat(
           CloseableIterable.transform(tasks, CombinedScanTask::files))).iterator();
       this.caseSensitive = caseSensitive;
+      this.recordWrapper = new InternalRecordWrapper(projection.asStruct());
     }
 
     @Override
@@ -161,7 +163,8 @@ public boolean hasNext() {
 
           if (task.residual() != null && task.residual() != Expressions.alwaysTrue()) {
             Evaluator filter = new Evaluator(projection.asStruct(), task.residual(), caseSensitive);
-            this.currentIterator = Iterables.filter(reader, filter::eval).iterator();
+            this.currentIterator = Iterables.filter(reader,
+                record -> filter.eval(recordWrapper.wrap(record))).iterator();
           } else {
             this.currentIterator = reader.iterator();
           }
diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 673673097728..b13a3dffe113 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -56,6 +56,7 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -67,6 +68,7 @@
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
 import static org.apache.iceberg.DataFiles.fromInputFile;
+import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.apache.iceberg.expressions.Expressions.lessThan;
 import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
 import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
@@ -391,13 +393,17 @@ public void testAsOfTimeOlderThanFirstSnapshot() {
   }
 
   private DataFile writeFile(String location, String filename, List<Record> records) throws IOException {
+    return writeFile(location, filename, SCHEMA, records);
+  }
+
+  private DataFile writeFile(String location, String filename, Schema schema, List<Record> records) throws IOException {
     Path path = new Path(location, filename);
     FileFormat fileFormat = FileFormat.fromFileName(filename);
     Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
     switch (fileFormat) {
       case AVRO:
         FileAppender<Record> avroAppender = Avro.write(fromPath(path, CONF))
-            .schema(SCHEMA)
+            .schema(schema)
             .createWriterFunc(DataWriter::create)
             .named(fileFormat.name())
             .build();
@@ -414,7 +420,7 @@ private DataFile writeFile(String location, String filename, List<Record> record
 
       case PARQUET:
         FileAppender<Record> parquetAppender = Parquet.write(fromPath(path, CONF))
-            .schema(SCHEMA)
+            .schema(schema)
             .createWriterFunc(GenericParquetWriter::buildWriter)
             .build();
         try {
@@ -430,7 +436,7 @@ private DataFile writeFile(String location, String filename, List<Record> record
 
       case ORC:
         FileAppender<Record> orcAppender = ORC.write(fromPath(path, CONF))
-            .schema(SCHEMA)
+            .schema(schema)
             .createWriterFunc(GenericOrcWriter::buildWriter)
             .build();
         try {
@@ -449,6 +455,42 @@ private DataFile writeFile(String location, String filename, List<Record> record
     }
   }
 
+  @Test
+  public void testFilterWithDateAndTimestamp() throws IOException {
+    Assume.assumeFalse(format == FileFormat.ORC);
+    Schema schema = new Schema(
+        required(1, "timestamp_with_zone", Types.TimestampType.withZone()),
+        required(2, "timestamp_without_zone", Types.TimestampType.withoutZone()),
+        required(3, "date", Types.DateType.get()),
+        required(4, "time", Types.TimeType.get())
+    );
+
+    File tableLocation = temp.newFolder("complex_filter_table");
+    Assert.assertTrue(tableLocation.delete());
+
+    Table table = TABLES.create(
+        schema, PartitionSpec.unpartitioned(),
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        tableLocation.getAbsolutePath());
+
+    List<Record> expected = RandomGenericData.generate(schema, 100, 435691832918L);
+    DataFile file = writeFile(tableLocation.toString(), format.addExtension("record-file"), schema, expected);
+    table.newFastAppend().appendFile(file).commit();
+
+    for (Record r : expected) {
+      Iterable<Record> filterResult = IcebergGenerics.read(table)
+          .where(equal("timestamp_with_zone", r.getField("timestamp_with_zone").toString()))
+          .where(equal("timestamp_without_zone", r.getField("timestamp_without_zone").toString()))
+          .where(equal("date", r.getField("date").toString()))
+          .where(equal("time", r.getField("time").toString()))
+          .build();
+
+      Assert.assertTrue(filterResult.iterator().hasNext());
+      Record readRecord = filterResult.iterator().next();
+      Assert.assertEquals(r.getField("timestamp_with_zone"), readRecord.getField("timestamp_with_zone"));
+    }
+  }
+
   private static ByteBuffer longToBuffer(long value) {
     return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
   }