diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
index e8fe7934ace6..3d59e0cb0ca9 100644
--- a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
+++ b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
@@ -61,4 +61,8 @@ public List<DeleteFile> deleteFiles() {
   public CharSequenceSet referencedDataFiles() {
     return referencedDataFiles;
   }
+
+  public boolean referencesDataFiles() {
+    return referencedDataFiles != null && referencedDataFiles.size() > 0;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java
new file mode 100644
index 000000000000..1642a30e41ba
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling data writer that splits incoming data into multiple files within one spec/partition
+ * based on the target file size.
+ */
+public class RollingDataWriter<T> extends RollingFileWriter<T, DataWriter<T>, DataWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final List<DataFile> dataFiles;
+
+  public RollingDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                           FileIO io, long targetFileSizeInBytes,
+                           PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    this.dataFiles = Lists.newArrayList();
+    openCurrentWriter();
+  }
+
+  @Override
+  protected DataWriter<T> newWriter(EncryptedOutputFile file) {
+    return writerFactory.newDataWriter(file, spec(), partition());
+  }
+
+  @Override
+  protected void addResult(DataWriteResult result) {
+    dataFiles.addAll(result.dataFiles());
+  }
+
+  @Override
+  protected DataWriteResult aggregatedResult() {
+    return new DataWriteResult(dataFiles);
+  }
+}
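For orientation, here is a minimal, hypothetical sketch of driving a RollingDataWriter. The factory wiring and Record rows stand in for engine-specific setup (the parameterized tests at the end of this diff show the real wiring), and the 128 MB target mirrors the tests' DEFAULT_FILE_SIZE:

    // Hedged sketch: the writer factory is engine-specific and assumed here.
    FileWriterFactory<Record> writerFactory = null; // replace with a real factory
    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, 1, 1).build();

    RollingDataWriter<Record> writer = new RollingDataWriter<>(
        writerFactory, fileFactory, table.io(),
        128L * 1024 * 1024,             // target file size; roll once a file reaches it
        table.spec(), null);            // null partition for an unpartitioned spec

    try (RollingDataWriter<Record> closeableWriter = writer) {
      for (Record record : records) {
        closeableWriter.write(record); // may transparently roll to a new file
      }
    }

    List<DataFile> dataFiles = writer.result().dataFiles(); // all completed files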
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
new file mode 100644
index 000000000000..c12bfd31d6d5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling equality delete writer that splits incoming deletes into multiple files within one
+ * spec/partition based on the target file size.
+ */
+public class RollingEqualityDeleteWriter<T>
+    extends RollingFileWriter<T, EqualityDeleteWriter<T>, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final List<DeleteFile> deleteFiles;
+
+  public RollingEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                     FileIO io, long targetFileSizeInBytes,
+                                     PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    this.deleteFiles = Lists.newArrayList();
+    openCurrentWriter();
+  }
+
+  @Override
+  protected EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+    return writerFactory.newEqualityDeleteWriter(file, spec(), partition());
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
+    deleteFiles.addAll(result.deleteFiles());
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(deleteFiles);
+  }
+}
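The equality-delete writer follows the same lifecycle; the only wiring difference is that the factory must be built with equality field IDs and a delete row schema, as the engine test subclasses later in this diff do. A hedged sketch (identifiers are illustrative):

    // Assumes writerFactory was configured with equality field ids + delete row schema.
    RollingEqualityDeleteWriter<Record> deleteWriter = new RollingEqualityDeleteWriter<>(
        writerFactory, fileFactory, table.io(),
        128L * 1024 * 1024, table.spec(), partition);

    try (RollingEqualityDeleteWriter<Record> closeableWriter = deleteWriter) {
      closeableWriter.write(deletedRecord); // each row becomes one equality delete
    }

    DeleteWriteResult result = deleteWriter.result();
    // Equality deletes never carry referenced data files, which the Preconditions
    // check in addResult() above enforces: result.referencesDataFiles() is false.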
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
new file mode 100644
index 000000000000..ed35933313e2
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data or deletes into multiple files within one
+ * spec/partition based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentFileRows() {
+    return currentFileRows;
+  }
+
+  @Override
+  public long length() {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
+  }
+
+  @Override
+  public void write(T row) throws IOException {
+    currentWriter.write(row);
+    currentFileRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrentWriter();
+      openCurrentWriter();
+    }
+  }
+
+  private boolean shouldRollToNewFile() {
+    return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+  }
+
+  protected void openCurrentWriter() {
+    Preconditions.checkState(currentWriter == null, "Current writer has already been initialized");
+
+    this.currentFile = newFile();
+    this.currentFileRows = 0;
+    this.currentWriter = newWriter(currentFile);
+  }
+
+  private EncryptedOutputFile newFile() {
+    if (partition == null) {
+      return fileFactory.newOutputFile();
+    } else {
+      return fileFactory.newOutputFile(spec, partition);
+    }
+  }
+
+  private void closeCurrentWriter() throws IOException {
+    if (currentWriter != null) {
+      currentWriter.close();
+
+      if (currentFileRows == 0L) {
+        io.deleteFile(currentFile.encryptingOutputFile());
+      } else {
+        addResult(currentWriter.result());
+      }
+
+      this.currentFile = null;
+      this.currentFileRows = 0;
+      this.currentWriter = null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeCurrentWriter();
+      this.closed = true;
+    }
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+}
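One subtlety worth calling out: shouldRollToNewFile() consults currentWriter.length() only once every ROWS_DIVISOR rows, so the potentially costly size probe stays off the hot per-row path. A self-contained restatement of that cadence, with the constants the tests below use (the wrapper class is illustrative, not part of the patch):

    public class RollCadenceExample {
      // Mirrors shouldRollToNewFile() with ROWS_DIVISOR = 1000 and the
      // tests' SMALL_FILE_SIZE target of 2 bytes plugged in.
      static boolean shouldRoll(long rowsInCurrentFile, long currentLengthInBytes) {
        return rowsInCurrentFile % 1000 == 0 && currentLengthInBytes >= 2L;
      }

      public static void main(String[] args) {
        System.out.println(shouldRoll(999, 10_000));  // false: size ignored between checkpoints
        System.out.println(shouldRoll(1000, 10_000)); // true: rolls at each 1000-row checkpoint,
                                                      // which is why 4000 tiny rows yield 4 files
      }
    }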
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
new file mode 100644
index 000000000000..001b0bb5f5d6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A rolling position delete writer that splits incoming deletes into multiple files within one
+ * spec/partition based on the target file size.
+ */
+public class RollingPositionDeleteWriter<T>
+    extends RollingFileWriter<PositionDelete<T>, PositionDeleteWriter<T>, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final List<DeleteFile> deleteFiles;
+  private final CharSequenceSet referencedDataFiles;
+
+  public RollingPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                     FileIO io, long targetFileSizeInBytes,
+                                     PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    this.deleteFiles = Lists.newArrayList();
+    this.referencedDataFiles = CharSequenceSet.empty();
+    openCurrentWriter();
+  }
+
+  @Override
+  protected PositionDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+    return writerFactory.newPositionDeleteWriter(file, spec(), partition());
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    deleteFiles.addAll(result.deleteFiles());
+    referencedDataFiles.addAll(result.referencedDataFiles());
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+  }
+}
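A matching sketch for position deletes, mirroring the test at the end of this diff; the data file path is a stand-in, not a real file:

    // Hypothetical: delete positions 0..2 from a single (assumed) data file.
    RollingPositionDeleteWriter<Record> posWriter = new RollingPositionDeleteWriter<>(
        writerFactory, fileFactory, table.io(),
        128L * 1024 * 1024, table.spec(), partition);

    try (RollingPositionDeleteWriter<Record> closeableWriter = posWriter) {
      for (long pos = 0; pos < 3; pos++) {
        closeableWriter.write(
            new PositionDelete<Record>().set("path/to/data/file-1.parquet", pos, null));
      }
    }

    DeleteWriteResult result = posWriter.result();
    // referencedDataFiles() is a CharSequenceSet, so the repeated path collapses
    // to a single entry and result.referencesDataFiles() returns true.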
diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
new file mode 100644
index 000000000000..e11e0a7e60dc
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestRollingFileWriters<T> extends TableTestBase {
+
+  // TODO: add ORC once we support ORC rolling file writers
+
+  @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO, false},
+        new Object[]{FileFormat.AVRO, true},
+        new Object[]{FileFormat.PARQUET, false},
+        new Object[]{FileFormat.PARQUET, true},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 1000;
+  private static final long DEFAULT_FILE_SIZE = 128L * 1024 * 1024;
+  private static final long SMALL_FILE_SIZE = 2L;
+  private static final String PARTITION_VALUE = "aaa";
+
+  private final FileFormat fileFormat;
+  private final boolean partitioned;
+  private StructLike partition = null;
+  private OutputFileFactory fileFactory = null;
+
+  public TestRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+    this.partitioned = partitioned;
+  }
+
+  protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                           Schema equalityDeleteRowSchema);
+
+  protected abstract T toRow(Integer id, String data);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    if (partitioned) {
+      this.table = create(SCHEMA, SPEC);
+      this.partition = initPartitionKey();
+    } else {
+      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+      this.partition = null;
+    }
+
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  private PartitionKey initPartitionKey() {
+    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", PARTITION_VALUE));
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+
+    return partitionKey;
+  }
+
+  @Test
+  public void testRollingDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingDataWriter<T> writer = new RollingDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        DEFAULT_FILE_SIZE, table.spec(), partition);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testRollingDataWriterSplitData() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingDataWriter<T> writer = new RollingDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        SMALL_FILE_SIZE, table.spec(), partition);
+
+    List<T> rows = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
+    for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) {
+      rows.add(toRow(index, PARTITION_VALUE));
+    }
+
+    try (RollingDataWriter<T> closeableWriter = writer) {
+      closeableWriter.write(rows);
+    }
+
+    // call close again to ensure it is idempotent
+    writer.close();
+
+    Assert.assertEquals(4, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testRollingEqualityDeleteWriterNoRecords() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+    RollingEqualityDeleteWriter<T> writer = new RollingEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        DEFAULT_FILE_SIZE, table.spec(), partition);
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testRollingEqualityDeleteWriterSplitDeletes() throws IOException {
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+    RollingEqualityDeleteWriter<T> writer = new RollingEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        SMALL_FILE_SIZE, table.spec(), partition);
+
+    List<T> deletes = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
+    for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) {
+      deletes.add(toRow(index, PARTITION_VALUE));
+    }
+
+    try (RollingEqualityDeleteWriter<T> closeableWriter = writer) {
+      closeableWriter.write(deletes);
+    }
+
+    // call close again to ensure it is idempotent
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals(4, result.deleteFiles().size());
+    Assert.assertEquals(0, result.referencedDataFiles().size());
+    Assert.assertFalse(result.referencesDataFiles());
+  }
+
+  @Test
+  public void testRollingPositionDeleteWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingPositionDeleteWriter<T> writer = new RollingPositionDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        DEFAULT_FILE_SIZE, table.spec(), partition);
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testRollingPositionDeleteWriterSplitDeletes() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingPositionDeleteWriter<T> writer = new RollingPositionDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        SMALL_FILE_SIZE, table.spec(), partition);
+
+    List<PositionDelete<T>> deletes = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
+    for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) {
+      deletes.add(new PositionDelete<T>().set("path/to/data/file-1.parquet", index, null));
+    }
+
+    try (RollingPositionDeleteWriter<T> closeableWriter = writer) {
+      closeableWriter.write(deletes);
+    }
+
+    // call close again to ensure it is idempotent
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals(4, result.deleteFiles().size());
+    Assert.assertEquals(1, result.referencedDataFiles().size());
+    Assert.assertTrue(result.referencesDataFiles());
+  }
+
+  private FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
+    return newWriterFactory(dataSchema, null, null);
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
new file mode 100644
index 000000000000..a3d62d5bf1c2
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.util.ArrayUtil;
+
+public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData> {
+
+  public TestFlinkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+    super(fileFormat, partitioned);
+  }
+
+  @Override
+  protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                        Schema equalityDeleteRowSchema) {
+    return FlinkFileWriterFactory.builderFor(table)
+        .dataSchema(table.schema())
+        .dataFileFormat(format())
+        .deleteFileFormat(format())
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .build();
+  }
+
+  @Override
+  protected RowData toRow(Integer id, String data) {
+    return SimpleDataUtil.createRowData(id, data);
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
new file mode 100644
index 000000000000..3ea2353f2cdf
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class TestSparkRollingFileWriters extends TestRollingFileWriters<InternalRow> {
+
+  public TestSparkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+    super(fileFormat, partitioned);
+  }
+
+  @Override
+  protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                            Schema equalityDeleteRowSchema) {
+    return SparkFileWriterFactory.builderFor(table)
+        .dataSchema(table.schema())
+        .dataFileFormat(format())
+        .deleteFileFormat(format())
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .build();
+  }
+
+  @Override
+  protected InternalRow toRow(Integer id, String data) {
+    InternalRow row = new GenericInternalRow(2);
+    row.update(0, id);
+    row.update(1, UTF8String.fromString(data));
+    return row;
+  }
+}