diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
index a4acd2f7b2ef..9b1b7cca8d9a 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -35,16 +34,14 @@ public class ClusteredDataWriter extends ClusteredWriter
   private final FileWriterFactory writerFactory;
   private final OutputFileFactory fileFactory;
   private final FileIO io;
-  private final FileFormat fileFormat;
   private final long targetFileSizeInBytes;
   private final List dataFiles;
 
   public ClusteredDataWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory,
-                             FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+                             FileIO io, long targetFileSizeInBytes) {
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
-    this.fileFormat = fileFormat;
     this.targetFileSizeInBytes = targetFileSizeInBytes;
     this.dataFiles = Lists.newArrayList();
   }
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
index 976165f50e4f..4b4b71181468 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -36,16 +35,14 @@ public class ClusteredEqualityDeleteWriter extends ClusteredWriter
   private final FileWriterFactory writerFactory;
   private final OutputFileFactory fileFactory;
   private final FileIO io;
-  private final FileFormat fileFormat;
   private final long targetFileSizeInBytes;
   private final List deleteFiles;
 
   public ClusteredEqualityDeleteWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory,
-                                       FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+                                       FileIO io, long targetFileSizeInBytes) {
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
-    this.fileFormat = fileFormat;
     this.targetFileSizeInBytes = targetFileSizeInBytes;
     this.deleteFiles = Lists.newArrayList();
   }
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
index 53336fff7e16..450d427ce0b1 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.deletes.PositionDelete;
@@ -37,17 +36,15 @@ public class ClusteredPositionDeleteWriter extends ClusteredWriter
   private final FileWriterFactory writerFactory;
   private final OutputFileFactory fileFactory;
   private final FileIO io;
-  private final FileFormat fileFormat;
   private final long targetFileSizeInBytes;
   private final List deleteFiles;
   private final CharSequenceSet referencedDataFiles;
 
   public ClusteredPositionDeleteWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory,
-                                       FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+                                       FileIO io, long targetFileSizeInBytes) {
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
-    this.fileFormat = fileFormat;
     this.targetFileSizeInBytes = targetFileSizeInBytes;
     this.deleteFiles = Lists.newArrayList();
     this.referencedDataFiles = CharSequenceSet.empty();
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
index 54ccff0e327f..eddd1864d724 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -35,16 +34,14 @@ public class FanoutDataWriter extends FanoutWriter {
   private final FileWriterFactory writerFactory;
   private final OutputFileFactory fileFactory;
   private final FileIO io;
-  private final FileFormat fileFormat;
   private final long targetFileSizeInBytes;
   private final List dataFiles;
 
   public FanoutDataWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory,
-                          FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+                          FileIO io, long targetFileSizeInBytes) {
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
-    this.fileFormat = fileFormat;
     this.targetFileSizeInBytes = targetFileSizeInBytes;
     this.dataFiles = Lists.newArrayList();
   }
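With the constructors above, a ClusteredDataWriter, ClusteredEqualityDeleteWriter, ClusteredPositionDeleteWriter, or FanoutDataWriter is now built from just a writer factory, an output-file factory, a FileIO, and a target file size; the file format is no longer passed in and is effectively decided by the factories. A minimal usage sketch of the new call shape follows; the helper class, the generic parameter T, and the 128 MB target are illustrative assumptions, not part of this patch.

    import java.io.IOException;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.ClusteredDataWriter;
    import org.apache.iceberg.io.DataWriteResult;
    import org.apache.iceberg.io.FileWriterFactory;
    import org.apache.iceberg.io.OutputFileFactory;

    class ClusteredDataWriterSketch {
      // Writes all rows of an unpartitioned table and returns the collected data files.
      static <T> DataWriteResult writeAll(Table table, FileWriterFactory<T> writerFactory,
                                          OutputFileFactory fileFactory, Iterable<T> rows) throws IOException {
        long targetFileSizeInBytes = 128L * 1024 * 1024; // example target size, not taken from this patch
        ClusteredDataWriter<T> writer =
            new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), targetFileSizeInBytes);
        try (ClusteredDataWriter<T> closeableWriter = writer) {
          for (T row : rows) {
            closeableWriter.write(row, table.spec(), null); // null partition: unpartitioned spec
          }
        }
        return writer.result();
      }
    }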
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index ca9e603e5432..7253730dbcf4 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -82,8 +82,7 @@ public void setupTable() throws Exception {
   public void testClusteredDataWriterNoRecords() throws IOException {
     FileWriterFactory writerFactory = newWriterFactory(table.schema());
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     writer.close();
 
     Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
@@ -100,8 +99,7 @@ public void testClusteredDataWriterMultiplePartitions() throws IOException {
     FileWriterFactory writerFactory = newWriterFactory(table.schema());
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
 
     PartitionSpec spec = table.spec();
@@ -138,8 +136,7 @@ public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
     FileWriterFactory writerFactory = newWriterFactory(table.schema());
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
 
     PartitionSpec spec = table.spec();
@@ -162,8 +159,7 @@ public void testClusteredEqualityDeleteWriterNoRecords() throws IOException {
     Schema equalityDeleteRowSchema = table.schema().select("id");
     FileWriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     writer.close();
 
     Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -226,8 +222,7 @@ public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException
         .commit();
 
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
 
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec bucketSpec = table.specs().get(1);
@@ -274,8 +269,7 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro
         .commit();
 
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
 
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec bucketSpec = table.specs().get(1);
@@ -303,8 +297,7 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro
   public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
     FileWriterFactory writerFactory = newWriterFactory(table.schema());
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     writer.close();
 
     Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -365,8 +358,7 @@ public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException
         .commit();
 
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
 
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec bucketSpec = table.specs().get(1);
@@ -411,8 +403,7 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro
         .commit();
 
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
 
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec bucketSpec = table.specs().get(1);
@@ -446,8 +437,7 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro
   public void testFanoutDataWriterNoRecords() throws IOException {
     FileWriterFactory writerFactory = newWriterFactory(table.schema());
     FanoutDataWriter writer = new FanoutDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     writer.close();
 
     Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
@@ -464,8 +454,7 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException {
     FileWriterFactory writerFactory = newWriterFactory(table.schema());
     FanoutDataWriter writer = new FanoutDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
 
     PartitionSpec spec = table.spec();
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
index 7b6e1d6746ec..275ee5e0fc88 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
@@ -80,14 +80,11 @@ public void testPositionDeltaInsertOnly() throws IOException {
     FileWriterFactory writerFactory = newWriterFactory(table.schema());
 
     ClusteredDataWriter insertWriter = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     ClusteredDataWriter updateWriter = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     ClusteredPositionDeleteWriter deleteWriter = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     PositionDeltaWriter deltaWriter = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
 
     deltaWriter.insert(toRow(1, "aaa"), table.spec(), null);
@@ -148,14 +145,11 @@ public void testPositionDeltaDeleteOnly() throws IOException {
     PartitionSpec partitionedSpec = table.specs().get(1);
 
     ClusteredDataWriter insertWriter = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     ClusteredDataWriter updateWriter = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     ClusteredPositionDeleteWriter deleteWriter = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     PositionDeltaWriter deltaWriter = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
 
     deltaWriter.delete(dataFile1.path(), 2L, unpartitionedSpec, null);
@@ -220,14 +214,11 @@ public void testPositionDeltaMultipleSpecs() throws IOException {
     PartitionSpec partitionedSpec = table.specs().get(1);
 
     ClusteredDataWriter insertWriter = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     ClusteredDataWriter updateWriter = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     ClusteredPositionDeleteWriter deleteWriter = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, table.io(),
-        fileFormat, TARGET_FILE_SIZE);
+        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
     PositionDeltaWriter deltaWriter = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
 
     deltaWriter.delete(dataFile1.path(), 2L, unpartitionedSpec, null);
diff --git a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
index b4fb243edd4f..c10890e6ac22 100644
--- a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
+++ b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CoreOptions;
@@ -50,7 +51,6 @@
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -320,7 +320,6 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
    */
   @Test
   public void testRewriteAvoidRepeateCompress() throws IOException {
-    Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
     List expected = Lists.newArrayList();
     Schema schema = icebergTableUnPartitioned.schema();
     GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
@@ -329,7 +328,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     try (FileAppender fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
       long filesize = 20000;
       for (; fileAppender.length() < filesize; count++) {
-        Record record = SimpleDataUtil.createRecord(count, "iceberg");
+        Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
         fileAppender.add(record);
         expected.add(record);
       }
diff --git a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 741977541c08..86c2f6672c3c 100644
--- a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -233,10 +233,6 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {
 
   @Test
   public void testTableWithTargetFileSize() throws Exception {
-    // TODO: ORC file does not support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     // Adjust the target-file-size in table properties.
     table.updateProperties()
         .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
diff --git a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 562a75e53773..2595b098dfea 100644
--- a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -183,10 +183,6 @@ public void testCompleteFiles() throws IOException {
 
   @Test
   public void testRollingWithTargetFileSize() throws IOException {
-    // TODO ORC don't support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     try (TaskWriter taskWriter = createTaskWriter(4)) {
       List rows = Lists.newArrayListWithCapacity(8000);
       List records = Lists.newArrayListWithCapacity(8000);
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
index 397bd6e88c69..03cdcd80dec1 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CoreOptions;
@@ -50,7 +51,6 @@
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -322,7 +322,6 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
    */
   @Test
   public void testRewriteAvoidRepeateCompress() throws IOException {
-    Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
     List expected = Lists.newArrayList();
     Schema schema = icebergTableUnPartitioned.schema();
     GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
@@ -331,7 +330,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
     try (FileAppender fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
       long filesize = 20000;
       for (; fileAppender.length() < filesize; count++) {
-        Record record = SimpleDataUtil.createRecord(count, "iceberg");
+        Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
         fileAppender.add(record);
         expected.add(record);
       }
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 741977541c08..86c2f6672c3c 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -233,10 +233,6 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {
 
   @Test
   public void testTableWithTargetFileSize() throws Exception {
-    // TODO: ORC file does not support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     // Adjust the target-file-size in table properties.
     table.updateProperties()
         .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 562a75e53773..2595b098dfea 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -183,10 +183,6 @@ public void testCompleteFiles() throws IOException {
 
   @Test
   public void testRollingWithTargetFileSize() throws IOException {
-    // TODO ORC don't support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     try (TaskWriter taskWriter = createTaskWriter(4)) {
       List rows = Lists.newArrayListWithCapacity(8000);
       List records = Lists.newArrayListWithCapacity(8000);
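The Flink test changes above drop the ORC-only guards, so target-file-size rolling is now exercised for every format, and testRewriteAvoidRepeateCompress writes random UUID strings instead of a constant, apparently so the in-progress appender length keeps growing past the 20000-byte threshold rather than being flattened by compression and dictionary encoding. A sketch of the resulting append loop is below; it mirrors the test, but the wrapper class and method names are assumptions.

    import java.io.File;
    import java.io.IOException;
    import java.util.UUID;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericAppenderFactory;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.flink.SimpleDataUtil;
    import org.apache.iceberg.io.FileAppender;

    class AppendUntilTargetLength {
      // Mirrors the rewritten loop: append random records until the appender reports ~20 KB.
      static void appendRandomRecords(Schema schema, File file, FileFormat format) throws IOException {
        GenericAppenderFactory appenderFactory = new GenericAppenderFactory(schema);
        try (FileAppender<Record> appender = appenderFactory.newAppender(Files.localOutput(file), format)) {
          long targetBytes = 20000;
          for (int count = 0; appender.length() < targetBytes; count++) {
            // Random payloads keep the reported length growing for every format, ORC included.
            appender.add(SimpleDataUtil.createRecord(count, UUID.randomUUID().toString()));
          }
        }
      }
    }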
diff --git a/spark/v2.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v2.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index a8521de0e0ba..06e00e3ebab7 100644
--- a/spark/v2.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ b/spark/v2.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -136,8 +136,7 @@ public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IO
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     try (ClusteredDataWriter closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -186,8 +185,7 @@ public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOEx
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -242,8 +240,7 @@ public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOExcep
         .build();
 
     FanoutDataWriter writer = new FanoutDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -301,8 +298,7 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) t
         .build();
 
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
@@ -329,8 +325,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
         .build();
 
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PositionDelete positionDelete = PositionDelete.create();
     try (ClusteredPositionDeleteWriter closeableWriter = writer) {
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c26a..f9241758cd11 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -365,11 +365,9 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   @Test
@@ -585,11 +583,9 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   public enum IcebergOptionsType {
diff --git a/spark/v3.0/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.0/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index a8521de0e0ba..06e00e3ebab7 100644
--- a/spark/v3.0/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ b/spark/v3.0/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -136,8 +136,7 @@ public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IO
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     try (ClusteredDataWriter closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -186,8 +185,7 @@ public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOEx
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -242,8 +240,7 @@ public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOExcep
         .build();
 
     FanoutDataWriter writer = new FanoutDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -301,8 +298,7 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) t
         .build();
 
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
@@ -329,8 +325,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
         .build();
 
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PositionDelete positionDelete = PositionDelete.create();
     try (ClusteredPositionDeleteWriter closeableWriter = writer) {
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 896ba82fc7df..934a0349062c 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,7 +43,6 @@
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -562,12 +561,11 @@ public DataWriter createWriter(int partitionId, long taskId, long e
           .build();
 
       if (spec.isUnpartitioned()) {
-        return new UnpartitionedDataWriter(writerFactory, fileFactory, io, spec, format, targetFileSize);
+        return new UnpartitionedDataWriter(writerFactory, fileFactory, io, spec, targetFileSize);
       } else {
         return new PartitionedDataWriter(
-            writerFactory, fileFactory, io, spec, writeSchema, dsSchema,
-            format, targetFileSize, partitionedFanoutEnabled);
+            writerFactory, fileFactory, io, spec, writeSchema, dsSchema, targetFileSize, partitionedFanoutEnabled);
       }
     }
   }
@@ -584,14 +582,8 @@ private static class UnpartitionedDataWriter implements DataWriter
     private final FileIO io;
 
     private UnpartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
-                                    FileIO io, PartitionSpec spec, FileFormat format, long targetFileSize) {
-      // TODO: support ORC rolling writers
-      if (format == FileFormat.ORC) {
-        EncryptedOutputFile outputFile = fileFactory.newOutputFile();
-        delegate = writerFactory.newDataWriter(outputFile, spec, null);
-      } else {
-        delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
-      }
+                                    FileIO io, PartitionSpec spec, long targetFileSize) {
+      this.delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
       this.io = io;
     }
@@ -633,12 +625,11 @@ private static class PartitionedDataWriter implements DataWriter {
 
     private PartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
                                   FileIO io, PartitionSpec spec, Schema dataSchema,
-                                  StructType dataSparkType, FileFormat format,
-                                  long targetFileSize, boolean fanoutEnabled) {
+                                  StructType dataSparkType, long targetFileSize, boolean fanoutEnabled) {
       if (fanoutEnabled) {
-        this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+        this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
       } else {
-        this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+        this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
       }
       this.io = io;
       this.spec = spec;
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c26a..f9241758cd11 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -365,11 +365,9 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   @Test
@@ -585,11 +583,9 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   public enum IcebergOptionsType {
diff --git a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index a8521de0e0ba..06e00e3ebab7 100644
--- a/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ b/spark/v3.1/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -136,8 +136,7 @@ public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IO
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     try (ClusteredDataWriter closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -186,8 +185,7 @@ public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOEx
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -242,8 +240,7 @@ public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOExcep
         .build();
 
     FanoutDataWriter writer = new FanoutDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -301,8 +298,7 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) t
         .build();
 
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
@@ -329,8 +325,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
         .build();
 
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PositionDelete positionDelete = PositionDelete.create();
     try (ClusteredPositionDeleteWriter closeableWriter = writer) {
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 896ba82fc7df..934a0349062c 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,7 +43,6 @@
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -562,12 +561,11 @@ public DataWriter createWriter(int partitionId, long taskId, long e
           .build();
 
      if (spec.isUnpartitioned()) {
-        return new UnpartitionedDataWriter(writerFactory, fileFactory, io, spec, format, targetFileSize);
+        return new UnpartitionedDataWriter(writerFactory, fileFactory, io, spec, targetFileSize);
       } else {
         return new PartitionedDataWriter(
-            writerFactory, fileFactory, io, spec, writeSchema, dsSchema,
-            format, targetFileSize, partitionedFanoutEnabled);
+            writerFactory, fileFactory, io, spec, writeSchema, dsSchema, targetFileSize, partitionedFanoutEnabled);
       }
     }
   }
@@ -584,14 +582,8 @@ private static class UnpartitionedDataWriter implements DataWriter
     private final FileIO io;
 
     private UnpartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
-                                    FileIO io, PartitionSpec spec, FileFormat format, long targetFileSize) {
-      // TODO: support ORC rolling writers
-      if (format == FileFormat.ORC) {
-        EncryptedOutputFile outputFile = fileFactory.newOutputFile();
-        delegate = writerFactory.newDataWriter(outputFile, spec, null);
-      } else {
-        delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
-      }
+                                    FileIO io, PartitionSpec spec, long targetFileSize) {
+      this.delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
       this.io = io;
     }
@@ -633,12 +625,11 @@ private static class PartitionedDataWriter implements DataWriter {
 
     private PartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
                                   FileIO io, PartitionSpec spec, Schema dataSchema,
-                                  StructType dataSparkType, FileFormat format,
-                                  long targetFileSize, boolean fanoutEnabled) {
+                                  StructType dataSparkType, long targetFileSize, boolean fanoutEnabled) {
       if (fanoutEnabled) {
-        this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+        this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
       } else {
-        this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+        this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
       }
       this.io = io;
       this.spec = spec;
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c26a..f9241758cd11 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -365,11 +365,9 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   @Test
@@ -585,11 +583,9 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   public enum IcebergOptionsType {
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
index 82c64b707c07..ff2964212347 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
@@ -171,8 +171,7 @@ protected void writePosDeletes(CharSequence path, List deletedPos, int num
         .build();
 
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, table().io(),
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, table().io(), TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionSpec unpartitionedSpec = table().specs().get(0);
@@ -228,7 +227,7 @@ private void writeEqDeletes(List rows) throws IOException {
         .build();
 
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, table().io(), fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, table().io(), TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionSpec unpartitionedSpec = table().specs().get(0);
     try (ClusteredEqualityDeleteWriter closeableWriter = writer) {
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 93ee5574a58d..acec471bdfd1 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -137,8 +137,7 @@ public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IO
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     try (ClusteredDataWriter closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -187,8 +186,7 @@ public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOEx
         .build();
 
     ClusteredDataWriter writer = new ClusteredDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -243,8 +241,7 @@ public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOExcep
         .build();
 
     FanoutDataWriter writer = new FanoutDataWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
@@ -302,8 +299,7 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) t
         .build();
 
     ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
     StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
@@ -330,8 +326,7 @@ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole)
         .build();
 
     ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>(
-        writerFactory, fileFactory, io,
-        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+        writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
 
     PositionDelete positionDelete = PositionDelete.create();
     try (ClusteredPositionDeleteWriter closeableWriter = writer) {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 243ba14215e0..b57defa6987a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -361,8 +361,7 @@ private static class DeleteOnlyDeltaWriter extends BaseDeltaWriter {
                                   OutputFileFactory deleteFileFactory, Context context) {
       this.delegate = new ClusteredPositionDeleteWriter<>(
-          writerFactory, deleteFileFactory, table.io(),
-          context.deleteFileFormat(), context.targetDeleteFileSize());
+          writerFactory, deleteFileFactory, table.io(), context.targetDeleteFileSize());
       this.positionDelete = PositionDelete.create();
       this.io = table.io();
       this.specs = table.specs();
@@ -504,13 +503,12 @@ private PartitioningWriter newInsertWriter(Table t
                                                      SparkFileWriterFactory writerFactory,
                                                      OutputFileFactory fileFactory,
                                                      Context context) {
-    FileFormat fileFormat = context.dataFileFormat();
     long targetFileSize = context.targetDataFileSize();
 
     if (table.spec().isPartitioned() && context.fanoutWriterEnabled()) {
-      return new FanoutDataWriter<>(writerFactory, fileFactory, table.io(), fileFormat, targetFileSize);
+      return new FanoutDataWriter<>(writerFactory, fileFactory, table.io(), targetFileSize);
     } else {
-      return new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), fileFormat, targetFileSize);
+      return new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), targetFileSize);
     }
   }
@@ -518,14 +516,13 @@ private PartitioningWriter newUpdateWriter(Table t
                                                      SparkFileWriterFactory writerFactory,
                                                      OutputFileFactory fileFactory,
                                                      Context context) {
-    FileFormat fileFormat = context.dataFileFormat();
     long targetFileSize = context.targetDataFileSize();
 
     if (table.spec().isPartitioned()) {
       // use a fanout writer for partitioned tables to write updates as they may be out of order
-      return new FanoutDataWriter<>(writerFactory, fileFactory, table.io(), fileFormat, targetFileSize);
+      return new FanoutDataWriter<>(writerFactory, fileFactory, table.io(), targetFileSize);
     } else {
-      return new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), fileFormat, targetFileSize);
+      return new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), targetFileSize);
     }
   }
@@ -533,9 +530,8 @@ private ClusteredPositionDeleteWriter newDeleteWriter(Table table,
                                                         SparkFileWriterFactory writerFactory,
                                                         OutputFileFactory fileFactory,
                                                         Context context) {
-    FileFormat fileFormat = context.deleteFileFormat();
     long targetFileSize = context.targetDeleteFileSize();
-    return new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, table.io(), fileFormat, targetFileSize);
+    return new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, table.io(), targetFileSize);
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 94f4614f1bde..6a6ecd206ed9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -612,12 +612,11 @@ public DataWriter createWriter(int partitionId, long taskId, long e
           .build();
 
       if (spec.isUnpartitioned()) {
-        return new UnpartitionedDataWriter(writerFactory, fileFactory, io, spec, format, targetFileSize);
+        return new UnpartitionedDataWriter(writerFactory, fileFactory, io, spec, targetFileSize);
       } else {
         return new PartitionedDataWriter(
-            writerFactory, fileFactory, io, spec, writeSchema, dsSchema,
-            format, targetFileSize, partitionedFanoutEnabled);
+            writerFactory, fileFactory, io, spec, writeSchema, dsSchema, targetFileSize, partitionedFanoutEnabled);
       }
     }
   }
@@ -634,8 +633,8 @@ private static class UnpartitionedDataWriter implements DataWriter
     private final FileIO io;
 
     private UnpartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
-                                    FileIO io, PartitionSpec spec, FileFormat format, long targetFileSize) {
-      delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
+                                    FileIO io, PartitionSpec spec, long targetFileSize) {
+      this.delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
       this.io = io;
     }
@@ -677,12 +676,11 @@ private static class PartitionedDataWriter implements DataWriter {
 
     private PartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
                                   FileIO io, PartitionSpec spec, Schema dataSchema,
-                                  StructType dataSparkType, FileFormat format,
-                                  long targetFileSize, boolean fanoutEnabled) {
+                                  StructType dataSparkType, long targetFileSize, boolean fanoutEnabled) {
       if (fanoutEnabled) {
-        this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+        this.delegate = new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
       } else {
-        this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize);
+        this.delegate = new ClusteredDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
       }
       this.io = io;
       this.spec = spec;
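With the ORC branch gone, UnpartitionedDataWriter always delegates to a RollingDataWriter, and the partitioned paths in SparkWrite and SparkPositionDeltaWrite choose between fanout and clustered writers purely from the spec and the fanout flag; the file format never enters the decision. The sketch below restates that selection in isolation; it is illustrative rather than the exact Spark code, and the class and method names are assumptions.

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.io.ClusteredDataWriter;
    import org.apache.iceberg.io.DataWriteResult;
    import org.apache.iceberg.io.FanoutDataWriter;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.FileWriterFactory;
    import org.apache.iceberg.io.OutputFileFactory;
    import org.apache.iceberg.io.PartitioningWriter;

    final class DataWriterSelection {
      private DataWriterSelection() {
      }

      // Fanout tolerates out-of-order partitions; clustered expects rows grouped by partition.
      static <T> PartitioningWriter<T, DataWriteResult> newDataWriter(
          FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory, FileIO io,
          PartitionSpec spec, long targetFileSize, boolean fanoutEnabled) {
        if (spec.isPartitioned() && fanoutEnabled) {
          return new FanoutDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
        } else {
          return new ClusteredDataWriter<>(writerFactory, fileFactory, io, targetFileSize);
        }
      }
    }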