diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java index c155f005b9d9..9f1e26ebcc06 100644 --- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java @@ -31,8 +31,8 @@ public class DateTimeUtil { private DateTimeUtil() { } - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); public static LocalDate dateFromDays(int daysFromEpoch) { return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch); diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java index 1ebc157847df..63f36e4e01f3 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetrics.java +++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java @@ -87,13 +87,16 @@ public abstract class TestMetrics { required(7, "stringCol", StringType.get()), optional(8, "dateCol", DateType.get()), required(9, "timeCol", TimeType.get()), - required(10, "timestampCol", TimestampType.withoutZone()), + required(10, "timestampColAboveEpoch", TimestampType.withoutZone()), required(11, "fixedCol", FixedType.ofLength(4)), - required(12, "binaryCol", BinaryType.get()) + required(12, "binaryCol", BinaryType.get()), + required(13, "timestampColBelowEpoch", TimestampType.withoutZone()) ); private final byte[] fixed = "abcd".getBytes(StandardCharsets.UTF_8); + public abstract FileFormat fileFormat(); + public abstract Metrics getMetrics(InputFile file); public abstract InputFile writeRecords(Schema schema, Record... records) throws IOException; @@ -101,7 +104,7 @@ public abstract class TestMetrics { public abstract InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... 
records) throws IOException; - public abstract int splitCount(InputFile parquetFile) throws IOException; + public abstract int splitCount(InputFile inputFile) throws IOException; public boolean supportsSmallRowGroups() { return false; @@ -119,9 +122,10 @@ public void testMetricsForRepeatedValues() throws IOException { firstRecord.setField("stringCol", "AAA"); firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500)); firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); - firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L)); + firstRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L)); firstRecord.setField("fixedCol", fixed); firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); + firstRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L)); Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA); secondRecord.setField("booleanCol", true); secondRecord.setField("intCol", 3); @@ -132,9 +136,10 @@ public void testMetricsForRepeatedValues() throws IOException { secondRecord.setField("stringCol", "AAA"); secondRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500)); secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); - secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L)); + secondRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L)); secondRecord.setField("fixedCol", fixed); secondRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); + secondRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L)); InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord); @@ -152,6 +157,7 @@ public void testMetricsForRepeatedValues() throws IOException { assertCounts(10, 2L, 0L, metrics); assertCounts(11, 2L, 0L, metrics); assertCounts(12, 2L, 0L, metrics); + assertCounts(13, 2L, 0L, metrics); } @Test @@ -166,9 +172,10 @@ public void testMetricsForTopLevelFields() throws IOException { firstRecord.setField("stringCol", "AAA"); firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500)); firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); - firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L)); + firstRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(0L)); firstRecord.setField("fixedCol", fixed); firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); + firstRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(-1_900_300L)); Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA); secondRecord.setField("booleanCol", false); secondRecord.setField("intCol", Integer.MIN_VALUE); @@ -179,9 +186,10 @@ public void testMetricsForTopLevelFields() throws IOException { secondRecord.setField("stringCol", "ZZZ"); secondRecord.setField("dateCol", null); secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(3000L)); - secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(1000L)); + secondRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(900L)); secondRecord.setField("fixedCol", fixed); secondRecord.setField("binaryCol", ByteBuffer.wrap("W".getBytes())); + secondRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros(0L)); InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord); @@ -206,13 +214,27 @@ public void testMetricsForTopLevelFields() throws IOException { assertCounts(9, 2L, 0L, metrics); 
assertBounds(9, TimeType.get(), 2000L, 3000L, metrics); assertCounts(10, 2L, 0L, metrics); - assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics); + if (fileFormat() == FileFormat.ORC) { + // ORC-611: ORC only supports millisecond precision, so the upper bound is widened by 1 millisecond + assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics); + } else { + assertBounds(10, TimestampType.withoutZone(), 0L, 900L, metrics); + } assertCounts(11, 2L, 0L, metrics); assertBounds(11, FixedType.ofLength(4), ByteBuffer.wrap(fixed), ByteBuffer.wrap(fixed), metrics); assertCounts(12, 2L, 0L, metrics); assertBounds(12, BinaryType.get(), ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics); + if (fileFormat() == FileFormat.ORC) { + // TODO: tighten these bounds once ORC-342 is fixed; ORC-342 produces inaccurate timestamp stats below the epoch. + // ORC-611: ORC only supports millisecond precision, so the bounds are widened by 1 millisecond, e.g. + // assertBounds(13, TimestampType.withoutZone(), -1000L, 1000L, metrics); would fail for a value + // in the range `[1970-01-01 00:00:00.000, 1970-01-01 00:00:00.999]` + assertBounds(13, TimestampType.withoutZone(), -1_901_000L, 1000L, metrics); + } else { + assertBounds(13, TimestampType.withoutZone(), -1_900_300L, 0L, metrics); + } } @Test @@ -292,14 +314,22 @@ public void testMetricsForListAndMapElements() throws IOException { Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(1L, (long) metrics.recordCount()); - assertCounts(1, 1, 0, metrics); + if (fileFormat() != FileFormat.ORC) { + assertCounts(1, 1L, 0L, metrics); + assertCounts(2, 1L, 0L, metrics); + assertCounts(4, 3L, 0L, metrics); + assertCounts(6, 1L, 0L, metrics); + } else { + assertCounts(1, null, null, metrics); + assertCounts(2, null, null, metrics); + assertCounts(4, null, null, metrics); + assertCounts(6, null, null, metrics); + } assertBounds(1, IntegerType.get(), null, null, metrics); - assertCounts(2, 1, 0, metrics); assertBounds(2, StringType.get(), null, null, metrics); - assertCounts(4, 3, 0, metrics); assertBounds(4, IntegerType.get(), null, null, metrics); - assertCounts(6, 1, 0, metrics); assertBounds(6, StringType.get(), null, null, metrics); + assertBounds(7, structType, null, null, metrics); } @Test @@ -316,7 +346,7 @@ public void testMetricsForNullColumns() throws IOException { Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(2L, (long) metrics.recordCount()); - assertCounts(1, 2, 2, metrics); + assertCounts(1, 2L, 2L, metrics); assertBounds(1, IntegerType.get(), null, null, metrics); } @@ -338,13 +368,14 @@ public void testMetricsForTopLevelWithMultipleRowGroup() throws Exception { newRecord.setField("stringCol", "AAA"); newRecord.setField("dateCol", DateTimeUtil.dateFromDays(i + 1)); newRecord.setField("timeCol", DateTimeUtil.timeFromMicros(i + 1L)); - newRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(i + 1L)); + newRecord.setField("timestampColAboveEpoch", DateTimeUtil.timestampFromMicros(i + 1L)); newRecord.setField("fixedCol", fixed); newRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); + newRecord.setField("timestampColBelowEpoch", DateTimeUtil.timestampFromMicros((i + 1L) * -1L)); records.add(newRecord); } - // create parquet file with multiple row groups. by using smaller number of bytes + // create a file with multiple row groups
by using a smaller number of bytes InputFile recordsFile = writeRecordsWithSmallRowGroups(SIMPLE_SCHEMA, records.toArray(new Record[0])); Assert.assertNotNull(recordsFile); @@ -387,7 +418,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce records.add(newRecord); } - // create parquet file with multiple row groups. by using smaller number of bytes + // create a file with multiple row groups by using a smaller number of bytes InputFile recordsFile = writeRecordsWithSmallRowGroups(NESTED_SCHEMA, records.toArray(new Record[0])); Assert.assertNotNull(recordsFile); @@ -407,14 +438,14 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); } - private void assertCounts(int fieldId, long valueCount, long nullValueCount, Metrics metrics) { + private void assertCounts(int fieldId, Long valueCount, Long nullValueCount, Metrics metrics) { Map<Integer, Long> valueCounts = metrics.valueCounts(); Map<Integer, Long> nullValueCounts = metrics.nullValueCounts(); - Assert.assertEquals(valueCount, (long) valueCounts.get(fieldId)); - Assert.assertEquals(nullValueCount, (long) nullValueCounts.get(fieldId)); + Assert.assertEquals(valueCount, valueCounts.get(fieldId)); + Assert.assertEquals(nullValueCount, nullValueCounts.get(fieldId)); } - private <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) { + protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) { Map<Integer, ByteBuffer> lowerBounds = metrics.lowerBounds(); Map<Integer, ByteBuffer> upperBounds = metrics.upperBounds(); diff --git a/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java new file mode 100644 index 000000000000..82ef54b58be1 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/orc/TestOrcMetrics.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.orc; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TestMetrics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Test Metrics for ORC. + */ +public class TestOrcMetrics extends TestMetrics { + + static final ImmutableSet<Type.TypeID> BINARY_TYPES = ImmutableSet.of(Type.TypeID.BINARY, + Type.TypeID.FIXED, Type.TypeID.UUID); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Override + public FileFormat fileFormat() { + return FileFormat.ORC; + } + + @Override + public Metrics getMetrics(InputFile file) { + return OrcMetrics.fromInputFile(file); + } + + @Override + public InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) throws IOException { + throw new UnsupportedOperationException("supportsSmallRowGroups = " + supportsSmallRowGroups()); + } + + @Override + public InputFile writeRecords(Schema schema, Record... records) throws IOException { + return writeRecords(schema, ImmutableMap.of(), records); + } + + private InputFile writeRecords(Schema schema, Map<String, String> properties, Record...
records) throws IOException { + File tmpFolder = temp.newFolder("orc"); + String filename = UUID.randomUUID().toString(); + OutputFile file = Files.localOutput(new File(tmpFolder, FileFormat.ORC.addExtension(filename))); + try (FileAppender<Record> writer = ORC.write(file) + .schema(schema) + .setAll(properties) + .createWriterFunc(GenericOrcWriter::buildWriter) + .build()) { + writer.addAll(Lists.newArrayList(records)); + } + return file.toInputFile(); + } + + @Override + public int splitCount(InputFile inputFile) throws IOException { + return 0; + } + + private boolean isBinaryType(Type type) { + return BINARY_TYPES.contains(type.typeId()); + } + + @Override + protected <T> void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) { + if (isBinaryType(type)) { + Assert.assertFalse("ORC binary field should not have lower bounds.", + metrics.lowerBounds().containsKey(fieldId)); + Assert.assertFalse("ORC binary field should not have upper bounds.", + metrics.upperBounds().containsKey(fieldId)); + return; + } + super.assertBounds(fieldId, type, lowerBound, upperBound, metrics); + } +} diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java index a4c28cbf3f8c..9af818a48c1e 100644 --- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java +++ b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java @@ -50,6 +50,11 @@ public class TestParquetMetrics extends TestMetrics { @Rule public TemporaryFolder temp = new TemporaryFolder(); + @Override + public FileFormat fileFormat() { + return FileFormat.PARQUET; + } + @Override public Metrics getMetrics(InputFile file) { return ParquetUtil.fileMetrics(file); @@ -80,8 +85,8 @@ private InputFile writeRecords(Schema schema, Map<String, String> properties, Re } @Override - public int splitCount(InputFile parquetFile) throws IOException { - try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(parquetFile))) { + public int splitCount(InputFile inputFile) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) { return reader.getRowGroups().size(); } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index 07be70f79e13..970f891956cd 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -377,7 +377,7 @@ private static boolean isSameType(TypeDescription orcType, Type icebergType) { } } - private static Optional<Integer> icebergID(TypeDescription orcType) { + static Optional<Integer> icebergID(TypeDescription orcType) { return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE)) .map(Integer::parseInt); } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java index 4e961bb0d972..acc48550688b 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java @@ -91,6 +91,8 @@ public void add(D datum) { @Override public Metrics metrics() { + Preconditions.checkState(isClosed, + "Cannot return metrics while appending to an open file."); return OrcMetrics.fromWriter(writer); } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java index 3689ec59d16b..a5bcc2f8b161 ---
a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java @@ -20,13 +20,40 @@ package org.apache.iceberg.orc; import java.io.IOException; -import java.util.Collections; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Metrics; +import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Queues; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.orc.BooleanColumnStatistics; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.DateColumnStatistics; +import org.apache.orc.DecimalColumnStatistics; +import org.apache.orc.DoubleColumnStatistics; +import org.apache.orc.IntegerColumnStatistics; import org.apache.orc.Reader; +import org.apache.orc.StringColumnStatistics; +import org.apache.orc.TimestampColumnStatistics; +import org.apache.orc.TypeDescription; import org.apache.orc.Writer; public class OrcMetrics { @@ -40,30 +67,211 @@ public static Metrics fromInputFile(InputFile file) { return fromInputFile(file, config); } - public static Metrics fromInputFile(InputFile file, Configuration config) { + static Metrics fromInputFile(InputFile file, Configuration config) { try (Reader orcReader = ORC.newFileReader(file, config)) { - - // TODO: implement rest of the methods for ORC metrics - // https://github.com/apache/incubator-iceberg/pull/199 - return new Metrics(orcReader.getNumberOfRows(), - null, - null, - Collections.emptyMap(), - null, - null); + return buildOrcMetrics(orcReader.getNumberOfRows(), orcReader.getSchema(), orcReader.getStatistics()); } catch (IOException ioe) { - throw new RuntimeIOException(ioe, "Failed to read footer of file: %s", file); + throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location()); } } static Metrics fromWriter(Writer writer) { - // TODO: implement rest of the methods for ORC metrics in - // https://github.com/apache/incubator-iceberg/pull/199 - return new Metrics(writer.getNumberOfRows(), - null, - null, - Collections.emptyMap(), - null, - null); + try { + return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics()); + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Failed to get statistics from writer"); + } + } + + private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema, + final ColumnStatistics[] colStats) { + final Schema schema = ORCSchemaUtil.convert(orcSchema); + final Set<TypeDescription> columnsInContainers = findColumnsInContainers(schema, orcSchema); + Map<Integer, Long> columnSizes = Maps.newHashMapWithExpectedSize(colStats.length); + Map<Integer, Long> valueCounts = Maps.newHashMapWithExpectedSize(colStats.length); + Map<Integer, Long> nullCounts =
Maps.newHashMapWithExpectedSize(colStats.length); + Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap(); + Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap(); + + for (int i = 0; i < colStats.length; i++) { + final ColumnStatistics colStat = colStats[i]; + final TypeDescription orcCol = orcSchema.findSubtype(i); + final Optional<Types.NestedField> icebergColOpt = ORCSchemaUtil.icebergID(orcCol) + .map(schema::findField); + + if (icebergColOpt.isPresent()) { + final Types.NestedField icebergCol = icebergColOpt.get(); + final int fieldId = icebergCol.fieldId(); + + columnSizes.put(fieldId, colStat.getBytesOnDisk()); + + if (!columnsInContainers.contains(orcCol)) { + // Since ORC does not track null values nor repeated ones, the value count for columns in + // containers (maps, lists) may be larger than what it actually is; however, these are not + // used in expressions right now. For such cases, we use the number of values + // directly stored in ORC. + if (colStat.hasNull()) { + nullCounts.put(fieldId, numOfRows - colStat.getNumberOfValues()); + } else { + nullCounts.put(fieldId, 0L); + } + valueCounts.put(fieldId, colStat.getNumberOfValues() + nullCounts.get(fieldId)); + + Optional<ByteBuffer> orcMin = (colStat.getNumberOfValues() > 0) ? + fromOrcMin(icebergCol, colStat) : Optional.empty(); + orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(), byteBuffer)); + Optional<ByteBuffer> orcMax = (colStat.getNumberOfValues() > 0) ? + fromOrcMax(icebergCol, colStat) : Optional.empty(); + orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(), byteBuffer)); + } + } + } + + return new Metrics(numOfRows, + columnSizes, + valueCounts, + nullCounts, + lowerBounds, + upperBounds); + } + + private static Optional<ByteBuffer> fromOrcMin(Types.NestedField column, + ColumnStatistics columnStats) { + Object min = null; + if (columnStats instanceof IntegerColumnStatistics) { + min = ((IntegerColumnStatistics) columnStats).getMinimum(); + if (column.type().typeId() == Type.TypeID.INTEGER) { + min = Math.toIntExact((long) min); + } + } else if (columnStats instanceof DoubleColumnStatistics) { + min = ((DoubleColumnStatistics) columnStats).getMinimum(); + if (column.type().typeId() == Type.TypeID.FLOAT) { + min = ((Double) min).floatValue(); + } + } else if (columnStats instanceof StringColumnStatistics) { + min = ((StringColumnStatistics) columnStats).getMinimum(); + } else if (columnStats instanceof DecimalColumnStatistics) { + min = Optional + .ofNullable(((DecimalColumnStatistics) columnStats).getMinimum()) + .map(minStats -> minStats.bigDecimalValue() + .setScale(((Types.DecimalType) column.type()).scale())) + .orElse(null); + } else if (columnStats instanceof DateColumnStatistics) { + min = Optional.ofNullable(((DateColumnStatistics) columnStats).getMinimum()) + .map(minStats -> DateTimeUtil.daysFromDate( + DateTimeUtil.EPOCH.plus(minStats.getTime(), ChronoUnit.MILLIS).toLocalDate())) + .orElse(null); + } else if (columnStats instanceof TimestampColumnStatistics) { + TimestampColumnStatistics tColStats = (TimestampColumnStatistics) columnStats; + Timestamp minValue = tColStats.getMinimumUTC(); + min = Optional.ofNullable(minValue) + .map(v -> TimeUnit.MILLISECONDS.toMicros(v.getTime())) + .orElse(null); + } else if (columnStats instanceof BooleanColumnStatistics) { + BooleanColumnStatistics booleanStats = (BooleanColumnStatistics) columnStats; + min = booleanStats.getFalseCount() <= 0; + } + return Optional.ofNullable(Conversions.toByteBuffer(column.type(), min)); + } + + private static Optional<ByteBuffer> fromOrcMax(Types.NestedField column, + ColumnStatistics
columnStats) { + Object max = null; + if (columnStats instanceof IntegerColumnStatistics) { + max = ((IntegerColumnStatistics) columnStats).getMaximum(); + if (column.type().typeId() == Type.TypeID.INTEGER) { + max = Math.toIntExact((long) max); + } + } else if (columnStats instanceof DoubleColumnStatistics) { + max = ((DoubleColumnStatistics) columnStats).getMaximum(); + if (column.type().typeId() == Type.TypeID.FLOAT) { + max = ((Double) max).floatValue(); + } + } else if (columnStats instanceof StringColumnStatistics) { + max = ((StringColumnStatistics) columnStats).getMaximum(); + } else if (columnStats instanceof DecimalColumnStatistics) { + max = Optional + .ofNullable(((DecimalColumnStatistics) columnStats).getMaximum()) + .map(maxStats -> maxStats.bigDecimalValue() + .setScale(((Types.DecimalType) column.type()).scale())) + .orElse(null); + } else if (columnStats instanceof DateColumnStatistics) { + max = Optional.ofNullable(((DateColumnStatistics) columnStats).getMaximum()) + .map(maxStats -> DateTimeUtil.daysFromDate( + DateTimeUtil.EPOCH.plus(maxStats.getTime(), ChronoUnit.MILLIS).toLocalDate())) + .orElse(null); + } else if (columnStats instanceof TimestampColumnStatistics) { + TimestampColumnStatistics tColStats = (TimestampColumnStatistics) columnStats; + Timestamp maxValue = tColStats.getMaximumUTC(); + max = Optional.ofNullable(maxValue) + .map(v -> TimeUnit.MILLISECONDS.toMicros(v.getTime())) + .map(v -> v + 1_000) // Add 1 millisecond to compensate for the precision loss caused by ORC-611 + .orElse(null); + } else if (columnStats instanceof BooleanColumnStatistics) { + BooleanColumnStatistics booleanStats = (BooleanColumnStatistics) columnStats; + max = booleanStats.getTrueCount() > 0; + } + return Optional.ofNullable(Conversions.toByteBuffer(column.type(), max)); + } + + private static Set<TypeDescription> findColumnsInContainers(Schema schema, + TypeDescription orcSchema) { + ColumnsInContainersVisitor visitor = new ColumnsInContainersVisitor(); + OrcSchemaWithTypeVisitor.visit(schema, orcSchema, visitor); + return visitor.getColumnsInContainers(); + } + + private static class ColumnsInContainersVisitor extends OrcSchemaWithTypeVisitor<TypeDescription> { + + private final Set<TypeDescription> columnsInContainers; + + private ColumnsInContainersVisitor() { + columnsInContainers = Sets.newHashSet(); + } + + public Set<TypeDescription> getColumnsInContainers() { + return columnsInContainers; + } + + private Set<TypeDescription> flatten(TypeDescription rootType) { + if (rootType == null) { + return ImmutableSet.of(); + } + + final Set<TypeDescription> flatTypes = Sets.newHashSetWithExpectedSize(rootType.getMaximumId()); + final Queue<TypeDescription> queue = Queues.newLinkedBlockingQueue(); + queue.add(rootType); + while (!queue.isEmpty()) { + TypeDescription type = queue.remove(); + flatTypes.add(type); + queue.addAll(Optional.ofNullable(type.getChildren()).orElse(ImmutableList.of())); + } + return flatTypes; + } + + @Override + public TypeDescription record(Types.StructType iStruct, TypeDescription record, + List<String> names, List<TypeDescription> fields) { + return record; + } + + @Override + public TypeDescription list(Types.ListType iList, TypeDescription array, TypeDescription element) { + columnsInContainers.addAll(flatten(element)); + return array; + } + + @Override + public TypeDescription map(Types.MapType iMap, TypeDescription map, + TypeDescription key, TypeDescription value) { + columnsInContainers.addAll(flatten(key)); + columnsInContainers.addAll(flatten(value)); + return map; + } + + @Override + public TypeDescription primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) { + return primitive; + }
} }
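
Reviewer note (not part of the patch): the ORC-611 handling above is easiest to see with concrete numbers. ORC column statistics store timestamps at millisecond precision, so when testMetricsForTopLevelFields writes a maximum of 900 µs, ORC reports the maximum back as 0 ms; fromOrcMax therefore converts the reported value to microseconds and widens it by 1 ms, which is why the ORC branch of the test expects an upper bound of 1000L instead of 900L. The self-contained sketch below mirrors that conversion; the class and method names (OrcTimestampBoundSketch, reportedMaxMillis, upperBoundMicros) are invented for illustration and do not exist in the codebase.

import java.util.concurrent.TimeUnit;

public class OrcTimestampBoundSketch {

  // ORC stats keep only milliseconds, so the sub-millisecond part of the
  // true maximum is lost when statistics are read back: 900 µs -> 0 ms.
  static long reportedMaxMillis(long actualMaxMicros) {
    return TimeUnit.MICROSECONDS.toMillis(actualMaxMicros);
  }

  // Mirrors the adjustment in fromOrcMax: convert the reported millisecond
  // maximum back to microseconds and add 1 ms (1_000 µs) so the widened
  // bound still contains the true maximum.
  static long upperBoundMicros(long reportedMillis) {
    return TimeUnit.MILLISECONDS.toMicros(reportedMillis) + 1_000L;
  }

  public static void main(String[] args) {
    long actualMaxMicros = 900L; // the value written by the test above
    long reported = reportedMaxMillis(actualMaxMicros); // 0 ms
    long bound = upperBoundMicros(reported); // 1_000 µs, the ORC expectation
    // Without the +1 ms adjustment the bound would be 0 µs and would
    // wrongly exclude the real maximum of 900 µs.
    System.out.printf("actual=%d µs, reported=%d ms, upper bound=%d µs%n",
        actualMaxMicros, reported, bound);
  }
}

A bound like this is safe to widen but never to tighten: min/max metrics only need to contain the true values for scan pruning to stay correct, which is also why the below-epoch assertions are loosened rather than disabled while ORC-342 remains open.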