diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
index 38e7781abe53..f15ccc869513 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -44,7 +44,6 @@
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Conversions;
@@ -196,11 +195,17 @@ private static Optional<ByteBuffer> fromOrcMin(Type type, ColumnStatistics colum
         min = Math.toIntExact((long) min);
       }
     } else if (columnStats instanceof DoubleColumnStatistics) {
-      // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
-      // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
-      Preconditions.checkNotNull(fieldMetrics,
-          "[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
-      min = fieldMetrics.lowerBound();
+      if (fieldMetrics != null) {
+        // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
+        // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
+        min = fieldMetrics.lowerBound();
+      } else {
+        // imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
+        min = replaceNaN(((DoubleColumnStatistics) columnStats).getMinimum(), Double.NEGATIVE_INFINITY);
+        if (type.typeId() == Type.TypeID.FLOAT) {
+          min = ((Double) min).floatValue();
+        }
+      }
     } else if (columnStats instanceof StringColumnStatistics) {
       min = ((StringColumnStatistics) columnStats).getMinimum();
     } else if (columnStats instanceof DecimalColumnStatistics) {
@@ -234,11 +239,17 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
         max = Math.toIntExact((long) max);
       }
     } else if (columnStats instanceof DoubleColumnStatistics) {
-      // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
-      // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
-      Preconditions.checkNotNull(fieldMetrics,
-          "[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
-      max = fieldMetrics.upperBound();
+      if (fieldMetrics != null) {
+        // since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
+        // we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
+        max = fieldMetrics.upperBound();
+      } else {
+        // imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
+        max = replaceNaN(((DoubleColumnStatistics) columnStats).getMaximum(), Double.POSITIVE_INFINITY);
+        if (type.typeId() == Type.TypeID.FLOAT) {
+          max = ((Double) max).floatValue();
+        }
+      }
     } else if (columnStats instanceof StringColumnStatistics) {
       max = ((StringColumnStatistics) columnStats).getMaximum();
     } else if (columnStats instanceof DecimalColumnStatistics) {
@@ -262,6 +273,10 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
     return Optional.ofNullable(Conversions.toByteBuffer(type, truncateIfNeeded(Bound.UPPER, type, max, metricsMode)));
   }
 
+  private static Object replaceNaN(double value, double replacement) {
+    return Double.isNaN(value) ? replacement : value;
+  }
+
   private static Object truncateIfNeeded(Bound bound, Type type, Object value, MetricsMode metricsMode) {
     // Out of the two types which could be truncated, string or binary, ORC only supports string bounds.
     // Therefore, truncation will be applied if needed only on string type.
diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle
index c337036bc500..f630b17bc535 100644
--- a/spark/v3.0/build.gradle
+++ b/spark/v3.0/build.gradle
@@ -126,6 +126,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
     compileOnly project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     compileOnly project(':iceberg-api')
     compileOnly project(':iceberg-core')
+    compileOnly project(':iceberg-data')
+    compileOnly project(':iceberg-orc')
     compileOnly project(':iceberg-common')
     compileOnly project(':iceberg-spark')
     compileOnly project(':iceberg-spark:iceberg-spark3')
@@ -137,6 +139,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
 
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
+    testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
 
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 7f5c7df43101..2f7ef0db779e 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -33,8 +33,16 @@
 import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -51,6 +59,8 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
 public class TestAddFilesProcedure extends SparkExtensionsTestBase {
 
   private final String sourceTableName = "source_table";
@@ -646,6 +656,42 @@ public void duplicateDataUnpartitionedAllowed() {
 
   }
 
+  @Test
+  public void addOrcFileWithDoubleAndFloatColumns() throws Exception {
+    // Spark Session Catalog cannot load metadata tables
+    // with "The namespace in session catalog must have exactly one name part"
+    Assume.assumeFalse(catalogName.equals("spark_catalog"));
+
+    // Create an ORC file
+    File outputFile = temp.newFile("test.orc");
+    final int numRows = 5;
+    List<Record> expectedRecords = createOrcFile(outputFile, numRows);
+    String createIceberg =
+        "CREATE TABLE %s (x float, y double, z long) USING iceberg";
+    sql(createIceberg, tableName);
+
+    Object result = scalarSql("CALL %s.system.add_files('%s', '`orc`.`%s`')",
+        catalogName, tableName, outputFile.getPath());
+    Assert.assertEquals(1L, result);
+
+    List<Object[]> expected = expectedRecords.stream()
+        .map(record -> new Object[]{record.get(0), record.get(1), record.get(2)})
+        .collect(Collectors.toList());
+
+    // x goes 2.00, 1.99, 1.98, ...
+    assertEquals("Iceberg table contains correct data",
+        expected,
+        sql("SELECT * FROM %s ORDER BY x DESC", tableName));
+
+    List<Object[]> actualRecordCount = sql("select %s from %s.files",
+        DataFile.RECORD_COUNT.name(),
+        tableName);
+    List<Object[]> expectedRecordCount = Lists.newArrayList();
+    expectedRecordCount.add(new Object[]{(long) numRows});
+    assertEquals("Iceberg file metadata should have correct metadata count",
+        expectedRecordCount, actualRecordCount);
+  }
+
   private static final StructField[] struct = {
       new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
       new StructField("name", DataTypes.StringType, false, Metadata.empty()),
@@ -735,4 +781,36 @@ private void createPartitionedHiveTable() {
     partitionedDF.write().insertInto(sourceTableName);
     partitionedDF.write().insertInto(sourceTableName);
   }
+
+  // TODO: write the ORC file without Iceberg's field IDs, since files being imported would not have them
+  public List<Record> createOrcFile(File orcFile, int numRows) throws IOException {
+    // The file must not already exist for the ORC writer; the TemporaryFolder rule still deletes it at the end.
+    if (orcFile.exists()) {
+      orcFile.delete();
+    }
+    final org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
+        optional(1, "x", Types.FloatType.get()),
+        optional(2, "y", Types.DoubleType.get()),
+        optional(3, "z", Types.LongType.get())
+    );
+
+    List<Record> records = Lists.newArrayListWithExpectedSize(numRows);
+    for (int i = 0; i < numRows; i += 1) {
+      Record record = org.apache.iceberg.data.GenericRecord.create(icebergSchema);
+      record.setField("x", ((float) (100 - i)) / 100F + 1.0F); // 2.0f, 1.99f, 1.98f, ...
+      record.setField("y", ((double) i) / 100.0D + 2.0D); // 2.0d, 2.01d, 2.02d, ...
+      record.setField("z", 5_000_000_000L + i);
+      records.add(record);
+    }
+
+    OutputFile outFile = Files.localOutput(orcFile);
+    try (FileAppender<Record> appender = org.apache.iceberg.orc.ORC.write(outFile)
+        .schema(icebergSchema)
+        .metricsConfig(MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "none")))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
+        .build()) {
+      appender.addAll(records);
+    }
+    return records;
+  }
 }
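For context, a minimal standalone sketch (not part of the patch) of the fallback behavior the OrcMetrics change introduces: when no Iceberg-tracked field metrics are available, as with imported files, a NaN bound reported by ORC is replaced with the matching infinity so NaN never becomes a stored lower/upper bound, and the double statistic is narrowed back to float for float columns. The class and method names below are illustrative only.

    // Illustrative sketch; mirrors the fieldMetrics == null branch added in OrcMetrics above.
    public class NaNBoundFallbackSketch {

      // Replace a NaN statistic with the given infinity so NaN is never used as a column bound.
      static double replaceNaN(double value, double replacement) {
        return Double.isNaN(value) ? replacement : value;
      }

      public static void main(String[] args) {
        // What ORC column statistics may report for a floating point column that contains NaN values.
        double orcMin = Double.NaN;
        double orcMax = 2.0d;

        double lower = replaceNaN(orcMin, Double.NEGATIVE_INFINITY); // -Infinity instead of NaN
        double upper = replaceNaN(orcMax, Double.POSITIVE_INFINITY); // 2.0 is kept as-is

        // For FLOAT columns the double statistic is narrowed back to float, as in the patch.
        float lowerAsFloat = (float) lower;

        System.out.println("lower=" + lower + " upper=" + upper + " lowerAsFloat=" + lowerAsFloat);
      }
    }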