From b83dea35094792dae2898e15a59aed23630a4317 Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Wed, 7 Apr 2021 17:06:44 -0700 Subject: [PATCH 1/3] exclude NaN from upper/lower bound of floating columns --- .../java/org/apache/iceberg/FieldMetrics.java | 16 +- .../org/apache/iceberg/FloatFieldMetrics.java | 79 ++++++-- .../java/org/apache/iceberg/TestMetrics.java | 45 ++--- .../iceberg/data/orc/GenericOrcWriters.java | 24 +-- .../apache/iceberg/TestMergingMetrics.java | 184 +++++++++++++----- .../org/apache/iceberg/orc/OrcMetrics.java | 36 ++-- .../apache/iceberg/parquet/ParquetUtil.java | 48 ++++- .../iceberg/parquet/ParquetValueWriters.java | 26 +-- .../spark/data/SparkOrcValueWriters.java | 26 +-- 9 files changed, 311 insertions(+), 173 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/FieldMetrics.java b/core/src/main/java/org/apache/iceberg/FieldMetrics.java index d67faa94f1ab..9a001b205a18 100644 --- a/core/src/main/java/org/apache/iceberg/FieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/FieldMetrics.java @@ -20,25 +20,23 @@ package org.apache.iceberg; -import java.nio.ByteBuffer; - /** * Iceberg internally tracked field level metrics. */ -public class FieldMetrics { +public class FieldMetrics { private final int id; private final long valueCount; private final long nullValueCount; private final long nanValueCount; - private final ByteBuffer lowerBound; - private final ByteBuffer upperBound; + private final T lowerBound; + private final T upperBound; public FieldMetrics(int id, long valueCount, long nullValueCount, long nanValueCount, - ByteBuffer lowerBound, - ByteBuffer upperBound) { + T lowerBound, + T upperBound) { this.id = id; this.valueCount = valueCount; this.nullValueCount = nullValueCount; @@ -78,14 +76,14 @@ public long nanValueCount() { /** * Returns the lower bound value of this field. */ - public ByteBuffer lowerBound() { + public T lowerBound() { return lowerBound; } /** * Returns the upper bound value of this field. */ - public ByteBuffer upperBound() { + public T upperBound() { return upperBound; } } diff --git a/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java index 873138750685..f179ffbb38f3 100644 --- a/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java @@ -19,8 +19,6 @@ package org.apache.iceberg; -import java.nio.ByteBuffer; - /** * Iceberg internally tracked field level metrics, used by Parquet and ORC writers only. *

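For orientation: the hunk above removes the java.nio.ByteBuffer import because FieldMetrics is now generic over the bound type, so bounds travel as typed values instead of serialized buffers. A minimal sketch with invented values (FloatFieldMetrics below extends FieldMetrics<Float>):

```java
// Hypothetical metrics for a float field; constructor as generified by this patch.
FieldMetrics<Float> floatMetrics = new FieldMetrics<>(
    1,       // field id
    10L,     // value count
    2L,      // null value count
    3L,      // NaN value count
    -25.1F,  // lower bound, a typed value instead of a serialized ByteBuffer
    5.6F);   // upper bound
Float upper = floatMetrics.upperBound();  // no ByteBuffer decoding required
```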
@@ -28,16 +26,10 @@ * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing * exceptions when they are accessed. */ -public class FloatFieldMetrics extends FieldMetrics { - - /** - * Constructor for creating a FieldMetrics with only NaN counter. - * @param id field id being tracked by the writer - * @param nanValueCount number of NaN values, will only be non-0 for double or float field. - */ - public FloatFieldMetrics(int id, - long nanValueCount) { - super(id, 0L, 0L, nanValueCount, null, null); +public class FloatFieldMetrics extends FieldMetrics { + + private FloatFieldMetrics(AbstractFloatFieldMetricsContext context) { + super(context.id, 0L, 0L, context.nanValueCount, context.lowerBound, context.upperBound); } @Override @@ -50,13 +42,64 @@ public long nullValueCount() { throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); } - @Override - public ByteBuffer lowerBound() { - throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); + public static class FloatFieldMetricsContext extends AbstractFloatFieldMetricsContext { + public FloatFieldMetricsContext(int id) { + super(id); + } + + @Override + public void updateMetricsContext(Float value) { + if (Float.isNaN(value)) { + this.nanValueCount++; + } else { + if (lowerBound == null || Float.compare(value, lowerBound) < 0) { + this.lowerBound = value; + } + if (upperBound == null || Float.compare(value, upperBound) > 0) { + this.upperBound = value; + } + } + } } - @Override - public ByteBuffer upperBound() { - throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); + public static class DoubleFieldMetricsContext extends AbstractFloatFieldMetricsContext { + public DoubleFieldMetricsContext(int id) { + super(id); + } + + @Override + public void updateMetricsContext(Double value) { + if (Double.isNaN(value)) { + this.nanValueCount++; + } else { + if (lowerBound == null || Double.compare(value, lowerBound) < 0) { + this.lowerBound = value; + } + if (upperBound == null || Double.compare(value, upperBound) > 0) { + this.upperBound = value; + } + } + } + } + + @SuppressWarnings("checkstyle:VisibilityModifier") + public abstract static class AbstractFloatFieldMetricsContext { + private final int id; + protected long nanValueCount = 0; + protected T lowerBound = null; + protected T upperBound = null; + + public AbstractFloatFieldMetricsContext(int id) { + this.id = id; + } + + /** + * It is caller's responsibility to ensure input shouldn't be null + */ + public abstract void updateMetricsContext(T value); + + public FloatFieldMetrics buildMetrics() { + return new FloatFieldMetrics(this); + } } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java index 2e7cdbf5d355..47b79a655af6 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetrics.java +++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java @@ -274,7 +274,7 @@ public void testMetricsForNestedStructFields() throws IOException { assertBounds(6, BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); assertCounts(7, 1L, 0L, 1L, metrics); - assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics); + assertBounds(7, DoubleType.get(), null, null, metrics); } private Record buildNestedTestRecord() { @@ -354,9 +354,9 @@ public void 
testMetricsForNaNColumns() throws IOException { Assert.assertEquals(2L, (long) metrics.recordCount()); assertCounts(1, 2L, 0L, 2L, metrics); assertCounts(2, 2L, 0L, 2L, metrics); - // below: current behavior; will be null once NaN is excluded from upper/lower bound - assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics); - assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics); + + assertBounds(1, FloatType.get(), null, null, metrics); + assertBounds(2, DoubleType.get(), null, null, metrics); } @Test @@ -367,15 +367,8 @@ public void testColumnBoundsWithNaNValueAtFront() throws IOException { assertCounts(1, 3L, 0L, 1L, metrics); assertCounts(2, 3L, 0L, 1L, metrics); - // below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's - // behaviors differ due to their implementation of comparison being different. - if (fileFormat() == FileFormat.ORC) { - assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics); - assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics); - } else { - assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics); - assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics); - } + assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); + assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); } @Test @@ -386,15 +379,8 @@ public void testColumnBoundsWithNaNValueInMiddle() throws IOException { assertCounts(1, 3L, 0L, 1L, metrics); assertCounts(2, 3L, 0L, 1L, metrics); - // below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's - // behaviors differ due to their implementation of comparison being different. - if (fileFormat() == FileFormat.ORC) { - assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); - assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); - } else { - assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics); - assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics); - } + assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); + assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); } @Test @@ -405,15 +391,8 @@ public void testColumnBoundsWithNaNValueAtEnd() throws IOException { assertCounts(1, 3L, 0L, 1L, metrics); assertCounts(2, 3L, 0L, 1L, metrics); - // below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's - // behaviors differ due to their implementation of comparison being different. 
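The removed comment above points at a real quirk: Parquet tracks min/max with Float.compare-style ordering, which ranks NaN above every other value, while ORC uses IEEE comparisons that return false whenever NaN is involved, which is why the old expected values in the removed branches below differ by format. A standalone illustration of the Parquet-style leak (not code from this PR):

```java
// Naive max tracking with Float.compare() lets NaN win every comparison.
float[] values = {1.2F, Float.NaN, 5.6F};
float naiveMax = Float.NEGATIVE_INFINITY;
for (float value : values) {
  if (Float.compare(value, naiveMax) > 0) {  // NaN compares greater than any float
    naiveMax = value;
  }
}
// naiveMax is now NaN, the exact bound this patch filters out by
// checking Float.isNaN(value) before updating lower/upper bounds.
```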
- if (fileFormat() == FileFormat.ORC) { - assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); - assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); - } else { - assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics); - assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics); - } + assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); + assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); } @Test @@ -506,7 +485,7 @@ public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOExce assertBounds(6, BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); assertCounts(7, 201L, 0L, 201L, metrics); - assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics); + assertBounds(7, DoubleType.get(), null, null, metrics); } @Test @@ -567,7 +546,7 @@ public void testFullMetricsMode() throws IOException { assertBounds(6, Types.BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); assertCounts(7, 1L, 0L, 1L, metrics); - assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics); + assertBounds(7, Types.DoubleType.get(), null, null, metrics); } @Test diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 658942400896..ea13005de471 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -219,12 +219,10 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) { } private static class FloatWriter implements OrcValueWriter { - private final int id; - private long nanCount; + private final FloatFieldMetrics.FloatFieldMetricsContext floatFieldMetricsContext; private FloatWriter(int id) { - this.id = id; - this.nanCount = 0; + this.floatFieldMetricsContext = new FloatFieldMetrics.FloatFieldMetricsContext(id); } @Override @@ -235,24 +233,20 @@ public Class getJavaClass() { @Override public void nonNullWrite(int rowId, Float data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; - if (Float.isNaN(data)) { - nanCount++; - } + floatFieldMetricsContext.updateMetricsContext(data); } @Override public Stream metrics() { - return Stream.of(new FloatFieldMetrics(id, nanCount)); + return Stream.of(floatFieldMetricsContext.buildMetrics()); } } private static class DoubleWriter implements OrcValueWriter { - private final int id; - private long nanCount; + private final FloatFieldMetrics.DoubleFieldMetricsContext doubleFieldMetricsContext; private DoubleWriter(Integer id) { - this.id = id; - this.nanCount = 0; + this.doubleFieldMetricsContext = new FloatFieldMetrics.DoubleFieldMetricsContext(id); } @Override @@ -263,14 +257,12 @@ public Class getJavaClass() { @Override public void nonNullWrite(int rowId, Double data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; - if (Double.isNaN(data)) { - nanCount++; - } + doubleFieldMetricsContext.updateMetricsContext(data); } @Override public Stream metrics() { - return Stream.of(new FloatFieldMetrics(id, nanCount)); + return Stream.of(doubleFieldMetricsContext.buildMetrics()); } } diff --git a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java index fa588302e8e2..3def5482ece5 100644 --- a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java +++ 
b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java @@ -19,15 +19,26 @@ package org.apache.iceberg; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.NaNUtil; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -37,6 +48,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public abstract class TestMergingMetrics { @@ -105,74 +117,152 @@ public TestMergingMetrics(FileFormat fileFormat) { @Test public void verifySingleRecordMetric() throws Exception { Record record = GenericRecord.create(SCHEMA); - record.setField("id", 3); - record.setField("float", Float.NaN); // FLOAT_FIELD - 1 - record.setField("double", Double.NaN); // DOUBLE_FIELD - 1 - record.setField("floatlist", ImmutableList.of(3.3F, 2.8F, Float.NaN, -25.1F, Float.NaN)); // FLOAT_LIST - 2 - record.setField("map1", ImmutableMap.of(Float.NaN, "a", 0F, "b")); // MAP_FIELD_1 - 1 - record.setField("map2", ImmutableMap.of( + record.setField(ID_FIELD.name(), 3); + record.setField(FLOAT_FIELD.name(), Float.NaN); // FLOAT_FIELD - 1 + record.setField(DOUBLE_FIELD.name(), Double.NaN); // DOUBLE_FIELD - 1 + record.setField(FLOAT_LIST.name(), ImmutableList.of(3.3F, 2.8F, Float.NaN, -25.1F, Float.NaN)); // FLOAT_LIST - 2 + record.setField(MAP_FIELD_1.name(), ImmutableMap.of(Float.NaN, "a", 0F, "b")); // MAP_FIELD_1 - 1 + record.setField(MAP_FIELD_2.name(), ImmutableMap.of( 0, 0D, 1, Double.NaN, 2, 2D, 3, Double.NaN, 4, Double.NaN)); // MAP_FIELD_2 - 3 FileAppender appender = writeAndGetAppender(ImmutableList.of(record)); - Map nanValueCount = appender.metrics().nanValueCounts(); + Metrics metrics = appender.metrics(); + Map nanValueCount = metrics.nanValueCounts(); + Map upperBounds = metrics.upperBounds(); + Map lowerBounds = metrics.lowerBounds(); assertNaNCountMatch(1L, nanValueCount, FLOAT_FIELD); assertNaNCountMatch(1L, nanValueCount, DOUBLE_FIELD); assertNaNCountMatch(2L, nanValueCount, FLOAT_LIST); assertNaNCountMatch(1L, nanValueCount, MAP_FIELD_1); assertNaNCountMatch(3L, nanValueCount, MAP_FIELD_2); - } - private void assertNaNCountMatch(Long expected, Map nanValueCount, Types.NestedField field) { - Assert.assertEquals( - String.format("NaN count for field %s does not match expected", field.name()), - expected, nanValueCount.get(FIELDS_WITH_NAN_COUNT_TO_ID.get(field))); + assertBoundValueMatch(null, upperBounds, FLOAT_FIELD); + assertBoundValueMatch(null, upperBounds, DOUBLE_FIELD); + assertBoundValueMatch(3.3F, upperBounds, FLOAT_LIST); + assertBoundValueMatch(0F, upperBounds, MAP_FIELD_1); + assertBoundValueMatch(2D, upperBounds, MAP_FIELD_2); + + assertBoundValueMatch(null, lowerBounds, 
FLOAT_FIELD); + assertBoundValueMatch(null, lowerBounds, DOUBLE_FIELD); + assertBoundValueMatch(-25.1F, lowerBounds, FLOAT_LIST); + assertBoundValueMatch(0F, lowerBounds, MAP_FIELD_1); + assertBoundValueMatch(0D, lowerBounds, MAP_FIELD_2); } @Test public void verifyRandomlyGeneratedRecordsMetric() throws Exception { - List recordList = RandomGenericData.generate(SCHEMA, 50, 250L); - + // keep the record count small, since a large count makes every field's upper/lower bounds likely to be +/-Infinity, + // which would let these assertions pass trivially + List recordList = RandomGenericData.generate(SCHEMA, 5, 250L); FileAppender appender = writeAndGetAppender(recordList); - Map nanValueCount = appender.metrics().nanValueCounts(); - FIELDS_WITH_NAN_COUNT_TO_ID.forEach((key, value) -> Assert.assertEquals( - String.format("NaN count for field %s does not match expected", key.name()), - getExpectedNaNCount(recordList, key), - nanValueCount.get(value))); + Map> expectedUpperBounds = new HashMap<>(); + Map> expectedLowerBounds = new HashMap<>(); + Map expectedNaNCount = new HashMap<>(); + + populateExpectedValues(recordList, expectedUpperBounds, expectedLowerBounds, expectedNaNCount); + + Metrics metrics = appender.metrics(); + expectedUpperBounds.forEach((key, value) -> assertBoundValueMatch(value.get(), metrics.upperBounds(), key)); + expectedLowerBounds.forEach((key, value) -> assertBoundValueMatch(value.get(), metrics.lowerBounds(), key)); + expectedNaNCount.forEach((key, value) -> assertNaNCountMatch(value.get(), metrics.nanValueCounts(), key)); SCHEMA.columns().stream() .filter(column -> !FIELDS_WITH_NAN_COUNT_TO_ID.containsKey(column)) .map(Types.NestedField::fieldId) - .forEach(id -> Assert.assertNull("NaN count for field %s should be null", nanValueCount.get(id))); + .forEach(id -> Assert.assertNull("NaN count for field %s should be null", + metrics.nanValueCounts().get(id))); + } + + private void assertNaNCountMatch(Long expected, Map nanValueCount, Types.NestedField field) { + Assert.assertEquals( + String.format("NaN count for field %s does not match expected", field.name()), + expected, nanValueCount.get(FIELDS_WITH_NAN_COUNT_TO_ID.get(field))); + } + + private void assertBoundValueMatch(Number expected, Map boundMap, Types.NestedField field) { + if (field.type().isNestedType() && fileFormat == FileFormat.ORC) { + // bounds are not tracked for floating-point columns nested inside ORC complex types + return; + } + + int actualFieldId = FIELDS_WITH_NAN_COUNT_TO_ID.get(field); + ByteBuffer byteBuffer = boundMap.get(actualFieldId); + Type type = SCHEMA.findType(actualFieldId); + assertEquals(String.format("Bound value for field %s must match", field.name()), + expected, byteBuffer == null ? 
null : Conversions.fromByteBuffer(type, byteBuffer)); + } + + private void populateExpectedValues(List records, + Map> upperBounds, + Map> lowerBounds, + Map expectedNaNCount) { + for (Types.NestedField field : FIELDS_WITH_NAN_COUNT_TO_ID.keySet()) { + expectedNaNCount.put(field, new AtomicLong(0)); + } + + for (Record record : records) { + updateExpectedValuePerRecord(upperBounds, lowerBounds, expectedNaNCount, + FLOAT_FIELD, (Float) record.getField(FLOAT_FIELD.name())); + updateExpectedValuePerRecord(upperBounds, lowerBounds, expectedNaNCount, + DOUBLE_FIELD, (Double) record.getField(DOUBLE_FIELD.name())); + + List floatList = (List) record.getField(FLOAT_LIST.name()); + if (floatList != null) { + updateExpectedValueFromRecords(upperBounds, lowerBounds, expectedNaNCount, FLOAT_LIST, floatList); + } + + Map map1 = (Map) record.getField(MAP_FIELD_1.name()); + if (map1 != null) { + updateExpectedValueFromRecords(upperBounds, lowerBounds, expectedNaNCount, MAP_FIELD_1, map1.keySet()); + } + + Map map2 = (Map) record.getField(MAP_FIELD_2.name()); + if (map2 != null) { + updateExpectedValueFromRecords(upperBounds, lowerBounds, expectedNaNCount, MAP_FIELD_2, map2.values()); + } + } + } + + private void updateExpectedValueFromRecords( + Map> upperBounds, + Map> lowerBounds, + Map expectedNaNCount, + Types.NestedField key, Collection vals) { + List nonNullNumbers = vals.stream().filter(v -> !NaNUtil.isNaN(v)).collect(Collectors.toList()); + Optional maxOptional = nonNullNumbers.stream().filter(Objects::nonNull) + .reduce((v1, v2) -> getMinOrMax(v1, v2, true)); + Optional minOptional = nonNullNumbers.stream().filter(Objects::nonNull) + .reduce((v1, v2) -> getMinOrMax(v1, v2, false)); + + expectedNaNCount.get(key).addAndGet(vals.size() - nonNullNumbers.size()); + maxOptional.ifPresent(max -> updateBound(key, max, upperBounds, true)); + minOptional.ifPresent(min -> updateBound(key, min, lowerBounds, false)); + } + + private void updateExpectedValuePerRecord(Map> upperBounds, + Map> lowerBounds, + Map expectedNaNCount, + Types.NestedField key, Number val) { + if (NaNUtil.isNaN(val)) { + expectedNaNCount.get(key).incrementAndGet(); + } else if (val != null) { + updateBound(key, val, upperBounds, true); + updateBound(key, val, lowerBounds, false); + } + } + + private void updateBound( + Types.NestedField key, Number val, Map> bounds, boolean isMax) { + bounds.computeIfAbsent(key, k -> new AtomicReference<>(val)).updateAndGet(old -> getMinOrMax(old, val, isMax)); } - private Long getExpectedNaNCount(List expectedRecords, Types.NestedField field) { - return expectedRecords.stream() - .mapToLong(e -> { - Object value = e.getField(field.name()); - if (value == null) { - return 0; - } - if (FLOAT_FIELD.equals(field)) { - return Float.isNaN((Float) value) ? 1 : 0; - } else if (DOUBLE_FIELD.equals(field)) { - return Double.isNaN((Double) value) ? 1 : 0; - } else if (FLOAT_LIST.equals(field)) { - return ((List) value).stream() - .filter(val -> val != null && Float.isNaN(val)) - .count(); - } else if (MAP_FIELD_1.equals(field)) { - return ((Map) value).keySet().stream() - .filter(key -> Float.isNaN(key)) - .count(); - } else if (MAP_FIELD_2.equals(field)) { - return ((Map) value).values().stream() - .filter(val -> val != null && Double.isNaN(val)) - .count(); - } else { - throw new RuntimeException("unknown field name for getting expected NaN count: " + field.name()); - } - }).sum(); + private Number getMinOrMax(Number val1, Number val2, boolean isMax) { + if (val1 instanceof Double) { + return isMax ? 
Double.max((Double) val1, (Double) val2) : Double.min((Double) val1, (Double) val2); + } else { + return isMax ? Float.max((Float) val1, (Float) val2) : Float.min((Float) val1, (Float) val2); + } } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java index 67b608441399..f4a6a3bdaeac 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java @@ -22,11 +22,14 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.sql.Timestamp; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FieldMetrics; @@ -41,6 +44,7 @@ import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Conversions; @@ -129,6 +133,10 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti Map lowerBounds = Maps.newHashMap(); Map upperBounds = Maps.newHashMap(); + Map fieldMetricsMap = Optional.ofNullable(fieldMetricsStream) + .map(stream -> stream.collect(Collectors.toMap(FieldMetrics::id, Function.identity()))) + .orElseGet(HashMap::new); + for (int i = 0; i < colStats.length; i++) { final ColumnStatistics colStat = colStats[i]; final TypeDescription orcCol = orcSchemaWithIds.findSubtype(i); @@ -160,10 +168,10 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti if (metricsMode != MetricsModes.Counts.get()) { Optional orcMin = (colStat.getNumberOfValues() > 0) ? - fromOrcMin(icebergCol.type(), colStat, metricsMode) : Optional.empty(); + fromOrcMin(icebergCol.type(), colStat, metricsMode, fieldMetricsMap.get(fieldId)) : Optional.empty(); orcMin.ifPresent(byteBuffer -> lowerBounds.put(icebergCol.fieldId(), byteBuffer)); Optional orcMax = (colStat.getNumberOfValues() > 0) ? 
- fromOrcMax(icebergCol.type(), colStat, metricsMode) : Optional.empty(); + fromOrcMax(icebergCol.type(), colStat, metricsMode, fieldMetricsMap.get(fieldId)) : Optional.empty(); orcMax.ifPresent(byteBuffer -> upperBounds.put(icebergCol.fieldId(), byteBuffer)); } } @@ -174,13 +182,13 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti columnSizes, valueCounts, nullCounts, - MetricsUtil.createNanValueCounts(fieldMetricsStream, effectiveMetricsConfig, schema), + MetricsUtil.createNanValueCounts(fieldMetricsMap.values().stream(), effectiveMetricsConfig, schema), lowerBounds, upperBounds); } private static Optional fromOrcMin(Type type, ColumnStatistics columnStats, - MetricsMode metricsMode) { + MetricsMode metricsMode, FieldMetrics fieldMetrics) { Object min = null; if (columnStats instanceof IntegerColumnStatistics) { min = ((IntegerColumnStatistics) columnStats).getMinimum(); @@ -188,10 +196,11 @@ private static Optional fromOrcMin(Type type, ColumnStatistics colum min = Math.toIntExact((long) min); } } else if (columnStats instanceof DoubleColumnStatistics) { - min = ((DoubleColumnStatistics) columnStats).getMinimum(); - if (type.typeId() == Type.TypeID.FLOAT) { - min = ((Double) min).floatValue(); - } + // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior, + // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics. + Preconditions.checkNotNull(fieldMetrics, + "[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers"); + min = fieldMetrics.lowerBound(); } else if (columnStats instanceof StringColumnStatistics) { min = ((StringColumnStatistics) columnStats).getMinimum(); } else if (columnStats instanceof DecimalColumnStatistics) { @@ -217,7 +226,7 @@ private static Optional fromOrcMin(Type type, ColumnStatistics colum } private static Optional fromOrcMax(Type type, ColumnStatistics columnStats, - MetricsMode metricsMode) { + MetricsMode metricsMode, FieldMetrics fieldMetrics) { Object max = null; if (columnStats instanceof IntegerColumnStatistics) { max = ((IntegerColumnStatistics) columnStats).getMaximum(); @@ -225,10 +234,11 @@ private static Optional fromOrcMax(Type type, ColumnStatistics colum max = Math.toIntExact((long) max); } } else if (columnStats instanceof DoubleColumnStatistics) { - max = ((DoubleColumnStatistics) columnStats).getMaximum(); - if (type.typeId() == Type.TypeID.FLOAT) { - max = ((Double) max).floatValue(); - } + // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior, + // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics. 
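To make the override concrete, here is a sketch using the patch-1 classes introduced above (invented field id; a float column that holds only NaN values):

```java
// ORC's DoubleColumnStatistics would report NaN as both bounds here; the
// writer-side context tracks the column itself and excludes NaN instead.
FloatFieldMetrics.FloatFieldMetricsContext context =
    new FloatFieldMetrics.FloatFieldMetricsContext(1);
context.updateMetricsContext(Float.NaN);
context.updateMetricsContext(Float.NaN);
FloatFieldMetrics metrics = context.buildMetrics();
// metrics.nanValueCount() == 2 and both bounds are null, so fromOrcMin and
// fromOrcMax produce no bound for the column rather than a NaN bound.
```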
+ Preconditions.checkNotNull(fieldMetrics, + "[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers"); + max = fieldMetrics.upperBound(); } else if (columnStats instanceof StringColumnStatistics) { max = ((StringColumnStatistics) columnStats).getMaximum(); } else if (columnStats instanceof DecimalColumnStatistics) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java index fb587b177a15..40c874a578bb 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java @@ -23,11 +23,15 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Metrics; @@ -85,6 +89,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream fieldMetrics, MetricsConfig metricsConfig, NameMapping nameMapping) { long rowCount = 0; @@ -99,6 +104,10 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream fieldMetricsMap = Optional.ofNullable(fieldMetrics) + .map(stream -> stream.collect(Collectors.toMap(FieldMetrics::id, Function.identity()))) + .orElseGet(HashMap::new); + List blocks = metadata.getBlocks(); for (BlockMetaData block : blocks) { rowCount += block.getRowCount(); @@ -125,7 +134,9 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream min = ParquetConversions.fromParquetPrimitive( @@ -147,9 +158,40 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream idToFieldMetricsMap, MetricsConfig metricsConfig, Schema schema, + Map> lowerBounds, Map> upperBounds) { + idToFieldMetricsMap.entrySet().forEach(entry -> { + int fieldId = entry.getKey(); + FieldMetrics metrics = entry.getValue(); + MetricsMode metricsMode = MetricsUtil.metricsMode(schema, metricsConfig, fieldId); + + // only check for MetricsModes.None, since we don't truncate float/double values. 
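For context on the mode check below: per-column metrics collection is driven by table properties, and truncate(n) only shortens variable-width bounds such as strings and binary, so the fixed-width float/double bounds here only need to special-case the none mode. A hedged sketch of that configuration surface (the column name is invented):

```java
// Well-known Iceberg table properties (not part of this diff):
Map<String, String> properties = ImmutableMap.of(
    "write.metadata.metrics.default", "truncate(16)",
    "write.metadata.metrics.column.temperature", "none");
MetricsConfig config = MetricsConfig.fromProperties(properties);
// Columns resolved to "none" keep no bounds at all, which is why the code
// below skips them instead of writing the writer-tracked float/double bounds.
```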
+ if (metricsMode != MetricsModes.None.get()) { + if (metrics.upperBound() == null) { + // upper and lower bounds will both null or neither + lowerBounds.remove(fieldId); + upperBounds.remove(fieldId); + } else if (metrics.upperBound() instanceof Float) { + lowerBounds.put(fieldId, Literal.of((Float) metrics.lowerBound())); + upperBounds.put(fieldId, Literal.of((Float) metrics.upperBound())); + } else if (metrics.upperBound() instanceof Double) { + lowerBounds.put(fieldId, Literal.of((Double) metrics.lowerBound())); + upperBounds.put(fieldId, Literal.of((Double) metrics.upperBound())); + } else { + throw new IllegalStateException("[BUG] Only Float/Double column metrics should be collected by Iceberg"); + } + } + }); } private static MessageType getParquetTypeWithIds(ParquetMetadata metadata, NameMapping nameMapping) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 5413b3379387..724b7efba7ec 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -170,50 +170,44 @@ public void writeDouble(int repetitionLevel, double value) { } private static class FloatWriter extends UnboxedWriter { - private final int id; - private long nanCount; + private final FloatFieldMetrics.FloatFieldMetricsContext floatFieldMetricsContext; private FloatWriter(ColumnDescriptor desc) { super(desc); - this.id = desc.getPrimitiveType().getId().intValue(); - this.nanCount = 0; + int id = desc.getPrimitiveType().getId().intValue(); + this.floatFieldMetricsContext = new FloatFieldMetrics.FloatFieldMetricsContext(id); } @Override public void write(int repetitionLevel, Float value) { writeFloat(repetitionLevel, value); - if (Float.isNaN(value)) { - nanCount++; - } + floatFieldMetricsContext.updateMetricsContext(value); } @Override public Stream metrics() { - return Stream.of(new FloatFieldMetrics(id, nanCount)); + return Stream.of(floatFieldMetricsContext.buildMetrics()); } } private static class DoubleWriter extends UnboxedWriter { - private final int id; - private long nanCount; + private final FloatFieldMetrics.DoubleFieldMetricsContext doubleFieldMetricsContext; private DoubleWriter(ColumnDescriptor desc) { super(desc); - this.id = desc.getPrimitiveType().getId().intValue(); - this.nanCount = 0; + int id = desc.getPrimitiveType().getId().intValue(); + this.doubleFieldMetricsContext = new FloatFieldMetrics.DoubleFieldMetricsContext(id); } @Override public void write(int repetitionLevel, Double value) { writeDouble(repetitionLevel, value); - if (Double.isNaN(value)) { - nanCount++; - } + doubleFieldMetricsContext.updateMetricsContext(value); } @Override public Stream metrics() { - return Stream.of(new FloatFieldMetrics(id, nanCount)); + return Stream.of(doubleFieldMetricsContext.buildMetrics()); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index 4b4075070f6f..d77de57f7f41 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -141,52 +141,42 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV } private static class FloatWriter implements SparkOrcValueWriter { - private final int id; - private long nanCount; + private 
final FloatFieldMetrics.FloatFieldMetricsContext floatFieldMetricsContext; private FloatWriter(int id) { - this.id = id; - this.nanCount = 0; + this.floatFieldMetricsContext = new FloatFieldMetrics.FloatFieldMetricsContext(id); } @Override public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { float floatValue = data.getFloat(column); ((DoubleColumnVector) output).vector[rowId] = floatValue; - - if (Float.isNaN(floatValue)) { - nanCount++; - } + floatFieldMetricsContext.updateMetricsContext(floatValue); } @Override public Stream metrics() { - return Stream.of(new FloatFieldMetrics(id, nanCount)); + return Stream.of(floatFieldMetricsContext.buildMetrics()); } } private static class DoubleWriter implements SparkOrcValueWriter { - private final int id; - private long nanCount; + private final FloatFieldMetrics.DoubleFieldMetricsContext doubleFieldMetricsContext; private DoubleWriter(int id) { - this.id = id; - this.nanCount = 0; + this.doubleFieldMetricsContext = new FloatFieldMetrics.DoubleFieldMetricsContext(id); } @Override public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { double doubleValue = data.getDouble(column); ((DoubleColumnVector) output).vector[rowId] = doubleValue; - - if (Double.isNaN(doubleValue)) { - nanCount++; - } + doubleFieldMetricsContext.updateMetricsContext(doubleValue); } @Override public Stream metrics() { - return Stream.of(new FloatFieldMetrics(id, nanCount)); + return Stream.of(doubleFieldMetricsContext.buildMetrics()); } } From c7ad319e9ad3668fcc2d3d1ed5e15f392e4f081b Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Fri, 18 Jun 2021 17:35:07 -0700 Subject: [PATCH 2/3] update based on comments --- .../apache/iceberg/DoubleFieldMetrics.java | 72 +++++++++++++++++++ .../org/apache/iceberg/FloatFieldMetrics.java | 65 +++++------------ .../iceberg/data/orc/GenericOrcWriters.java | 17 ++--- .../apache/iceberg/TestMergingMetrics.java | 3 +- .../apache/iceberg/parquet/ParquetUtil.java | 21 +++--- .../iceberg/parquet/ParquetValueWriters.java | 17 ++--- .../spark/data/SparkOrcValueWriters.java | 17 ++--- 7 files changed, 128 insertions(+), 84 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java diff --git a/core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java b/core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java new file mode 100644 index 000000000000..1231cc93a114 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +/** + * Iceberg internally tracked field level metrics, used by Parquet and ORC writers only. + *

+ * Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers. + * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing + * exceptions when they are accessed. + */ +public class DoubleFieldMetrics extends FieldMetrics { + + private DoubleFieldMetrics(int id, long nanValueCount, Double lowerBound, Double upperBound) { + super(id, 0L, 0L, nanValueCount, lowerBound, upperBound); + } + + @Override + public long valueCount() { + throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); + } + + @Override + public long nullValueCount() { + throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); + } + + public static class Builder { + private final int id; + private long nanValueCount = 0; + private Double lowerBound = null; + private Double upperBound = null; + + public Builder(int id) { + this.id = id; + } + + public void addValue(double value) { + if (Double.isNaN(value)) { + this.nanValueCount++; + } else { + if (lowerBound == null || Double.compare(value, lowerBound) < 0) { + this.lowerBound = value; + } + if (upperBound == null || Double.compare(value, upperBound) > 0) { + this.upperBound = value; + } + } + } + + public DoubleFieldMetrics build() { + return new DoubleFieldMetrics(id, nanValueCount, lowerBound, upperBound); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java index f179ffbb38f3..94a78446d44d 100644 --- a/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java @@ -26,10 +26,10 @@ * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing * exceptions when they are accessed. */ -public class FloatFieldMetrics extends FieldMetrics { +public class FloatFieldMetrics extends FieldMetrics { - private FloatFieldMetrics(AbstractFloatFieldMetricsContext context) { - super(context.id, 0L, 0L, context.nanValueCount, context.lowerBound, context.upperBound); + private FloatFieldMetrics(int id, long nanValueCount, Float lowerBound, Float upperBound) { + super(id, 0L, 0L, nanValueCount, lowerBound, upperBound); } @Override @@ -42,13 +42,21 @@ public long nullValueCount() { throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. 
"); } - public static class FloatFieldMetricsContext extends AbstractFloatFieldMetricsContext { - public FloatFieldMetricsContext(int id) { - super(id); + public Builder builderFor(int id) { + return new Builder(id); + } + + public static class Builder { + private final int id; + private long nanValueCount = 0; + private Float lowerBound = null; + private Float upperBound = null; + + public Builder(int id) { + this.id = id; } - @Override - public void updateMetricsContext(Float value) { + public void addValue(float value) { if (Float.isNaN(value)) { this.nanValueCount++; } else { @@ -60,46 +68,9 @@ public void updateMetricsContext(Float value) { } } } - } - - public static class DoubleFieldMetricsContext extends AbstractFloatFieldMetricsContext { - public DoubleFieldMetricsContext(int id) { - super(id); - } - - @Override - public void updateMetricsContext(Double value) { - if (Double.isNaN(value)) { - this.nanValueCount++; - } else { - if (lowerBound == null || Double.compare(value, lowerBound) < 0) { - this.lowerBound = value; - } - if (upperBound == null || Double.compare(value, upperBound) > 0) { - this.upperBound = value; - } - } - } - } - - @SuppressWarnings("checkstyle:VisibilityModifier") - public abstract static class AbstractFloatFieldMetricsContext { - private final int id; - protected long nanValueCount = 0; - protected T lowerBound = null; - protected T upperBound = null; - - public AbstractFloatFieldMetricsContext(int id) { - this.id = id; - } - - /** - * It is caller's responsibility to ensure input shouldn't be null - */ - public abstract void updateMetricsContext(T value); - public FloatFieldMetrics buildMetrics() { - return new FloatFieldMetrics(this); + public FloatFieldMetrics build() { + return new FloatFieldMetrics(id, nanValueCount, lowerBound, upperBound); } } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index ea13005de471..0410591abcd4 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Stream; +import org.apache.iceberg.DoubleFieldMetrics; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FloatFieldMetrics; import org.apache.iceberg.orc.OrcValueWriter; @@ -219,10 +220,10 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) { } private static class FloatWriter implements OrcValueWriter { - private final FloatFieldMetrics.FloatFieldMetricsContext floatFieldMetricsContext; + private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; private FloatWriter(int id) { - this.floatFieldMetricsContext = new FloatFieldMetrics.FloatFieldMetricsContext(id); + this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); } @Override @@ -233,20 +234,20 @@ public Class getJavaClass() { @Override public void nonNullWrite(int rowId, Float data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; - floatFieldMetricsContext.updateMetricsContext(data); + floatFieldMetricsBuilder.addValue(data); } @Override public Stream metrics() { - return Stream.of(floatFieldMetricsContext.buildMetrics()); + return Stream.of(floatFieldMetricsBuilder.build()); } } private static class DoubleWriter implements OrcValueWriter { - private final FloatFieldMetrics.DoubleFieldMetricsContext doubleFieldMetricsContext; + private final 
DoubleFieldMetrics.Builder doubleFieldMetricsBuilder; private DoubleWriter(Integer id) { - this.doubleFieldMetricsContext = new FloatFieldMetrics.DoubleFieldMetricsContext(id); + this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id); } @Override @@ -257,12 +258,12 @@ public Class getJavaClass() { @Override public void nonNullWrite(int rowId, Double data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; - doubleFieldMetricsContext.updateMetricsContext(data); + doubleFieldMetricsBuilder.addValue(data); } @Override public Stream metrics() { - return Stream.of(doubleFieldMetricsContext.buildMetrics()); + return Stream.of(doubleFieldMetricsBuilder.build()); } } diff --git a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java index 3def5482ece5..d1d59b3ffd74 100644 --- a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java @@ -48,7 +48,6 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; -import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public abstract class TestMergingMetrics { @@ -190,7 +189,7 @@ private void assertBoundValueMatch(Number expected, Map bou int actualFieldId = FIELDS_WITH_NAN_COUNT_TO_ID.get(field); ByteBuffer byteBuffer = boundMap.get(actualFieldId); Type type = SCHEMA.findType(actualFieldId); - assertEquals(String.format("Bound value for field %s must match", field.name()), + Assert.assertEquals(String.format("Bound value for field %s must match", field.name()), expected, byteBuffer == null ? null : Conversions.fromByteBuffer(type, byteBuffer)); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java index 40c874a578bb..9e64085a49ee 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java @@ -23,12 +23,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -44,6 +42,7 @@ import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Conversions; @@ -92,6 +91,8 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream fieldMetrics, MetricsConfig metricsConfig, NameMapping nameMapping) { + Preconditions.checkNotNull(fieldMetrics, "fieldMetrics should not be null"); + long rowCount = 0; Map columnSizes = Maps.newHashMap(); Map valueCounts = Maps.newHashMap(); @@ -104,9 +105,8 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream fieldMetricsMap = Optional.ofNullable(fieldMetrics) - .map(stream -> stream.collect(Collectors.toMap(FieldMetrics::id, Function.identity()))) - .orElseGet(HashMap::new); + Map fieldMetricsMap = fieldMetrics.collect( + Collectors.toMap(FieldMetrics::id, 
Function.identity())); List blocks = metadata.getBlocks(); for (BlockMetaData block : blocks) { @@ -134,8 +134,8 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream idToFieldMetricsMap, MetricsConfig metricsConfig, Schema schema, Map> lowerBounds, Map> upperBounds) { idToFieldMetricsMap.entrySet().forEach(entry -> { @@ -188,7 +187,7 @@ private static void updateFloatingColumnsBounds( lowerBounds.put(fieldId, Literal.of((Double) metrics.lowerBound())); upperBounds.put(fieldId, Literal.of((Double) metrics.upperBound())); } else { - throw new IllegalStateException("[BUG] Only Float/Double column metrics should be collected by Iceberg"); + throw new UnsupportedOperationException("Expected only float or double column metrics"); } } }); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index 724b7efba7ec..06b762b1e87c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -30,6 +30,7 @@ import java.util.function.Function; import java.util.stream.Stream; import org.apache.avro.util.Utf8; +import org.apache.iceberg.DoubleFieldMetrics; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FloatFieldMetrics; import org.apache.iceberg.deletes.PositionDelete; @@ -170,44 +171,44 @@ public void writeDouble(int repetitionLevel, double value) { } private static class FloatWriter extends UnboxedWriter { - private final FloatFieldMetrics.FloatFieldMetricsContext floatFieldMetricsContext; + private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; private FloatWriter(ColumnDescriptor desc) { super(desc); int id = desc.getPrimitiveType().getId().intValue(); - this.floatFieldMetricsContext = new FloatFieldMetrics.FloatFieldMetricsContext(id); + this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); } @Override public void write(int repetitionLevel, Float value) { writeFloat(repetitionLevel, value); - floatFieldMetricsContext.updateMetricsContext(value); + floatFieldMetricsBuilder.addValue(value); } @Override public Stream metrics() { - return Stream.of(floatFieldMetricsContext.buildMetrics()); + return Stream.of(floatFieldMetricsBuilder.build()); } } private static class DoubleWriter extends UnboxedWriter { - private final FloatFieldMetrics.DoubleFieldMetricsContext doubleFieldMetricsContext; + private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder; private DoubleWriter(ColumnDescriptor desc) { super(desc); int id = desc.getPrimitiveType().getId().intValue(); - this.doubleFieldMetricsContext = new FloatFieldMetrics.DoubleFieldMetricsContext(id); + this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id); } @Override public void write(int repetitionLevel, Double value) { writeDouble(repetitionLevel, value); - doubleFieldMetricsContext.updateMetricsContext(value); + doubleFieldMetricsBuilder.addValue(value); } @Override public Stream metrics() { - return Stream.of(doubleFieldMetricsContext.buildMetrics()); + return Stream.of(doubleFieldMetricsBuilder.build()); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index d77de57f7f41..c824fc6775e5 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data; import java.util.stream.Stream; +import org.apache.iceberg.DoubleFieldMetrics; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FloatFieldMetrics; import org.apache.orc.storage.common.type.HiveDecimal; @@ -141,42 +142,42 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV } private static class FloatWriter implements SparkOrcValueWriter { - private final FloatFieldMetrics.FloatFieldMetricsContext floatFieldMetricsContext; + private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; private FloatWriter(int id) { - this.floatFieldMetricsContext = new FloatFieldMetrics.FloatFieldMetricsContext(id); + this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); } @Override public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { float floatValue = data.getFloat(column); ((DoubleColumnVector) output).vector[rowId] = floatValue; - floatFieldMetricsContext.updateMetricsContext(floatValue); + floatFieldMetricsBuilder.addValue(floatValue); } @Override public Stream metrics() { - return Stream.of(floatFieldMetricsContext.buildMetrics()); + return Stream.of(floatFieldMetricsBuilder.build()); } } private static class DoubleWriter implements SparkOrcValueWriter { - private final FloatFieldMetrics.DoubleFieldMetricsContext doubleFieldMetricsContext; + private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder; private DoubleWriter(int id) { - this.doubleFieldMetricsContext = new FloatFieldMetrics.DoubleFieldMetricsContext(id); + this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id); } @Override public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) { double doubleValue = data.getDouble(column); ((DoubleColumnVector) output).vector[rowId] = doubleValue; - doubleFieldMetricsContext.updateMetricsContext(doubleValue); + doubleFieldMetricsBuilder.addValue(doubleValue); } @Override public Stream metrics() { - return Stream.of(doubleFieldMetricsContext.buildMetrics()); + return Stream.of(doubleFieldMetricsBuilder.build()); } } From 43faaab237259d42178f75209cb4b386ec8cba04 Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Wed, 23 Jun 2021 16:43:32 -0700 Subject: [PATCH 3/3] address comments --- .../apache/iceberg/DoubleFieldMetrics.java | 28 ++++++-------- .../java/org/apache/iceberg/FieldMetrics.java | 7 ++++ .../org/apache/iceberg/FloatFieldMetrics.java | 28 ++++++-------- .../java/org/apache/iceberg/MetricsUtil.java | 2 +- .../iceberg/data/orc/GenericOrcWriter.java | 4 +- .../iceberg/data/orc/GenericOrcWriters.java | 32 +++++++++++++--- .../iceberg/flink/data/FlinkOrcWriter.java | 2 +- .../iceberg/flink/data/FlinkOrcWriters.java | 6 +-- .../org/apache/iceberg/orc/OrcMetrics.java | 10 ++--- .../org/apache/iceberg/orc/OrcRowWriter.java | 2 +- .../apache/iceberg/orc/OrcValueWriter.java | 7 +++- .../apache/iceberg/parquet/ParquetUtil.java | 13 +++---- .../iceberg/parquet/ParquetValueWriter.java | 2 +- .../iceberg/parquet/ParquetValueWriters.java | 37 ++++++++++++++++--- .../spark/data/SparkOrcValueWriter.java | 2 +- .../spark/data/SparkOrcValueWriters.java | 8 ++-- .../iceberg/spark/data/SparkOrcWriter.java | 4 +- 17 files changed, 119 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java b/core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java index 
1231cc93a114..8185f3a9f2a5 100644 --- a/core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/DoubleFieldMetrics.java @@ -28,45 +28,39 @@ */ public class DoubleFieldMetrics extends FieldMetrics { - private DoubleFieldMetrics(int id, long nanValueCount, Double lowerBound, Double upperBound) { - super(id, 0L, 0L, nanValueCount, lowerBound, upperBound); - } - - @Override - public long valueCount() { - throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); - } - - @Override - public long nullValueCount() { - throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); + private DoubleFieldMetrics(int id, long valueCount, long nanValueCount, Double lowerBound, Double upperBound) { + super(id, valueCount, 0L, nanValueCount, lowerBound, upperBound); } public static class Builder { private final int id; + private long valueCount = 0; private long nanValueCount = 0; - private Double lowerBound = null; - private Double upperBound = null; + private double lowerBound = Double.POSITIVE_INFINITY; + private double upperBound = Double.NEGATIVE_INFINITY; public Builder(int id) { this.id = id; } public void addValue(double value) { + this.valueCount++; if (Double.isNaN(value)) { this.nanValueCount++; } else { - if (lowerBound == null || Double.compare(value, lowerBound) < 0) { + if (Double.compare(value, lowerBound) < 0) { this.lowerBound = value; } - if (upperBound == null || Double.compare(value, upperBound) > 0) { + if (Double.compare(value, upperBound) > 0) { this.upperBound = value; } } } public DoubleFieldMetrics build() { - return new DoubleFieldMetrics(id, nanValueCount, lowerBound, upperBound); + boolean hasBound = valueCount - nanValueCount > 0; + return new DoubleFieldMetrics(id, valueCount, nanValueCount, + hasBound ? lowerBound : null, hasBound ? upperBound : null); } } } diff --git a/core/src/main/java/org/apache/iceberg/FieldMetrics.java b/core/src/main/java/org/apache/iceberg/FieldMetrics.java index 9a001b205a18..effcb78cdf58 100644 --- a/core/src/main/java/org/apache/iceberg/FieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/FieldMetrics.java @@ -86,4 +86,11 @@ public T lowerBound() { public T upperBound() { return upperBound; } + + /** + * Returns if the metrics has bounds (i.e. there is at least non-null value for this field) + */ + public boolean hasBounds() { + return upperBound != null; + } } diff --git a/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java index 94a78446d44d..5854b0c57393 100644 --- a/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java +++ b/core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java @@ -28,18 +28,8 @@ */ public class FloatFieldMetrics extends FieldMetrics { - private FloatFieldMetrics(int id, long nanValueCount, Float lowerBound, Float upperBound) { - super(id, 0L, 0L, nanValueCount, lowerBound, upperBound); - } - - @Override - public long valueCount() { - throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. "); - } - - @Override - public long nullValueCount() { - throw new IllegalStateException("Shouldn't access this method, as this metric is tracked in file statistics. 
"); + private FloatFieldMetrics(int id, long valueCount, long nanValueCount, Float lowerBound, Float upperBound) { + super(id, valueCount, 0L, nanValueCount, lowerBound, upperBound); } public Builder builderFor(int id) { @@ -48,29 +38,33 @@ public Builder builderFor(int id) { public static class Builder { private final int id; + private long valueCount = 0; private long nanValueCount = 0; - private Float lowerBound = null; - private Float upperBound = null; + private float lowerBound = Float.POSITIVE_INFINITY; + private float upperBound = Float.NEGATIVE_INFINITY; public Builder(int id) { this.id = id; } public void addValue(float value) { + this.valueCount++; if (Float.isNaN(value)) { this.nanValueCount++; } else { - if (lowerBound == null || Float.compare(value, lowerBound) < 0) { + if (Float.compare(value, lowerBound) < 0) { this.lowerBound = value; } - if (upperBound == null || Float.compare(value, upperBound) > 0) { + if (Float.compare(value, upperBound) > 0) { this.upperBound = value; } } } public FloatFieldMetrics build() { - return new FloatFieldMetrics(id, nanValueCount, lowerBound, upperBound); + boolean hasBound = valueCount - nanValueCount > 0; + return new FloatFieldMetrics(id, valueCount, nanValueCount, + hasBound ? lowerBound : null, hasBound ? upperBound : null); } } } diff --git a/core/src/main/java/org/apache/iceberg/MetricsUtil.java b/core/src/main/java/org/apache/iceberg/MetricsUtil.java index 5f5b1c659920..8cddcab0902a 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsUtil.java +++ b/core/src/main/java/org/apache/iceberg/MetricsUtil.java @@ -34,7 +34,7 @@ private MetricsUtil() { * Construct mapping relationship between column id to NaN value counts from input metrics and metrics config. */ public static Map createNanValueCounts( - Stream fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { + Stream> fieldMetrics, MetricsConfig metricsConfig, Schema inputSchema) { Preconditions.checkNotNull(metricsConfig, "metricsConfig is required"); if (fieldMetrics == null || inputSchema == null) { diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index a04d777df1d6..29426bc97566 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -129,7 +129,7 @@ public void write(Record value, VectorizedRowBatch output) { } @Override - public Stream metrics() { + public Stream> metrics() { return writer.metrics(); } @@ -160,7 +160,7 @@ public void nonNullWrite(int rowId, Record data, ColumnVector output) { } @Override - public Stream metrics() { + public Stream> metrics() { return writers.stream().flatMap(OrcValueWriter::metrics); } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index 0410591abcd4..7efa1613de97 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -221,6 +221,7 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) { private static class FloatWriter implements OrcValueWriter { private final FloatFieldMetrics.Builder floatFieldMetricsBuilder; + private long nullValueCount = 0; private FloatWriter(int id) { this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id); @@ -238,13 +239,23 @@ public void nonNullWrite(int 
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index 0410591abcd4..7efa1613de97 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -221,6 +221,7 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) {
 
   private static class FloatWriter implements OrcValueWriter<Float> {
     private final FloatFieldMetrics.Builder floatFieldMetricsBuilder;
+    private long nullValueCount = 0;
 
     private FloatWriter(int id) {
       this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id);
@@ -238,13 +239,23 @@ public void nonNullWrite(int rowId, Float data, ColumnVector output) {
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
-      return Stream.of(floatFieldMetricsBuilder.build());
+    public void nullWrite() {
+      nullValueCount++;
+    }
+
+    @Override
+    public Stream<FieldMetrics<?>> metrics() {
+      FieldMetrics<Float> metricsWithoutNullCount = floatFieldMetricsBuilder.build();
+      return Stream.of(new FieldMetrics<>(metricsWithoutNullCount.id(),
+          metricsWithoutNullCount.valueCount() + nullValueCount,
+          nullValueCount, metricsWithoutNullCount.nanValueCount(),
+          metricsWithoutNullCount.lowerBound(), metricsWithoutNullCount.upperBound()));
     }
   }
 
   private static class DoubleWriter implements OrcValueWriter<Double> {
     private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder;
+    private long nullValueCount = 0;
 
     private DoubleWriter(Integer id) {
       this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id);
@@ -262,8 +273,17 @@ public void nonNullWrite(int rowId, Double data, ColumnVector output) {
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
-      return Stream.of(doubleFieldMetricsBuilder.build());
+    public void nullWrite() {
+      nullValueCount++;
+    }
+
+    @Override
+    public Stream<FieldMetrics<?>> metrics() {
+      FieldMetrics<Double> metricsWithoutNullCount = doubleFieldMetricsBuilder.build();
+      return Stream.of(new FieldMetrics<>(metricsWithoutNullCount.id(),
+          metricsWithoutNullCount.valueCount() + nullValueCount,
+          nullValueCount, metricsWithoutNullCount.nanValueCount(),
+          metricsWithoutNullCount.lowerBound(), metricsWithoutNullCount.upperBound()));
     }
   }
 
@@ -462,7 +482,7 @@ public void nonNullWrite(int rowId, List<T> value, ColumnVector output) {
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
+    public Stream<FieldMetrics<?>> metrics() {
       return element.metrics();
     }
   }
@@ -506,7 +526,7 @@ public void nonNullWrite(int rowId, Map<K, V> map, ColumnVector output) {
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
+    public Stream<FieldMetrics<?>> metrics() {
       return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
     }
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 9aff0c127449..81f9822815b6 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -68,7 +68,7 @@ public void write(RowData row, VectorizedRowBatch output) {
   }
 
   @Override
-  public Stream<FieldMetrics> metrics() {
+  public Stream<FieldMetrics<?>> metrics() {
     return writer.metrics();
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index 758d73d87e9b..38a348995f00 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -258,7 +258,7 @@ public void nonNullWrite(int rowId, ArrayData data, ColumnVector output) {
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
+    public Stream<FieldMetrics<?>> metrics() {
       return elementWriter.metrics();
     }
 
@@ -306,7 +306,7 @@ public void nonNullWrite(int rowId, MapData data, ColumnVector output) {
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
+    public Stream<FieldMetrics<?>> metrics() {
       return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
     }
   }
@@ -344,7 +344,7 @@ public void nonNullWrite(int rowId, RowData data, ColumnVector output) {
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
+    public Stream<FieldMetrics<?>> metrics() {
       return writers.stream().flatMap(OrcValueWriter::metrics);
     }
   }
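A note on the ORC writers above, sketched outside the diff: the Float/Double builders only ever see non-null values, so each writer tracks nulls itself and folds them in when metrics() is called. The same merge step in isolation, with local names borrowed from DoubleWriter for illustration:

    // doubleFieldMetricsBuilder and nullValueCount as in DoubleWriter above
    FieldMetrics<Double> withoutNulls = doubleFieldMetricsBuilder.build();
    FieldMetrics<Double> merged = new FieldMetrics<>(
        withoutNulls.id(),
        withoutNulls.valueCount() + nullValueCount,  // nulls still count as rows
        nullValueCount,                              // accumulated via nullWrite()
        withoutNulls.nanValueCount(),
        withoutNulls.lowerBound(),                   // bounds unaffected by nulls
        withoutNulls.upperBound());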
f4a6a3bdaeac..0fad4dae139e 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -96,7 +96,7 @@ static Metrics fromInputFile(InputFile file, Configuration config, MetricsConfig
     }
   }
 
-  static Metrics fromWriter(Writer writer, Stream<FieldMetrics> fieldMetricsStream, MetricsConfig metricsConfig) {
+  static Metrics fromWriter(Writer writer, Stream<FieldMetrics<?>> fieldMetricsStream, MetricsConfig metricsConfig) {
     try {
       return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(),
           fieldMetricsStream, metricsConfig, null);
@@ -107,7 +107,7 @@ static Metrics fromWriter(Writer writer, Stream<FieldMetrics> fieldMetricsStream
 
   private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
                                          final ColumnStatistics[] colStats,
-                                         final Stream<FieldMetrics> fieldMetricsStream,
+                                         final Stream<FieldMetrics<?>> fieldMetricsStream,
                                          final MetricsConfig metricsConfig, final NameMapping mapping) {
     final TypeDescription orcSchemaWithIds = (!ORCSchemaUtil.hasIds(orcSchema) && mapping != null) ?
@@ -133,7 +133,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti
     Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
     Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
 
-    Map<Integer, FieldMetrics> fieldMetricsMap = Optional.ofNullable(fieldMetricsStream)
+    Map<Integer, FieldMetrics<?>> fieldMetricsMap = Optional.ofNullable(fieldMetricsStream)
         .map(stream -> stream.collect(Collectors.toMap(FieldMetrics::id, Function.identity())))
         .orElseGet(HashMap::new);
 
@@ -188,7 +188,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti
   }
 
   private static Optional<ByteBuffer> fromOrcMin(Type type, ColumnStatistics columnStats,
-                                                 MetricsMode metricsMode, FieldMetrics fieldMetrics) {
+                                                 MetricsMode metricsMode, FieldMetrics<?> fieldMetrics) {
     Object min = null;
     if (columnStats instanceof IntegerColumnStatistics) {
       min = ((IntegerColumnStatistics) columnStats).getMinimum();
@@ -226,7 +226,7 @@ private static Optional<ByteBuffer> fromOrcMin(Type type, ColumnStatistics colum
   }
 
   private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics columnStats,
-                                                 MetricsMode metricsMode, FieldMetrics fieldMetrics) {
+                                                 MetricsMode metricsMode, FieldMetrics<?> fieldMetrics) {
     Object max = null;
     if (columnStats instanceof IntegerColumnStatistics) {
       max = ((IntegerColumnStatistics) columnStats).getMaximum();
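For orientation, and not as a quote of the method bodies: threading FieldMetrics<?> into fromOrcMin/fromOrcMax is what lets float and double columns take their bounds from the NaN-aware writer counters instead of ORC's own column statistics, which may include NaN. A hedged sketch of that preference, with a hypothetical helper name and assuming org.apache.iceberg.types.Conversions is imported:

    private static Optional<ByteBuffer> preferWriterBound(Type type, Object orcMin,
                                                          FieldMetrics<?> fieldMetrics) {
      boolean floatingPoint = type.typeId() == Type.TypeID.FLOAT || type.typeId() == Type.TypeID.DOUBLE;
      if (floatingPoint && fieldMetrics != null) {
        // writer-tracked bound already excludes NaN; null if all values were NaN or null
        return Optional.ofNullable(fieldMetrics.lowerBound())
            .map(bound -> Conversions.toByteBuffer(type, bound));
      }
      return Optional.ofNullable(orcMin).map(bound -> Conversions.toByteBuffer(type, bound));
    }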
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
index 413634e3e100..1f0ea13dc685 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
@@ -41,7 +41,7 @@ public interface OrcRowWriter<T> {
   /**
    * Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of.
    */
-  default Stream<FieldMetrics> metrics() {
+  default Stream<FieldMetrics<?>> metrics() {
     return Stream.empty();
   }
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
index 2f72fc20e053..b6030abb7a78 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
@@ -38,6 +38,7 @@ default void write(int rowId, T data, ColumnVector output) {
     if (data == null) {
       output.noNulls = false;
       output.isNull[rowId] = true;
+      nullWrite();
     } else {
       output.isNull[rowId] = false;
       nonNullWrite(rowId, data, output);
@@ -46,10 +47,14 @@ default void write(int rowId, T data, ColumnVector output) {
 
   void nonNullWrite(int rowId, T data, ColumnVector output);
 
+  default void nullWrite() {
+    // no op
+  }
+
   /**
    * Returns a stream of {@link FieldMetrics} that this OrcValueWriter keeps track of.
    */
-  default Stream<FieldMetrics> metrics() {
+  default Stream<FieldMetrics<?>> metrics() {
     return Stream.empty();
   }
 }
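The nullWrite() default above is the only interface change ORC needs for null counting: write() routes nulls to it, and only writers that care override it. The dispatch pattern, stripped down to toy types rather than the Iceberg API:

    interface ToyValueWriter<T> {
      default void write(T data) {
        if (data == null) {
          nullWrite();        // hook; a no-op unless a writer needs null counts
        } else {
          nonNullWrite(data);
        }
      }

      void nonNullWrite(T data);

      default void nullWrite() {
        // no op
      }
    }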
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 9e64085a49ee..b7dae8a6a0bd 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -83,13 +83,13 @@ public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig, N
     }
   }
 
-  public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics> fieldMetrics,
+  public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics<?>> fieldMetrics,
                                       MetricsConfig metricsConfig) {
     return footerMetrics(metadata, fieldMetrics, metricsConfig, null);
   }
 
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics> fieldMetrics,
+  public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics<?>> fieldMetrics,
                                       MetricsConfig metricsConfig, NameMapping nameMapping) {
     Preconditions.checkNotNull(fieldMetrics, "fieldMetrics should not be null");
 
@@ -105,7 +105,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
 
-    Map<Integer, FieldMetrics> fieldMetricsMap = fieldMetrics.collect(
+    Map<Integer, FieldMetrics<?>> fieldMetricsMap = fieldMetrics.collect(
         Collectors.toMap(FieldMetrics::id, Function.identity()));
 
     List<BlockMetaData> blocks = metadata.getBlocks();
@@ -167,17 +167,16 @@ public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetric
   }
 
   private static void updateFromFieldMetrics(
-      Map<Integer, FieldMetrics> idToFieldMetricsMap, MetricsConfig metricsConfig, Schema schema,
+      Map<Integer, FieldMetrics<?>> idToFieldMetricsMap, MetricsConfig metricsConfig, Schema schema,
       Map<Integer, Literal<?>> lowerBounds, Map<Integer, Literal<?>> upperBounds) {
     idToFieldMetricsMap.entrySet().forEach(entry -> {
       int fieldId = entry.getKey();
-      FieldMetrics metrics = entry.getValue();
+      FieldMetrics<?> metrics = entry.getValue();
       MetricsMode metricsMode = MetricsUtil.metricsMode(schema, metricsConfig, fieldId);
 
       // only check for MetricsModes.None, since we don't truncate float/double values.
       if (metricsMode != MetricsModes.None.get()) {
-        if (metrics.upperBound() == null) {
-          // upper and lower bounds will both null or neither
+        if (!metrics.hasBounds()) {
           lowerBounds.remove(fieldId);
           upperBounds.remove(fieldId);
         } else if (metrics.upperBound() instanceof Float) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
index 7692ee58028d..fa9dcb7d0237 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
@@ -34,7 +34,7 @@ public interface ParquetValueWriter<T> {
   /**
    * Returns a stream of {@link FieldMetrics} that this ParquetValueWriter keeps track of.
    */
-  default Stream<FieldMetrics> metrics() {
+  default Stream<FieldMetrics<?>> metrics() {
     return Stream.empty();
   }
 }
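For context on how the Parquet side is consumed, a hedged sketch (parquetMetadata, rootWriter, and metricsConfig are assumed to be in scope): the root writer's metrics() stream, which now carries NaN-aware bounds and null counts, is handed straight to footerMetrics.

    // fold writer-tracked field metrics into the file-level Metrics
    Metrics metrics = ParquetUtil.footerMetrics(parquetMetadata, rootWriter.metrics(), metricsConfig);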
+ + "Current number of fields: %s, primitive writer type: %s", + fieldMetricsFromWriter.size(), writer.getClass().getSimpleName())); + } + } + + // skipping updating null stats for non-primitive types since we don't use them today, to avoid unnecessary work return writer.metrics(); } } @@ -416,7 +441,7 @@ public void setColumnStore(ColumnWriteStore columnStore) { protected abstract Iterator elements(L value); @Override - public Stream metrics() { + public Stream> metrics() { return writer.metrics(); } } @@ -494,7 +519,7 @@ public void setColumnStore(ColumnWriteStore columnStore) { protected abstract Iterator> pairs(M value); @Override - public Stream metrics() { + public Stream> metrics() { return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); } } @@ -553,7 +578,7 @@ public void setColumnStore(ColumnWriteStore columnStore) { protected abstract Object get(S struct, int index); @Override - public Stream metrics() { + public Stream> metrics() { return Arrays.stream(writers).flatMap(ParquetValueWriter::metrics); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java index 131a93d6f9d9..b4124468687f 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java @@ -53,7 +53,7 @@ default void write(int rowId, int column, SpecializedGetters data, ColumnVector * counters, and only return non-empty stream if the writer writes double or float values either by itself or * transitively. */ - default Stream metrics() { + default Stream> metrics() { return Stream.empty(); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java index c824fc6775e5..df1b079bc7fa 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java @@ -156,7 +156,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV } @Override - public Stream metrics() { + public Stream> metrics() { return Stream.of(floatFieldMetricsBuilder.build()); } } @@ -176,7 +176,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV } @Override - public Stream metrics() { + public Stream> metrics() { return Stream.of(doubleFieldMetricsBuilder.build()); } } @@ -272,7 +272,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV } @Override - public Stream metrics() { + public Stream> metrics() { return writer.metrics(); } } @@ -308,7 +308,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV } @Override - public Stream metrics() { + public Stream> metrics() { return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index ce1b2bec0ec1..73c391ad07c3 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -65,7 +65,7 @@ public void write(InternalRow value, VectorizedRowBatch output) { } @Override - public Stream metrics() { + public Stream> metrics() { return writer.metrics(); } @@ -146,7 +146,7 @@ public void 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index ce1b2bec0ec1..73c391ad07c3 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -65,7 +65,7 @@ public void write(InternalRow value, VectorizedRowBatch output) {
   }
 
   @Override
-  public Stream<FieldMetrics> metrics() {
+  public Stream<FieldMetrics<?>> metrics() {
     return writer.metrics();
   }
 
@@ -146,7 +146,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
     }
 
     @Override
-    public Stream<FieldMetrics> metrics() {
+    public Stream<FieldMetrics<?>> metrics() {
       return writers.stream().flatMap(SparkOrcValueWriter::metrics);
    }
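Finally, a test-style sanity check of the headline behavior. This is a sketch in the spirit of core's TestMetrics rather than a quote of it; Assert is org.junit.Assert and the field id is arbitrary.

    FloatFieldMetrics.Builder builder = new FloatFieldMetrics.Builder(1);
    builder.addValue(Float.NaN);
    builder.addValue(2.5F);
    FloatFieldMetrics metrics = builder.build();

    Assert.assertEquals(2L, metrics.valueCount());
    Assert.assertEquals(1L, metrics.nanValueCount());
    Assert.assertEquals(Float.valueOf(2.5F), metrics.lowerBound());
    Assert.assertEquals(Float.valueOf(2.5F), metrics.upperBound());
    Assert.assertTrue(metrics.hasBounds());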