diff --git a/build.gradle b/build.gradle index cecf8b7e0bed..a56a03e352ef 100644 --- a/build.gradle +++ b/build.gradle @@ -115,10 +115,6 @@ subprojects { options.encoding = 'UTF-8' } - ext { - jmhVersion = '1.21' - } - sourceCompatibility = '1.8' targetCompatibility = '1.8' diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java index 4cd2d319c766..cac04ca2eba0 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java @@ -22,7 +22,7 @@ import org.apache.iceberg.StructLike; public class PositionDelete implements StructLike { - static PositionDelete create() { + public static PositionDelete create() { return new PositionDelete<>(); } @@ -30,6 +30,9 @@ static PositionDelete create() { private long pos; private R row; + private PositionDelete() { + } + public PositionDelete set(CharSequence newPath, long newPos, R newRow) { this.path = newPath; this.pos = newPos; diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java new file mode 100644 index 000000000000..a6982cdb8153 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A data writer capable of writing to multiple specs and partitions that requires the incoming records + * to be properly clustered by partition spec and by partition within each spec. 
+ */ +public class ClusteredDataWriter extends ClusteredWriter { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List dataFiles; + + public ClusteredDataWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.dataFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newDataWriter(outputFile, spec, partition); + } else { + return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java new file mode 100644 index 000000000000..385d1a5d6200 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * An equality delete writer capable of writing to multiple specs and partitions that requires + * the incoming delete records to be properly clustered by partition spec and by partition within each spec. 
+ */ +public class ClusteredEqualityDeleteWriter extends ClusteredWriter { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List deleteFiles; + + public ClusteredEqualityDeleteWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition); + } else { + return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files"); + deleteFiles.addAll(result.deleteFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java new file mode 100644 index 000000000000..ea118388b3ba --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +/** + * A position delete writer capable of writing to multiple specs and partitions that requires + * the incoming delete records to be properly clustered by partition spec and by partition within each spec. 
+ */ +public class ClusteredPositionDeleteWriter extends ClusteredWriter, DeleteWriteResult> { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public ClusteredPositionDeleteWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newPositionDeleteWriter(outputFile, spec, partition); + } else { + return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java new file mode 100644 index 000000000000..8729fd1c503e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Set; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.StructLikeSet; + +/** + * A writer capable of writing to multiple specs and partitions that requires the incoming records + * to be clustered by partition spec and by partition within each spec. + *
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce + * the memory consumption. Prefer using this writer whenever the incoming records can be clustered + * by spec/partition. + */ +abstract class ClusteredWriter implements PartitioningWriter { + + private static final String NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE = + "Incoming records violate the writer assumption that records are clustered by spec and " + + "by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n" + + "Encountered records that belong to already closed files:\n"; + + private final Set completedSpecIds = Sets.newHashSet(); + + private PartitionSpec currentSpec = null; + private Comparator partitionComparator = null; + private Set completedPartitions = null; + private StructLike currentPartition = null; + private FileWriter currentWriter = null; + + private boolean closed = false; + + protected abstract FileWriter newWriter(PartitionSpec spec, StructLike partition); + + protected abstract void addResult(R result); + + protected abstract R aggregatedResult(); + + @Override + public void write(T row, PartitionSpec spec, StructLike partition) throws IOException { + if (!spec.equals(currentSpec)) { + if (currentSpec != null) { + closeCurrentWriter(); + completedSpecIds.add(currentSpec.specId()); + completedPartitions.clear(); + } + + if (completedSpecIds.contains(spec.specId())) { + String errorCtx = String.format("spec %s", spec); + throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx); + } + + StructType partitionType = spec.partitionType(); + + this.currentSpec = spec; + this.partitionComparator = Comparators.forType(partitionType); + this.completedPartitions = StructLikeSet.create(partitionType); + // copy the partition key as the key object may be reused + this.currentPartition = StructCopy.copy(partition); + this.currentWriter = newWriter(currentSpec, currentPartition); + + } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) { + closeCurrentWriter(); + completedPartitions.add(currentPartition); + + if (completedPartitions.contains(partition)) { + String errorCtx = String.format("partition '%s' in spec %s", spec.partitionToPath(partition), spec); + throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx); + } + + // copy the partition key as the key object may be reused + this.currentPartition = StructCopy.copy(partition); + this.currentWriter = newWriter(currentSpec, currentPartition); + } + + currentWriter.write(row); + } + + @Override + public void close() throws IOException { + if (!closed) { + closeCurrentWriter(); + this.closed = true; + } + } + + private void closeCurrentWriter() throws IOException { + if (currentWriter != null) { + currentWriter.close(); + + addResult(currentWriter.result()); + + this.currentWriter = null; + } + } + + @Override + public final R result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return aggregatedResult(); + } + + protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) { + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null when creating output file for partitioned spec"); + return partition == null ? 
fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java new file mode 100644 index 000000000000..d6e16a707b36 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A data writer capable of writing to multiple specs and partitions that keeps data writers for each + * seen spec/partition pair open until this writer is closed. + */ +public class FanoutDataWriter extends FanoutWriter { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List dataFiles; + + public FanoutDataWriter(FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.dataFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newDataWriter(outputFile, spec, partition); + } else { + return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java new file mode 100644 index 000000000000..122a25df27e8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.StructLikeMap; + +/** + * A writer capable of writing to multiple specs and partitions that keeps files for each + * seen spec/partition pair open until this writer is closed. + *
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records + * to be clustered by partition spec and partition as all files are kept open. As a consequence, + * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}. + * Use this writer only when clustering by spec/partition is not possible (e.g. streaming). + */ +abstract class FanoutWriter implements PartitioningWriter { + + private final Map>> writers = Maps.newHashMap(); + private boolean closed = false; + + protected abstract FileWriter newWriter(PartitionSpec spec, StructLike partition); + + protected abstract void addResult(R result); + + protected abstract R aggregatedResult(); + + @Override + public void write(T row, PartitionSpec spec, StructLike partition) throws IOException { + FileWriter writer = writer(spec, partition); + writer.write(row); + } + + private FileWriter writer(PartitionSpec spec, StructLike partition) { + Map> specWriters = writers.computeIfAbsent( + spec.specId(), + id -> StructLikeMap.create(spec.partitionType())); + FileWriter writer = specWriters.get(partition); + + if (writer == null) { + // copy the partition key as the key object may be reused + StructLike copiedPartition = StructCopy.copy(partition); + writer = newWriter(spec, copiedPartition); + specWriters.put(copiedPartition, writer); + } + + return writer; + } + + @Override + public void close() throws IOException { + if (!closed) { + closeWriters(); + this.closed = true; + } + } + + private void closeWriters() throws IOException { + for (Map> specWriters : writers.values()) { + for (FileWriter writer : specWriters.values()) { + writer.close(); + addResult(writer.result()); + } + + specWriters.clear(); + } + + writers.clear(); + } + + @Override + public final R result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return aggregatedResult(); + } + + protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) { + Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + "Partition must not be null when creating output file for partitioned spec"); + return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java new file mode 100644 index 000000000000..329e68cc60f3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; + +/** + * A writer capable of writing files of a single type (i.e. data/delete) to multiple specs and partitions. + *
+ * As opposed to {@link FileWriter}, this interface should be implemented by writers that are not + * limited to writing to a single spec/partition. Implementations may internally use {@link FileWriter}s + * for writing to a single spec/partition. + *
+ * Note that this writer can be used both for partitioned and unpartitioned tables. + * + * @param the row type + * @param the result type + */ +public interface PartitioningWriter extends Closeable { + + /** + * Writes a row to the provided spec/partition. + * + * @param row a data or delete record + * @param spec a partition spec + * @param partition a partition or null if the spec is unpartitioned + * @throws IOException in case of an error during the write process + */ + void write(T row, PartitionSpec spec, StructLike partition) throws IOException; + + /** + * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. + * The result is valid only after the writer is closed. + * + * @return the writer result + */ + R result(); +} diff --git a/core/src/main/java/org/apache/iceberg/io/StructCopy.java b/core/src/main/java/org/apache/iceberg/io/StructCopy.java index 6cf461d64a80..7d4373348ccb 100644 --- a/core/src/main/java/org/apache/iceberg/io/StructCopy.java +++ b/core/src/main/java/org/apache/iceberg/io/StructCopy.java @@ -26,7 +26,7 @@ */ class StructCopy implements StructLike { static StructLike copy(StructLike struct) { - return new StructCopy(struct); + return struct != null ? new StructCopy(struct) : null; } private final Object[] values; diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 9ef28b9d4d3b..7325cfe6032e 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.LongStream; +import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -454,6 +455,11 @@ protected DeleteFile newDeleteFile(int specId, String partitionPath) { .build(); } + protected PositionDelete positionDelete(CharSequence path, long pos, T row) { + PositionDelete positionDelete = PositionDelete.create(); + return positionDelete.set(path, pos, row); + } + static void validateManifestEntries(ManifestFile manifest, Iterator ids, Iterator expectedFiles, diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java index 7df402e61f5e..0a88cbcea193 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java +++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java @@ -273,9 +273,9 @@ public void testPosDeleteWriterWithRowSchema() throws IOException { DataFile dataFile = prepareDataFile(rowSet, appenderFactory); List> deletes = Lists.newArrayList( - new PositionDelete().set(dataFile.path(), 0, rowSet.get(0)), - new PositionDelete().set(dataFile.path(), 2, rowSet.get(2)), - new PositionDelete().set(dataFile.path(), 4, rowSet.get(4)) + positionDelete(dataFile.path(), 0, rowSet.get(0)), + positionDelete(dataFile.path(), 2, rowSet.get(2)), + positionDelete(dataFile.path(), 4, rowSet.get(4)) ); EncryptedOutputFile out = createEncryptedOutputFile(); diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java index 734e5d3da8de..c3c46916e74a 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java +++ 
b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java @@ -26,14 +26,11 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.TableTestBase; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; @@ -59,7 +56,7 @@ import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME; @RunWith(Parameterized.class) -public abstract class TestFileWriterFactory extends TableTestBase { +public abstract class TestFileWriterFactory extends WriterTestBase { @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}") public static Object[] parameters() { return new Object[][] { @@ -73,6 +70,7 @@ public static Object[] parameters() { } private static final int TABLE_FORMAT_VERSION = 2; + private static final String PARTITION_VALUE = "aaa"; private final FileFormat fileFormat; private final boolean partitioned; @@ -94,12 +92,6 @@ public TestFileWriterFactory(FileFormat fileFormat, boolean partitioned) { ); } - protected abstract FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, - Schema equalityDeleteRowSchema, - Schema positionDeleteRowSchema); - - protected abstract T toRow(Integer id, String data); - protected abstract StructLikeSet toSet(Iterable records); protected FileFormat format() { @@ -115,7 +107,7 @@ public void setupTable() throws Exception { if (partitioned) { this.table = create(SCHEMA, SPEC); - this.partition = initPartitionKey(); + this.partition = partitionKey(table.spec(), PARTITION_VALUE); } else { this.table = create(SCHEMA, PartitionSpec.unpartitioned()); this.partition = null; @@ -222,7 +214,7 @@ public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException { .addField("data") .commit(); - partition = initPartitionKey(); + partition = partitionKey(table.spec(), PARTITION_VALUE); // write a partitioned data file DataFile secondDataFile = writeData(writerFactory, dataRows, table.spec(), partition); @@ -259,9 +251,9 @@ public void testPositionDeleteWriter() throws IOException { // write a position delete file List> deletes = ImmutableList.of( - new PositionDelete().set(dataFile.path(), 0L, null), - new PositionDelete().set(dataFile.path(), 2L, null), - new PositionDelete().set(dataFile.path(), 4L, null) + positionDelete(dataFile.path(), 0L, null), + positionDelete(dataFile.path(), 2L, null), + positionDelete(dataFile.path(), 4L, null) ); Pair result = writePositionDeletes(writerFactory, deletes, table.spec(), partition); DeleteFile deleteFile = result.first(); @@ -305,7 +297,7 @@ public void testPositionDeleteWriterWithRow() throws IOException { // write a position delete file and persist the deleted row List> deletes = ImmutableList.of( - new PositionDelete().set(dataFile.path(), 0, dataRows.get(0)) + positionDelete(dataFile.path(), 0, dataRows.get(0)) ); Pair result = writePositionDeletes(writerFactory, deletes, table.spec(), partition); DeleteFile deleteFile = result.first(); @@ -343,28 +335,6 @@ public void testPositionDeleteWriterWithRow() throws IOException { Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); } - 
private PartitionKey initPartitionKey() { - Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa")); - - PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); - partitionKey.partition(record); - - return partitionKey; - } - - private FileWriterFactory newWriterFactory(Schema dataSchema) { - return newWriterFactory(dataSchema, null, null, null); - } - - private FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, - Schema equalityDeleteRowSchema) { - return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null); - } - - private FileWriterFactory newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) { - return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema); - } - private DataFile writeData(FileWriterFactory writerFactory, List rows, PartitionSpec spec, StructLike partitionKey) throws IOException { @@ -435,14 +405,6 @@ private List readFile(Schema schema, InputFile inputFile) throws IOExcep } } - private StructLikeSet actualRowSet(String... columns) throws IOException { - StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); - try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { - reader.forEach(set::add); - } - return set; - } - private EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitionKey) { return fileFactory.newOutputFile(spec, partitionKey); } diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java new file mode 100644 index 000000000000..b01ccc5af621 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public abstract class TestPartitioningWriters extends WriterTestBase { + + @Parameterized.Parameters(name = "FileFormat={0}") + public static Object[] parameters() { + return new Object[][] { + new Object[]{FileFormat.AVRO}, + new Object[]{FileFormat.PARQUET}, + new Object[]{FileFormat.ORC}, + }; + } + + private static final int TABLE_FORMAT_VERSION = 2; + private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024; + + private final FileFormat fileFormat; + private OutputFileFactory fileFactory = null; + + public TestPartitioningWriters(FileFormat fileFormat) { + super(TABLE_FORMAT_VERSION); + this.fileFormat = fileFormat; + } + + protected abstract StructLikeSet toSet(Iterable records); + + protected FileFormat format() { + return fileFormat; + } + + @Before + public void setupTable() throws Exception { + this.tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); // created during table creation + + this.metadataDir = new File(tableDir, "metadata"); + this.table = create(SCHEMA, PartitionSpec.unpartitioned()); + this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build(); + } + + @Test + public void testClusteredDataWriterNoRecords() throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + ClusteredDataWriter writer = new ClusteredDataWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + writer.close(); + Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + + writer.close(); + Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + } + + @Test + public void testClusteredDataWriterMultiplePartitions() throws IOException { + table.updateSpec() + .addField(Expressions.ref("data")) + .commit(); + + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + ClusteredDataWriter writer = new ClusteredDataWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + PartitionSpec spec = table.spec(); + + writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa")); + writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa")); + writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb")); + writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb")); + writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc")); + + writer.close(); + + DataWriteResult result = writer.result(); + Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size()); + + RowDelta rowDelta = table.newRowDelta(); + result.dataFiles().forEach(rowDelta::addRows); + rowDelta.commit(); + + List expectedRows = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "aaa"), + toRow(3, 
"bbb"), + toRow(4, "bbb"), + toRow(5, "ccc") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + + @Test + public void testClusteredDataWriterOutOfOrderPartitions() throws IOException { + table.updateSpec() + .addField(Expressions.ref("data")) + .commit(); + + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + ClusteredDataWriter writer = new ClusteredDataWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + PartitionSpec spec = table.spec(); + + writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa")); + writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa")); + writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb")); + writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb")); + writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc")); + + AssertHelpers.assertThrows("Should fail to write out of order partitions", + IllegalStateException.class, "Encountered records that belong to already closed files", + () -> { + try { + writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa")); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + writer.close(); + } + + @Test + public void testClusteredEqualityDeleteWriterNoRecords() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); + Schema equalityDeleteRowSchema = table.schema().select("id"); + FileWriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); + ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + writer.close(); + Assert.assertEquals(0, writer.result().deleteFiles().size()); + Assert.assertEquals(0, writer.result().referencedDataFiles().size()); + Assert.assertFalse(writer.result().referencesDataFiles()); + + writer.close(); + Assert.assertEquals(0, writer.result().deleteFiles().size()); + Assert.assertEquals(0, writer.result().referencedDataFiles().size()); + Assert.assertFalse(writer.result().referencesDataFiles()); + } + + @Test + public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); + Schema equalityDeleteRowSchema = table.schema().select("id"); + FileWriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); + + // add an unpartitioned data file + ImmutableList rows1 = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "aaa"), + toRow(11, "aaa") + ); + DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null); + table.newFastAppend() + .appendFile(dataFile1) + .commit(); + + // partition by bucket + table.updateSpec() + .addField(Expressions.bucket("data", 16)) + .commit(); + + // add a data file partitioned by bucket + ImmutableList rows2 = ImmutableList.of( + toRow(3, "bbb"), + toRow(4, "bbb"), + toRow(12, "bbb") + ); + DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb")); + table.newFastAppend() + .appendFile(dataFile2) + .commit(); + + // partition by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + 
.addField(Expressions.ref("data")) + .commit(); + + // add a data file partitioned by data + ImmutableList rows3 = ImmutableList.of( + toRow(5, "ccc"), + toRow(13, "ccc") + ); + DataFile dataFile3 = writeData(writerFactory, fileFactory, rows3, table.spec(), partitionKey(table.spec(), "ccc")); + table.newFastAppend() + .appendFile(dataFile3) + .commit(); + + ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + PartitionSpec unpartitionedSpec = table.specs().get(0); + PartitionSpec bucketSpec = table.specs().get(1); + PartitionSpec identitySpec = table.specs().get(2); + + writer.write(toRow(1, "aaa"), unpartitionedSpec, null); + writer.write(toRow(2, "aaa"), unpartitionedSpec, null); + writer.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc")); + + writer.close(); + + DeleteWriteResult result = writer.result(); + Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size()); + Assert.assertEquals("Must not reference data files", 0, writer.result().referencedDataFiles().size()); + Assert.assertFalse("Must not reference data files", writer.result().referencesDataFiles()); + + RowDelta rowDelta = table.newRowDelta(); + result.deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.commit(); + + List expectedRows = ImmutableList.of( + toRow(11, "aaa"), + toRow(12, "bbb"), + toRow(13, "ccc") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + + @Test + public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); + Schema equalityDeleteRowSchema = table.schema().select("id"); + FileWriterFactory writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema); + + table.updateSpec() + .addField(Expressions.bucket("data", 16)) + .commit(); + + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + ClusteredEqualityDeleteWriter writer = new ClusteredEqualityDeleteWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + PartitionSpec unpartitionedSpec = table.specs().get(0); + PartitionSpec bucketSpec = table.specs().get(1); + PartitionSpec identitySpec = table.specs().get(2); + + writer.write(toRow(1, "aaa"), unpartitionedSpec, null); + writer.write(toRow(2, "aaa"), unpartitionedSpec, null); + writer.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc")); + writer.write(toRow(6, "ddd"), identitySpec, partitionKey(identitySpec, "ddd")); + + AssertHelpers.assertThrows("Should fail to write out of order partitions", + IllegalStateException.class, "Encountered records that belong to already closed files", + () -> { + try { + writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc")); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + AssertHelpers.assertThrows("Should fail to write out of order specs", + 
IllegalStateException.class, "Encountered records that belong to already closed files", + () -> { + try { + writer.write(toRow(7, "aaa"), unpartitionedSpec, null); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + writer.close(); + } + + @Test + public void testClusteredPositionDeleteWriterNoRecords() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + writer.close(); + Assert.assertEquals(0, writer.result().deleteFiles().size()); + Assert.assertEquals(0, writer.result().referencedDataFiles().size()); + Assert.assertFalse(writer.result().referencesDataFiles()); + + writer.close(); + Assert.assertEquals(0, writer.result().deleteFiles().size()); + Assert.assertEquals(0, writer.result().referencedDataFiles().size()); + Assert.assertFalse(writer.result().referencesDataFiles()); + } + + @Test + public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + + // add an unpartitioned data file + ImmutableList rows1 = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "aaa"), + toRow(11, "aaa") + ); + DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null); + table.newFastAppend() + .appendFile(dataFile1) + .commit(); + + // partition by bucket + table.updateSpec() + .addField(Expressions.bucket("data", 16)) + .commit(); + + // add a data file partitioned by bucket + ImmutableList rows2 = ImmutableList.of( + toRow(3, "bbb"), + toRow(4, "bbb"), + toRow(12, "bbb") + ); + DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb")); + table.newFastAppend() + .appendFile(dataFile2) + .commit(); + + // partition by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file partitioned by data + ImmutableList rows3 = ImmutableList.of( + toRow(5, "ccc"), + toRow(13, "ccc") + ); + DataFile dataFile3 = writeData(writerFactory, fileFactory, rows3, table.spec(), partitionKey(table.spec(), "ccc")); + table.newFastAppend() + .appendFile(dataFile3) + .commit(); + + ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + PartitionSpec unpartitionedSpec = table.specs().get(0); + PartitionSpec bucketSpec = table.specs().get(1); + PartitionSpec identitySpec = table.specs().get(2); + + writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null); + writer.write(positionDelete(dataFile1.path(), 1L, null), unpartitionedSpec, null); + writer.write(positionDelete(dataFile2.path(), 0L, null), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(positionDelete(dataFile2.path(), 1L, null), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(positionDelete(dataFile3.path(), 0L, null), identitySpec, partitionKey(identitySpec, "ccc")); + + writer.close(); + + DeleteWriteResult result = writer.result(); + Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size()); + Assert.assertEquals("Must reference 3 data files", 3, 
writer.result().referencedDataFiles().size()); + Assert.assertTrue("Must reference data files", writer.result().referencesDataFiles()); + + RowDelta rowDelta = table.newRowDelta(); + result.deleteFiles().forEach(rowDelta::addDeletes); + rowDelta.commit(); + + List expectedRows = ImmutableList.of( + toRow(11, "aaa"), + toRow(12, "bbb"), + toRow(13, "ccc") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } + + @Test + public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException { + Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC); + + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + + table.updateSpec() + .addField(Expressions.bucket("data", 16)) + .commit(); + + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + ClusteredPositionDeleteWriter writer = new ClusteredPositionDeleteWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + PartitionSpec unpartitionedSpec = table.specs().get(0); + PartitionSpec bucketSpec = table.specs().get(1); + PartitionSpec identitySpec = table.specs().get(2); + + writer.write(positionDelete("file-1.parquet", 0L, null), unpartitionedSpec, null); + writer.write(positionDelete("file-1.parquet", 1L, null), unpartitionedSpec, null); + writer.write(positionDelete("file-2.parquet", 0L, null), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(positionDelete("file-2.parquet", 1L, null), bucketSpec, partitionKey(bucketSpec, "bbb")); + writer.write(positionDelete("file-3.parquet", 0L, null), identitySpec, partitionKey(identitySpec, "ccc")); + writer.write(positionDelete("file-4.parquet", 0L, null), identitySpec, partitionKey(identitySpec, "ddd")); + + AssertHelpers.assertThrows("Should fail to write out of order partitions", + IllegalStateException.class, "Encountered records that belong to already closed files", + () -> { + try { + PositionDelete positionDelete = positionDelete("file-5.parquet", 1L, null); + writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc")); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + AssertHelpers.assertThrows("Should fail to write out of order specs", + IllegalStateException.class, "Encountered records that belong to already closed files", + () -> { + try { + PositionDelete positionDelete = positionDelete("file-1.parquet", 3L, null); + writer.write(positionDelete, unpartitionedSpec, null); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + writer.close(); + } + + @Test + public void testFanoutDataWriterNoRecords() throws IOException { + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + FanoutDataWriter writer = new FanoutDataWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + writer.close(); + Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + + writer.close(); + Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size()); + } + + @Test + public void testFanoutDataWriterMultiplePartitions() throws IOException { + table.updateSpec() + .addField(Expressions.ref("data")) + .commit(); + + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + FanoutDataWriter writer = new FanoutDataWriter<>( + writerFactory, fileFactory, table.io(), + fileFormat, TARGET_FILE_SIZE); + + PartitionSpec spec = 
table.spec(); + + writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa")); + writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb")); + writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa")); + writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb")); + writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc")); + + writer.close(); + + DataWriteResult result = writer.result(); + Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size()); + + RowDelta rowDelta = table.newRowDelta(); + result.dataFiles().forEach(rowDelta::addRows); + rowDelta.commit(); + + List expectedRows = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "aaa"), + toRow(3, "bbb"), + toRow(4, "bbb"), + toRow(5, "ccc") + ); + Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*")); + } +} diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java index e11e0a7e60dc..a62ac3e7aa3e 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java @@ -23,16 +23,11 @@ import java.io.IOException; import java.util.List; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.TableTestBase; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Assume; @@ -42,7 +37,7 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) -public abstract class TestRollingFileWriters extends TableTestBase { +public abstract class TestRollingFileWriters extends WriterTestBase { // TODO: add ORC once we support ORC rolling file writers @@ -73,11 +68,6 @@ public TestRollingFileWriters(FileFormat fileFormat, boolean partitioned) { this.partitioned = partitioned; } - protected abstract FileWriterFactory newWriterFactory(Schema dataSchema, List equalityFieldIds, - Schema equalityDeleteRowSchema); - - protected abstract T toRow(Integer id, String data); - protected FileFormat format() { return fileFormat; } @@ -91,7 +81,7 @@ public void setupTable() throws Exception { if (partitioned) { this.table = create(SCHEMA, SPEC); - this.partition = initPartitionKey(); + this.partition = partitionKey(table.spec(), PARTITION_VALUE); } else { this.table = create(SCHEMA, PartitionSpec.unpartitioned()); this.partition = null; @@ -100,15 +90,6 @@ public void setupTable() throws Exception { this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build(); } - private PartitionKey initPartitionKey() { - Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", PARTITION_VALUE)); - - PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); - partitionKey.partition(record); - - return partitionKey; - } - @Test public void testRollingDataWriterNoRecords() throws IOException { FileWriterFactory writerFactory = newWriterFactory(table.schema()); @@ -221,7 +202,7 @@ public void testRollingPositionDeleteWriterSplitDeletes() throws IOException { 
List> deletes = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR); for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) { - deletes.add(new PositionDelete().set("path/to/data/file-1.parquet", index, null)); + deletes.add(positionDelete("path/to/data/file-1.parquet", index, null)); } try (RollingPositionDeleteWriter closeableWriter = writer) { @@ -236,8 +217,4 @@ public void testRollingPositionDeleteWriterSplitDeletes() throws IOException { Assert.assertEquals(1, result.referencedDataFiles().size()); Assert.assertTrue(result.referencesDataFiles()); } - - private FileWriterFactory newWriterFactory(Schema dataSchema) { - return newWriterFactory(dataSchema, null, null); - } } diff --git a/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java new file mode 100644 index 000000000000..661ab642f516 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.StructLikeSet;
+
+public abstract class WriterTestBase<T> extends TableTestBase {
+
+  public WriterTestBase(int formatVersion) {
+    super(formatVersion);
+  }
+
+  protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                           Schema equalityDeleteRowSchema,
+                                                           Schema positionDeleteRowSchema);
+
+  protected FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                  Schema equalityDeleteRowSchema) {
+    return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null);
+  }
+
+  protected FileWriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
+    return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema);
+  }
+
+  protected FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
+    return newWriterFactory(dataSchema, null, null, null);
+  }
+
+  protected abstract T toRow(Integer id, String data);
+
+  protected PartitionKey partitionKey(PartitionSpec spec, String value) {
+    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", value));
+
+    PartitionKey partitionKey = new PartitionKey(spec, table.schema());
+    partitionKey.partition(record);
+
+    return partitionKey;
+  }
+
+  protected StructLikeSet actualRowSet(String... columns) throws IOException {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
+      reader.forEach(set::add);
+    }
+    return set;
+  }
+
+  protected DataFile writeData(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                               List<T> rows, PartitionSpec spec, StructLike partitionKey) throws IOException {
+
+    EncryptedOutputFile file = fileFactory.newOutputFile(spec, partitionKey);
+    DataWriter<T> writer = writerFactory.newDataWriter(file, spec, partitionKey);
+
+    try (DataWriter<T> closeableWriter = writer) {
+      for (T row : rows) {
+        closeableWriter.write(row);
+      }
+    }
+
+    return writer.toDataFile();
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
new file mode 100644
index 000000000000..934b5a0d75de
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestPartitioningWriters;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+
+public class TestFlinkPartitioningWriters extends TestPartitioningWriters<RowData> {
+
+  public TestFlinkPartitioningWriters(FileFormat fileFormat) {
+    super(fileFormat);
+  }
+
+  @Override
+  protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                        Schema equalityDeleteRowSchema,
+                                                        Schema positionDeleteRowSchema) {
+    return FlinkFileWriterFactory.builderFor(table)
+        .dataSchema(table.schema())
+        .dataFileFormat(format())
+        .deleteFileFormat(format())
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
+        .build();
+  }
+
+  @Override
+  protected RowData toRow(Integer id, String data) {
+    return SimpleDataUtil.createRowData(id, data);
+  }
+
+  @Override
+  protected StructLikeSet toSet(Iterable<RowData> rows) {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    RowType flinkType = FlinkSchemaUtil.convert(table.schema());
+    for (RowData row : rows) {
+      RowDataWrapper wrapper = new RowDataWrapper(flinkType, table.schema().asStruct());
+      set.add(wrapper.wrap(row));
+    }
+    return set;
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index a3d62d5bf1c2..9339e5ac2c3e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -36,13 +36,15 @@ public TestFlinkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
 
   @Override
   protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                        Schema equalityDeleteRowSchema) {
+                                                        Schema equalityDeleteRowSchema,
+                                                        Schema positionDeleteRowSchema) {
     return FlinkFileWriterFactory.builderFor(table)
         .dataSchema(table.schema())
         .dataFileFormat(format())
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git a/jmh.gradle b/jmh.gradle
index 543b07f08635..edcb39c795fb 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -30,7 +30,7 @@ configure(jmhProjects) {
   apply plugin: 'me.champeau.gradle.jmh'
 
   jmh {
-    jmhVersion = jmhVersion
+    jmhVersion = '1.32'
     failOnError = true
     forceGC = true
     includeTests = true
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
new file mode 100644
index 000000000000..a8521de0e0ba
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.ClusteredEqualityDeleteWriter;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FanoutDataWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class WritersBenchmark extends IcebergSourceBenchmark {
+
+  private static final int NUM_ROWS = 2500000;
+  private static final long TARGET_FILE_SIZE_IN_BYTES = 50L * 1024 * 1024;
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      required(2, "intCol", Types.IntegerType.get()),
+      required(3, "floatCol", Types.FloatType.get()),
+      optional(4, "doubleCol", Types.DoubleType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(6, "timestampCol", Types.TimestampType.withZone()),
+      optional(7, "stringCol", Types.StringType.get())
+  );
+
+  private Iterable<InternalRow> rows;
+  private Iterable<InternalRow> positionDeleteRows;
+  private PartitionSpec unpartitionedSpec;
+  private PartitionSpec partitionedSpec;
+
+  protected abstract FileFormat fileFormat();
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+
+    List<InternalRow> data = Lists.newArrayList(RandomData.generateSpark(SCHEMA, NUM_ROWS, 0L));
+    Transform<Integer, Integer> transform = Transforms.bucket(Types.IntegerType.get(), 32);
+    data.sort(Comparator.comparingInt(row -> transform.apply(row.getInt(1))));
+    this.rows = data;
+
+    this.positionDeleteRows = RandomData.generateSpark(DeleteSchemaUtil.pathPosSchema(), NUM_ROWS, 0L);
+
+    this.unpartitionedSpec = table().specs().get(0);
+    Preconditions.checkArgument(unpartitionedSpec.isUnpartitioned());
+    this.partitionedSpec = table().specs().get(1);
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Override
+  protected Configuration initHadoopConf() {
+    return new Configuration();
+  }
+
+  @Override
+  protected final Table initTable() {
+    HadoopTables tables = new HadoopTables(hadoopConf());
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> properties = Maps.newHashMap();
+    Table table = tables.create(SCHEMA, spec, properties, newTableLocation());
+
+    // add a partitioned spec to the table
+    table.updateSpec()
+        .addField(Expressions.bucket("intCol", 32))
+        .commit();
+
+    return table;
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .dataSchema(table().schema())
+        .build();
+
+    ClusteredDataWriter<InternalRow> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, io,
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : rows) {
+        closeableWriter.write(row, unpartitionedSpec, null);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedLegacyDataWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+
+    Schema writeSchema = table().schema();
+    StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
+    SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
+        .spec(unpartitionedSpec)
+        .build();
+
+    TaskWriter<InternalRow> writer = new UnpartitionedWriter<>(
+        unpartitionedSpec, fileFormat(), appenders,
+        fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+
+    try (TaskWriter<InternalRow> closableWriter = writer) {
+      for (InternalRow row : rows) {
+        closableWriter.write(row);
+      }
+    }
+
+    blackhole.consume(writer.complete());
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .dataSchema(table().schema())
+        .build();
+
+    ClusteredDataWriter<InternalRow> writer = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, io,
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
+    StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
+    InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);
+
+    try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : rows) {
+        partitionKey.partition(internalRowWrapper.wrap(row));
+        closeableWriter.write(row, partitionedSpec, partitionKey);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedLegacyDataWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+
+    Schema writeSchema = table().schema();
+    StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
+    SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
+        .spec(partitionedSpec)
+        .build();
+
+    TaskWriter<InternalRow> writer = new SparkPartitionedWriter(
+        partitionedSpec, fileFormat(), appenders,
+        fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
+        writeSchema, sparkWriteType);
+
+    try (TaskWriter<InternalRow> closableWriter = writer) {
+      for (InternalRow row : rows) {
+        closableWriter.write(row);
+      }
+    }
+
+    blackhole.consume(writer.complete());
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .dataSchema(table().schema())
+        .build();
+
+    FanoutDataWriter<InternalRow> writer = new FanoutDataWriter<>(
+        writerFactory, fileFactory, io,
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
+    StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
+    InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);
+
+    try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : rows) {
+        partitionKey.partition(internalRowWrapper.wrap(row));
+        closeableWriter.write(row, partitionedSpec, partitionKey);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedLegacyFanoutDataWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+
+    Schema writeSchema = table().schema();
+    StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
+    SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
+        .spec(partitionedSpec)
+        .build();
+
+    TaskWriter<InternalRow> writer = new SparkPartitionedFanoutWriter(
+        partitionedSpec, fileFormat(), appenders,
+        fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
+        writeSchema, sparkWriteType);
+
+    try (TaskWriter<InternalRow> closableWriter = writer) {
+      for (InternalRow row : rows) {
+        closableWriter.write(row);
+      }
+    }
+
+    blackhole.consume(writer.complete());
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    int equalityFieldId = table().schema().findField("longCol").fieldId();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .equalityDeleteRowSchema(table().schema())
+        .equalityFieldIds(new int[]{equalityFieldId})
+        .build();
+
+    ClusteredEqualityDeleteWriter<InternalRow> writer = new ClusteredEqualityDeleteWriter<>(
+        writerFactory, fileFactory, io,
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
+    StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
+    InternalRowWrapper internalRowWrapper = new InternalRowWrapper(deleteSparkType);
+
+    try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : rows) {
+        partitionKey.partition(internalRowWrapper.wrap(row));
+        closeableWriter.write(row, partitionedSpec, partitionKey);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .build();
+
+    ClusteredPositionDeleteWriter<InternalRow> writer = new ClusteredPositionDeleteWriter<>(
+        writerFactory, fileFactory, io,
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+    try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : positionDeleteRows) {
+        String path = row.getString(0);
+        long pos = row.getLong(1);
+        positionDelete.set(path, pos, null);
+        closeableWriter.write(positionDelete, unpartitionedSpec, null);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  private OutputFileFactory newFileFactory() {
+    return OutputFileFactory.builderFor(table(), 1, 1)
+        .format(fileFormat())
+        .build();
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
new file mode 100644
index 000000000000..5d970d066b74
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.avro;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.WritersBenchmark;
+
+/**
+ * A benchmark that evaluates the performance of various Iceberg writers for Avro data.
+ *
+ * To run this benchmark for either spark-2 or spark-3:
+ *
+ *   ./gradlew :iceberg-spark[2|3]:jmh
+ *       -PjmhIncludeRegex=AvroWritersBenchmark
+ *       -PjmhOutputPath=benchmark/avro-writers-benchmark-result.txt
+ *
+ */
+public class AvroWritersBenchmark extends WritersBenchmark {
+
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.AVRO;
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
new file mode 100644
index 000000000000..ff354bab7825
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.WritersBenchmark;
+
+/**
+ * A benchmark that evaluates the performance of various Iceberg writers for Parquet data.
+ *
+ * To run this benchmark for either spark-2 or spark-3:
+ *
+ *   ./gradlew :iceberg-spark[2|3]:jmh
+ *       -PjmhIncludeRegex=ParquetWritersBenchmark
+ *       -PjmhOutputPath=benchmark/parquet-writers-benchmark-result.txt
+ *
+ */
+public class ParquetWritersBenchmark extends WritersBenchmark {
+
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.PARQUET;
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
new file mode 100644
index 000000000000..4d07cfbe86ea
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestPartitioningWriters;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class TestSparkPartitioningWriters extends TestPartitioningWriters<InternalRow> {
+
+  public TestSparkPartitioningWriters(FileFormat fileFormat) {
+    super(fileFormat);
+  }
+
+  @Override
+  protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                            Schema equalityDeleteRowSchema,
+                                                            Schema positionDeleteRowSchema) {
+    return SparkFileWriterFactory.builderFor(table)
+        .dataSchema(table.schema())
+        .dataFileFormat(format())
+        .deleteFileFormat(format())
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
+        .build();
+  }
+
+  @Override
+  protected InternalRow toRow(Integer id, String data) {
+    InternalRow row = new GenericInternalRow(2);
+    row.update(0, id);
+    row.update(1, UTF8String.fromString(data));
+    return row;
+  }
+
+  @Override
+  protected StructLikeSet toSet(Iterable<InternalRow> rows) {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    StructType sparkType = SparkSchemaUtil.convert(table.schema());
+    for (InternalRow row : rows) {
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      set.add(wrapper.wrap(row));
+    }
+    return set;
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
index 3ea2353f2cdf..9023195dcc6a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
@@ -37,13 +37,15 @@ public TestSparkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
 
   @Override
   protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                            Schema equalityDeleteRowSchema) {
+                                                            Schema equalityDeleteRowSchema,
+                                                            Schema positionDeleteRowSchema) {
     return SparkFileWriterFactory.builderFor(table)
         .dataSchema(table.schema())
         .dataFileFormat(format())
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
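For reference, a minimal sketch (not part of this patch; class and variable names are illustrative) of how engine-side code could drive the new ClusteredDataWriter end to end, following the same pattern as TestPartitioningWriters and WritersBenchmark above: rows must already be ordered by partition, the writer is closed before its aggregated result is read, and the produced data files are then committed to the table.

// Illustrative sketch only, assuming a Spark integration; the factories are built the same way
// as in WritersBenchmark. Placed in org.apache.iceberg.spark.source so InternalRowWrapper is visible.
package org.apache.iceberg.spark.source;

import java.io.IOException;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.ClusteredDataWriter;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;

class ClusteredWriteExample {

  // clusteredRows must already be grouped by partition, as ClusteredDataWriter requires
  static void writeClustered(Table table, FileWriterFactory<InternalRow> writerFactory,
                             OutputFileFactory fileFactory, Iterable<InternalRow> clusteredRows) throws IOException {
    ClusteredDataWriter<InternalRow> writer = new ClusteredDataWriter<>(
        writerFactory, fileFactory, table.io(),
        FileFormat.PARQUET, 128L * 1024 * 1024 /* illustrative target file size */);

    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
    InternalRowWrapper wrapper = new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()));

    try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
      for (InternalRow row : clusteredRows) {
        // derive the partition for each row and pass it along with the row's spec
        partitionKey.partition(wrapper.wrap(row));
        closeableWriter.write(row, table.spec(), partitionKey);
      }
    }

    // after close, result() carries the data files produced across all partitions
    DataWriteResult result = writer.result();
    AppendFiles append = table.newAppend();
    result.dataFiles().forEach(append::appendFile);
    append.commit();
  }
}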