37 changes: 26 additions & 11 deletions orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -44,7 +44,6 @@
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
@@ -196,11 +195,17 @@ private static Optional<ByteBuffer> fromOrcMin(Type type, ColumnStatistics colum
min = Math.toIntExact((long) min);
}
} else if (columnStats instanceof DoubleColumnStatistics) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
Preconditions.checkNotNull(fieldMetrics,
"[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
min = fieldMetrics.lowerBound();
if (fieldMetrics != null) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
min = fieldMetrics.lowerBound();
} else {
// imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
min = replaceNaN(((DoubleColumnStatistics) columnStats).getMinimum(), Double.NEGATIVE_INFINITY);
if (type.typeId() == Type.TypeID.FLOAT) {
min = ((Double) min).floatValue();
}
}
} else if (columnStats instanceof StringColumnStatistics) {
min = ((StringColumnStatistics) columnStats).getMinimum();
} else if (columnStats instanceof DecimalColumnStatistics) {
@@ -234,11 +239,17 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
max = Math.toIntExact((long) max);
}
} else if (columnStats instanceof DoubleColumnStatistics) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
Preconditions.checkNotNull(fieldMetrics,
"[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
max = fieldMetrics.upperBound();
if (fieldMetrics != null) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
max = fieldMetrics.upperBound();
} else {
// imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
max = replaceNaN(((DoubleColumnStatistics) columnStats).getMaximum(), Double.POSITIVE_INFINITY);
if (type.typeId() == Type.TypeID.FLOAT) {
max = ((Double) max).floatValue();
}
}
[Contributor Author comment] I tested without this fix and encountered the error, so I'm sure that the test does in fact hit the problem.

} else if (columnStats instanceof StringColumnStatistics) {
max = ((StringColumnStatistics) columnStats).getMaximum();
} else if (columnStats instanceof DecimalColumnStatistics) {
@@ -262,6 +273,10 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
return Optional.ofNullable(Conversions.toByteBuffer(type, truncateIfNeeded(Bound.UPPER, type, max, metricsMode)));
}

private static Object replaceNaN(double value, double replacement) {
return Double.isNaN(value) ? replacement : value;
}

private static Object truncateIfNeeded(Bound bound, Type type, Object value, MetricsMode metricsMode) {
// Out of the two types which could be truncated, string or binary, ORC only supports string bounds.
// Therefore, truncation will be applied if needed only on string type.
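Taken together, the two hunks above let min/max extraction tolerate imported ORC files that carry no Iceberg-tracked field metrics: when fieldMetrics is null, the bounds come from ORC's own DoubleColumnStatistics, with NaN widened to negative/positive infinity and a downcast to float where the Iceberg type requires it. Below is a minimal standalone sketch of that fallback, assuming orc-core on the classpath; the method names lowerBoundFor/upperBoundFor are illustrative only and are not part of OrcMetrics.

import org.apache.orc.DoubleColumnStatistics;

// Sketch of the fallback behavior added above, not the actual OrcMetrics code.
class FloatingPointBoundsSketch {
  // Lower bound for an imported file: NaN is useless as a bound, so widen it
  // to negative infinity, then narrow to float if the Iceberg column is a float.
  static Object lowerBoundFor(DoubleColumnStatistics stats, boolean isFloat) {
    Object bound = Double.isNaN(stats.getMinimum()) ? Double.NEGATIVE_INFINITY : stats.getMinimum();
    return isFloat ? ((Double) bound).floatValue() : bound;
  }

  // Upper bound for an imported file: NaN widens to positive infinity instead.
  static Object upperBoundFor(DoubleColumnStatistics stats, boolean isFloat) {
    Object bound = Double.isNaN(stats.getMaximum()) ? Double.POSITIVE_INFINITY : stats.getMaximum();
    return isFloat ? ((Double) bound).floatValue() : bound;
  }
}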
4 changes: 4 additions & 0 deletions spark/v3.0/build.gradle
@@ -126,6 +126,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
compileOnly project(path: ':iceberg-bundled-guava', configuration: 'shadow')
compileOnly project(':iceberg-api')
compileOnly project(':iceberg-core')
compileOnly project(':iceberg-data')
compileOnly project(':iceberg-orc')
compileOnly project(':iceberg-common')
compileOnly project(':iceberg-spark')
compileOnly project(':iceberg-spark:iceberg-spark3')
@@ -137,6 +139,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {

testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')

testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-orc', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
@@ -33,8 +33,16 @@
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -51,6 +59,8 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.apache.iceberg.types.Types.NestedField.optional;

public class TestAddFilesProcedure extends SparkExtensionsTestBase {

private final String sourceTableName = "source_table";
@@ -646,6 +656,42 @@ public void duplicateDataUnpartitionedAllowed() {

}

@Test
public void addOrcFileWithDoubleAndFloatColumns() throws Exception {
// The Spark session catalog cannot load metadata tables; it fails with
// "The namespace in session catalog must have exactly one name part"
Assume.assumeFalse(catalogName.equals("spark_catalog"));

// Create an ORC file
File outputFile = temp.newFile("test.orc");
final int numRows = 5;
List<Record> expectedRecords = createOrcFile(outputFile, numRows);
String createIceberg =
"CREATE TABLE %s (x float, y double, z long) USING iceberg";
sql(createIceberg, tableName);

Object result = scalarSql("CALL %s.system.add_files('%s', '`orc`.`%s`')",
catalogName, tableName, outputFile.getPath());
Assert.assertEquals(1L, result);

List<Object[]> expected = expectedRecords.stream()
.map(record -> new Object[]{record.get(0), record.get(1), record.get(2)})
.collect(Collectors.toList());

// x goes 2.00, 1.99, 1.98, ...
assertEquals("Iceberg table contains correct data",
expected,
sql("SELECT * FROM %s ORDER BY x DESC", tableName));

List<Object[]> actualRecordCount = sql("select %s from %s.files",
DataFile.RECORD_COUNT.name(),
tableName);
List<Object[]> expectedRecordCount = Lists.newArrayList();
expectedRecordCount.add(new Object[]{(long) numRows});
assertEquals("Iceberg file metadata should have correct metadata count",
expectedRecordCount, actualRecordCount);
}

private static final StructField[] struct = {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("name", DataTypes.StringType, false, Metadata.empty()),
@@ -735,4 +781,36 @@ private void createPartitionedHiveTable() {
partitionedDF.write().insertInto(sourceTableName);
partitionedDF.write().insertInto(sourceTableName);
}

// TODO: update this so the file written for import does not use Iceberg's field ID numbers
public List<Record> createOrcFile(File orcFile, int numRows) throws IOException {
// The file must not already exist; rely on the TemporaryFolder rule to clean it up at the end.
if (orcFile.exists()) {
orcFile.delete();
}
final org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
optional(1, "x", Types.FloatType.get()),
optional(2, "y", Types.DoubleType.get()),
optional(3, "z", Types.LongType.get())
);

List<Record> records = Lists.newArrayListWithExpectedSize(numRows);
for (int i = 0; i < numRows; i += 1) {
Record record = org.apache.iceberg.data.GenericRecord.create(icebergSchema);
record.setField("x", ((float) (100 - i)) / 100F + 1.0F); // 2.0f, 1.99f, 1.98f, ...
record.setField("y", ((double) i) / 100.0D + 2.0D); // 2.0d, 2.01d, 2.02d, ...
record.setField("z", 5_000_000_000L + i);
records.add(record);
}

OutputFile outFile = Files.localOutput(orcFile);
try (FileAppender<Record> appender = org.apache.iceberg.orc.ORC.write(outFile)
.schema(icebergSchema)
.metricsConfig(MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "none")))
.createWriterFunc(GenericOrcWriter::buildWriter)
.build()) {
appender.addAll(records);
}
return records;
}
}
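
For reference, outside the test harness the same import path would look roughly like the sketch below. This is a hedged example assuming a running Spark session with an Iceberg catalog; my_catalog, db.target_table, and the file path are placeholders, not names from this PR.

import org.apache.spark.sql.SparkSession;

// Illustrative only: import an existing ORC file with float/double columns via
// the add_files procedure, using the same call shape the new test exercises above.
public class AddOrcFileExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("add-orc-file-example")
        .getOrCreate();

    // Target Iceberg table with the same column types as the test's ORC file.
    spark.sql("CREATE TABLE my_catalog.db.target_table (x float, y double, z long) USING iceberg");

    // add_files registers the existing file; with this change, OrcMetrics falls back
    // to the file's own statistics because no Iceberg-tracked metrics exist for it.
    spark.sql("CALL my_catalog.system.add_files('db.target_table', '`orc`.`/path/to/test.orc`')");

    spark.stop();
  }
}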