diff --git a/build.gradle b/build.gradle index 46eeb2b52405..79bb8a327f80 100644 --- a/build.gradle +++ b/build.gradle @@ -449,6 +449,7 @@ project(':iceberg-orc') { dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') api project(':iceberg-api') + implementation project(':iceberg-common') implementation project(':iceberg-core') implementation("org.apache.avro:avro") { exclude group: 'org.tukaani' // xz compression is not supported diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index 46f997e4e7e1..dbcfe1f97e2d 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -275,9 +275,7 @@ private void openCurrent() { } private boolean shouldRollToNewFile() { - // TODO: ORC file now not support target file size before closed - return !format.equals(FileFormat.ORC) && - currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize; + return currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize; } private void closeCurrent() throws IOException { diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java index a6982cdb8153..a4acd2f7b2ef 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java @@ -24,7 +24,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; -import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** @@ -52,13 +51,7 @@ public ClusteredDataWriter(FileWriterFactory writerFactory, OutputFileFactory @Override protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { - // TODO: support ORC rolling writers - if (fileFormat == FileFormat.ORC) { - EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); - return writerFactory.newDataWriter(outputFile, spec, partition); - } else { - return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); - } + return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java index 385d1a5d6200..976165f50e4f 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java @@ -24,7 +24,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; -import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -53,13 +52,7 @@ public ClusteredEqualityDeleteWriter(FileWriterFactory writerFactory, OutputF @Override protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { - // TODO: support ORC rolling writers - if (fileFormat == FileFormat.ORC) { - EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); - return writerFactory.newEqualityDeleteWriter(outputFile, 
spec, partition); - } else { - return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); - } + return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java index ea118388b3ba..53336fff7e16 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java @@ -25,7 +25,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.PositionDelete; -import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; @@ -56,13 +55,7 @@ public ClusteredPositionDeleteWriter(FileWriterFactory writerFactory, OutputF @Override protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { - // TODO: support ORC rolling writers - if (fileFormat == FileFormat.ORC) { - EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); - return writerFactory.newPositionDeleteWriter(outputFile, spec, partition); - } else { - return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); - } + return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java index d6e16a707b36..54ccff0e327f 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java @@ -24,7 +24,6 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; -import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** @@ -52,13 +51,7 @@ public FanoutDataWriter(FileWriterFactory writerFactory, OutputFileFactory fi @Override protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { - // TODO: support ORC rolling writers - if (fileFormat == FileFormat.ORC) { - EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); - return writerFactory.newDataWriter(outputFile, spec, partition); - } else { - return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); - } + return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); } @Override diff --git a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java index 39771fad2f21..9bb762727ca0 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java +++ b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java @@ -58,6 +58,7 @@ public class TestBaseTaskWriter extends TableTestBase { public static Object[][] parameters() { return new Object[][] { {"avro"}, + {"orc"}, {"parquet"} }; } diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java 
index efa756f0781d..5c11d345e82f 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java @@ -30,7 +30,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,8 +38,6 @@ @RunWith(Parameterized.class) public abstract class TestRollingFileWriters extends WriterTestBase { - // TODO: add ORC once we support ORC rolling file writers - @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}") public static Object[] parameters() { return new Object[][] { @@ -48,6 +45,8 @@ public static Object[] parameters() { new Object[]{FileFormat.AVRO, true}, new Object[]{FileFormat.PARQUET, false}, new Object[]{FileFormat.PARQUET, true}, + new Object[]{FileFormat.ORC, false}, + new Object[]{FileFormat.ORC, true} }; } @@ -129,8 +128,6 @@ public void testRollingDataWriterSplitData() throws IOException { @Test public void testRollingEqualityDeleteWriterNoRecords() throws IOException { - Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); - List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); Schema equalityDeleteRowSchema = table.schema().select("id"); FileWriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java index 397bd6e88c69..03cdcd80dec1 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.CoreOptions; @@ -50,7 +51,6 @@ import org.apache.iceberg.types.Types; import org.junit.After; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -322,7 +322,6 @@ public void testRewriteLargeTableHasResiduals() throws IOException { */ @Test public void testRewriteAvoidRepeateCompress() throws IOException { - Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC)); List expected = Lists.newArrayList(); Schema schema = icebergTableUnPartitioned.schema(); GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema); @@ -331,7 +330,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException { try (FileAppender fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) { long filesize = 20000; for (; fileAppender.length() < filesize; count++) { - Record record = SimpleDataUtil.createRecord(count, "iceberg"); + Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString()); fileAppender.add(record); expected.add(record); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index 741977541c08..86c2f6672c3c 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -233,10 +233,6 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception { @Test public void testTableWithTargetFileSize() throws Exception { - // TODO: ORC file does not support target file size before closed. - if (format == FileFormat.ORC) { - return; - } // Adjust the target-file-size in table properties. table.updateProperties() .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java index 562a75e53773..2595b098dfea 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java @@ -183,10 +183,6 @@ public void testCompleteFiles() throws IOException { @Test public void testRollingWithTargetFileSize() throws IOException { - // TODO ORC don't support target file size before closed. - if (format == FileFormat.ORC) { - return; - } try (TaskWriter taskWriter = createTaskWriter(4)) { List rows = Lists.newArrayListWithCapacity(8000); List records = Lists.newArrayListWithCapacity(8000); diff --git a/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java new file mode 100644 index 000000000000..af0c3b6a2f77 --- /dev/null +++ b/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.orc.TypeDescription;
+
+public class EstimateOrcAvgWidthVisitor extends OrcSchemaVisitor<Integer> {
+
+  @Override
+  public Integer record(TypeDescription record, List<String> names, List<Integer> fieldWidths) {
+    return fieldWidths.stream().reduce(Integer::sum).orElse(0);
+  }
+
+  @Override
+  public Integer list(TypeDescription array, Integer elementWidth) {
+    return elementWidth;
+  }
+
+  @Override
+  public Integer map(TypeDescription map, Integer keyWidth, Integer valueWidth) {
+    return keyWidth + valueWidth;
+  }
+
+  @Override
+  public Integer primitive(TypeDescription primitive) {
+    Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(primitive);
+
+    if (!icebergIdOpt.isPresent()) {
+      return 0;
+    }
+
+    switch (primitive.getCategory()) {
+      case BYTE:
+      case CHAR:
+      case SHORT:
+      case INT:
+      case FLOAT:
+      case BOOLEAN:
+      case LONG:
+      case DOUBLE:
+      case DATE:
+        return 8;
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        return 12;
+      case STRING:
+      case VARCHAR:
+      case BINARY:
+        return 128;
+      case DECIMAL:
+        return primitive.getPrecision() + 2;
+      default:
+        throw new IllegalArgumentException("Can't handle " + primitive);
+    }
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index e946cda3f3a8..8261839f760e 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
@@ -30,6 +31,7 @@
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynFields;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.FileAppender;
@@ -41,16 +43,23 @@
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.TreeWriter;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Create a file appender for ORC.
 */
 class OrcFileAppender<D> implements FileAppender<D> {
+  private static final Logger LOG = LoggerFactory.getLogger(OrcFileAppender.class);
+
   private final int batchSize;
   private final OutputFile file;
   private final Writer writer;
+  private final TreeWriter treeWriter;
   private final VectorizedRowBatch batch;
+  private final int avgRowByteSize;
   private final OrcRowWriter<D> valueWriter;
   private boolean isClosed = false;
   private final Configuration conf;
@@ -66,6 +75,14 @@ class OrcFileAppender<D> implements FileAppender<D> {
     this.metricsConfig = metricsConfig;
 
     TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
+
+    this.avgRowByteSize =
+        OrcSchemaVisitor.visitSchema(orcSchema, new EstimateOrcAvgWidthVisitor()).stream().reduce(Integer::sum)
+            .orElse(0);
+    if (avgRowByteSize == 0) {
+      LOG.warn("The average length of the rows appears to be zero.");
+    }
+
     this.batch = orcSchema.createRowBatch(this.batchSize);
 
     OrcFile.WriterOptions options = OrcFile.writerOptions(conf).useUTCTimestamp(true);
@@ -74,6 +91,9 @@
     }
     options.setSchema(orcSchema);
     this.writer = newOrcWriter(file, options, metadata);
+
+    // TODO: access estimateMemorySize directly once ORC 1.7.4 is released with https://github.com/apache/orc/pull/1057.
+    this.treeWriter = treeWriterHiddenInORC();
     this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
   }
 
@@ -86,7 +106,7 @@ public void add(D datum) {
         batch.reset();
       }
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Problem writing to ORC file %s", file.location());
+      throw new UncheckedIOException(String.format("Problem writing to ORC file %s", file.location()), ioe);
     }
   }
 
@@ -99,9 +119,29 @@ public Metrics metrics() {
 
   @Override
   public long length() {
-    Preconditions.checkState(isClosed,
-        "Cannot return length while appending to an open file.");
-    return file.toInputFile().getLength();
+    if (isClosed) {
+      return file.toInputFile().getLength();
+    }
+
+    Preconditions.checkNotNull(treeWriter,
+        "Cannot estimate length of file being written as the ORC writer's internal writer is not present");
+
+    long estimateMemory = treeWriter.estimateMemory();
+
+    long dataLength = 0;
+    try {
+      List<StripeInformation> stripes = writer.getStripes();
+      if (!stripes.isEmpty()) {
+        StripeInformation stripeInformation = stripes.get(stripes.size() - 1);
+        dataLength = stripeInformation != null ? stripeInformation.getOffset() + stripeInformation.getLength() : 0;
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to get stripe information from the ORC writer for file: %s",
+          file.location()), e);
+    }
+
+    // This value is estimated, not actual.
+    return (long) Math.ceil(dataLength + (estimateMemory + (long) batch.size * avgRowByteSize) * 0.2);
   }
 
   @Override
@@ -153,4 +193,10 @@ private static OrcRowWriter newOrcRowWriter(Schema schema,
                                                      createWriterFunc) {
     return (OrcRowWriter) createWriterFunc.apply(schema, orcSchema);
   }
+
+  private TreeWriter treeWriterHiddenInORC() {
+    DynFields.BoundField<TreeWriter> treeWriterField =
+        DynFields.builder().hiddenImpl(writer.getClass(), "treeWriter").build(writer);
+    return treeWriterField.get();
+  }
 }
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java b/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java
new file mode 100644
index 000000000000..aca95efd4959
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestEstimateOrcAvgWidthVisitor {
+
+  // all supported fields
+  protected static final Types.NestedField ID_FIELD = required(1, "id", Types.IntegerType.get());
+  protected static final Types.NestedField DATA_FIELD = optional(2, "data", Types.StringType.get());
+  protected static final Types.NestedField FLOAT_FIELD = required(3, "float", Types.FloatType.get());
+  protected static final Types.NestedField DOUBLE_FIELD = optional(4, "double", Types.DoubleType.get());
+  protected static final Types.NestedField DECIMAL_FIELD = optional(5, "decimal", Types.DecimalType.of(5, 3));
+  protected static final Types.NestedField FIXED_FIELD = optional(7, "fixed", Types.FixedType.ofLength(4));
+  protected static final Types.NestedField BINARY_FIELD = optional(8, "binary", Types.BinaryType.get());
+  protected static final Types.NestedField FLOAT_LIST_FIELD = optional(9, "floatList",
+      Types.ListType.ofRequired(10, Types.FloatType.get()));
+  protected static final Types.NestedField LONG_FIELD = optional(11, "long", Types.LongType.get());
+  protected static final Types.NestedField BOOLEAN_FIELD = optional(12, "boolean", Types.BooleanType.get());
+  protected static final Types.NestedField TIMESTAMP_ZONE_FIELD = optional(13, "timestampZone",
+      Types.TimestampType.withZone());
+  protected static final Types.NestedField TIMESTAMP_FIELD = optional(14, "timestamp",
+      Types.TimestampType.withoutZone());
+  protected static final Types.NestedField DATE_FIELD = optional(15, "date", Types.DateType.get());
+  protected static final Types.NestedField
UUID_FIELD = required(16, "uuid", Types.UUIDType.get()); + + protected static final Types.NestedField MAP_FIELD_1 = optional(17, "map1", + Types.MapType.ofOptional(18, 19, Types.FloatType.get(), Types.StringType.get()) + ); + protected static final Types.NestedField MAP_FIELD_2 = optional(20, "map2", + Types.MapType.ofOptional(21, 22, Types.IntegerType.get(), Types.DoubleType.get()) + ); + protected static final Types.NestedField STRUCT_FIELD = optional(23, "struct", Types.StructType.of( + required(24, "booleanField", Types.BooleanType.get()), + optional(25, "date", Types.DateType.get()), + optional(27, "timestamp", Types.TimestampType.withZone()) + )); + + @Test + public void testEstimateIntegerWidth() { + Schema integerSchema = new Schema(ID_FIELD); + TypeDescription integerOrcSchema = ORCSchemaUtil.convert(integerSchema); + long estimateLength = getEstimateLength(integerOrcSchema); + Assert.assertEquals("Estimated average length of integer must be 8.", 8, estimateLength); + } + + @Test + public void testEstimateStringWidth() { + Schema stringSchema = new Schema(DATA_FIELD); + TypeDescription stringOrcSchema = ORCSchemaUtil.convert(stringSchema); + long estimateLength = getEstimateLength(stringOrcSchema); + Assert.assertEquals("Estimated average length of string must be 128.", 128, estimateLength); + } + + @Test + public void testEstimateFloatWidth() { + Schema floatSchema = new Schema(FLOAT_FIELD); + TypeDescription floatOrcSchema = ORCSchemaUtil.convert(floatSchema); + long estimateLength = getEstimateLength(floatOrcSchema); + Assert.assertEquals("Estimated average length of float must be 8.", 8, estimateLength); + } + + @Test + public void testEstimateDoubleWidth() { + Schema doubleSchema = new Schema(DOUBLE_FIELD); + TypeDescription doubleOrcSchema = ORCSchemaUtil.convert(doubleSchema); + long estimateLength = getEstimateLength(doubleOrcSchema); + Assert.assertEquals("Estimated average length of double must be 8.", 8, estimateLength); + } + + @Test + public void testEstimateDecimalWidth() { + Schema decimalSchema = new Schema(DECIMAL_FIELD); + TypeDescription decimalOrcSchema = ORCSchemaUtil.convert(decimalSchema); + long estimateLength = getEstimateLength(decimalOrcSchema); + Assert.assertEquals("Estimated average length of decimal must be 7.", 7, estimateLength); + } + + @Test + public void testEstimateFixedWidth() { + Schema fixedSchema = new Schema(FIXED_FIELD); + TypeDescription fixedOrcSchema = ORCSchemaUtil.convert(fixedSchema); + long estimateLength = getEstimateLength(fixedOrcSchema); + Assert.assertEquals("Estimated average length of fixed must be 128.", 128, estimateLength); + } + + @Test + public void testEstimateBinaryWidth() { + Schema binarySchema = new Schema(BINARY_FIELD); + TypeDescription binaryOrcSchema = ORCSchemaUtil.convert(binarySchema); + long estimateLength = getEstimateLength(binaryOrcSchema); + Assert.assertEquals("Estimated average length of binary must be 128.", 128, estimateLength); + } + + @Test + public void testEstimateListWidth() { + Schema listSchema = new Schema(FLOAT_LIST_FIELD); + TypeDescription listOrcSchema = ORCSchemaUtil.convert(listSchema); + long estimateLength = getEstimateLength(listOrcSchema); + Assert.assertEquals("Estimated average length of list must be 8.", 8, estimateLength); + } + + @Test + public void testEstimateLongWidth() { + Schema longSchema = new Schema(LONG_FIELD); + TypeDescription longOrcSchema = ORCSchemaUtil.convert(longSchema); + long estimateLength = getEstimateLength(longOrcSchema); + Assert.assertEquals("Estimated 
average length of long must be 8.", 8, estimateLength); + } + + @Test + public void testEstimateBooleanWidth() { + Schema booleanSchema = new Schema(BOOLEAN_FIELD); + TypeDescription booleanOrcSchema = ORCSchemaUtil.convert(booleanSchema); + long estimateLength = getEstimateLength(booleanOrcSchema); + Assert.assertEquals("Estimated average length of boolean must be 8.", 8, estimateLength); + } + + @Test + public void testEstimateTimestampWidth() { + Schema timestampZoneSchema = new Schema(TIMESTAMP_ZONE_FIELD); + TypeDescription timestampZoneOrcSchema = ORCSchemaUtil.convert(timestampZoneSchema); + long estimateLength = getEstimateLength(timestampZoneOrcSchema); + Assert.assertEquals("Estimated average length of timestamps with zone must be 12.", 12, estimateLength); + + Schema timestampSchema = new Schema(TIMESTAMP_FIELD); + TypeDescription timestampOrcSchema = ORCSchemaUtil.convert(timestampSchema); + estimateLength = getEstimateLength(timestampOrcSchema); + Assert.assertEquals("Estimated average length of timestamp must be 12.", 12, estimateLength); + } + + @Test + public void testEstimateDateWidth() { + Schema dateSchema = new Schema(DATE_FIELD); + TypeDescription dateOrcSchema = ORCSchemaUtil.convert(dateSchema); + long estimateLength = getEstimateLength(dateOrcSchema); + Assert.assertEquals("Estimated average length of date must be 8.", 8, estimateLength); + } + + @Test + public void testEstimateUUIDWidth() { + Schema uuidSchema = new Schema(UUID_FIELD); + TypeDescription uuidOrcSchema = ORCSchemaUtil.convert(uuidSchema); + long estimateLength = getEstimateLength(uuidOrcSchema); + Assert.assertEquals("Estimated average length of uuid must be 128.", 128, estimateLength); + } + + @Test + public void testEstimateMapWidth() { + Schema mapSchema = new Schema(MAP_FIELD_1); + TypeDescription mapOrcSchema = ORCSchemaUtil.convert(mapSchema); + long estimateLength = getEstimateLength(mapOrcSchema); + Assert.assertEquals("Estimated average length of map must be 136.", 136, estimateLength); + } + + @Test + public void testEstimateStructWidth() { + Schema structSchema = new Schema(STRUCT_FIELD); + TypeDescription structOrcSchema = ORCSchemaUtil.convert(structSchema); + long estimateLength = getEstimateLength(structOrcSchema); + Assert.assertEquals("Estimated average length of struct must be 28.", 28, estimateLength); + } + + @Test + public void testEstimateFullWidth() { + Schema fullSchema = new Schema(ID_FIELD, DATA_FIELD, FLOAT_FIELD, DOUBLE_FIELD, DECIMAL_FIELD, FIXED_FIELD, + BINARY_FIELD, FLOAT_LIST_FIELD, LONG_FIELD, MAP_FIELD_1, MAP_FIELD_2, STRUCT_FIELD); + TypeDescription fullOrcSchema = ORCSchemaUtil.convert(fullSchema); + long estimateLength = getEstimateLength(fullOrcSchema); + Assert.assertEquals("Estimated average length of the row must be 611.", 611, estimateLength); + } + + private Integer getEstimateLength(TypeDescription orcSchemaWithDate) { + return OrcSchemaVisitor.visitSchema(orcSchemaWithDate, new EstimateOrcAvgWidthVisitor()) + .stream().reduce(Integer::sum).orElse(0); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index a6696221ecda..94f4614f1bde 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -42,7 +42,6 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import 
org.apache.iceberg.Table; -import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.ClusteredDataWriter; @@ -636,13 +635,7 @@ private static class UnpartitionedDataWriter implements DataWriter private UnpartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory, FileIO io, PartitionSpec spec, FileFormat format, long targetFileSize) { - // TODO: support ORC rolling writers - if (format == FileFormat.ORC) { - EncryptedOutputFile outputFile = fileFactory.newOutputFile(); - delegate = writerFactory.newDataWriter(outputFile, spec, null); - } else { - delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null); - } + delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null); this.io = io; } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java index 5e99dac5c26a..f9241758cd11 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java @@ -365,11 +365,9 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws files.add(file); } } - // TODO: ORC file now not support target file size - if (!format.equals(FileFormat.ORC)) { - Assert.assertEquals("Should have 4 DataFiles", 4, files.size()); - Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); - } + + Assert.assertEquals("Should have 4 DataFiles", 4, files.size()); + Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); } @Test @@ -585,11 +583,9 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti files.add(file); } } - // TODO: ORC file now not support target file size - if (!format.equals(FileFormat.ORC)) { - Assert.assertEquals("Should have 8 DataFiles", 8, files.size()); - Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); - } + + Assert.assertEquals("Should have 8 DataFiles", 8, files.size()); + Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000)); } public enum IcebergOptionsType {
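
Note (illustration only, not part of the patch above): with the ORC special cases removed from the rolling and task writers, length() on an open OrcFileAppender now returns an estimate instead of requiring a closed file. The estimate is the bytes already flushed as stripes plus roughly 20% of what is still buffered in memory, where the buffered portion is the TreeWriter's estimated memory plus the rows pending in the current VectorizedRowBatch priced at the schema's average row width from EstimateOrcAvgWidthVisitor. A minimal, self-contained sketch with made-up numbers follows; the class and helper names are hypothetical and only mirror the expression returned by OrcFileAppender#length():

// Hypothetical example class; the constants mirror OrcFileAppender.length() and the
// per-type widths used by EstimateOrcAvgWidthVisitor (8 for numeric/date types,
// 12 for timestamps, 128 for string/binary types).
public class OrcLengthEstimateExample {

  // Same shape as the in-progress length estimate: flushed stripe bytes plus 20% of buffered bytes.
  static long estimateLength(long flushedStripeBytes, long estimatedWriterMemory,
                             int bufferedRowCount, int avgRowByteSize) {
    return (long) Math.ceil(flushedStripeBytes +
        (estimatedWriterMemory + (long) bufferedRowCount * avgRowByteSize) * 0.2);
  }

  public static void main(String[] args) {
    // Made-up numbers: one flushed 4 MiB stripe, ~1 MiB of estimated writer memory,
    // 500 buffered rows, and an estimated average row width of 148 bytes
    // (one long -> 8, one string -> 128, one timestamp -> 12).
    long estimate = estimateLength(4L * 1024 * 1024, 1024 * 1024, 500, 148);
    // 4194304 + (1048576 + 500 * 148) * 0.2 = 4194304 + 224515.2, rounded up to 4418820.
    System.out.println(estimate); // prints 4418820
  }
}

The 0.2 factor presumably accounts for encoding and compression shrinking buffered rows once they are written into a stripe, so the estimate errs low rather than rolling to a new file too early.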