diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index b2b102b883ef..9ea1b9cda1aa 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.deletes; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DeleteFile; @@ -29,10 +28,12 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class EqualityDeleteWriter implements Closeable { +public class EqualityDeleteWriter implements FileWriter { private final FileAppender appender; private final FileFormat format; private final String location; @@ -56,10 +57,17 @@ public EqualityDeleteWriter(FileAppender appender, FileFormat format, String this.equalityFieldIds = equalityFieldIds; } + @Override + public void write(T row) throws IOException { + appender.add(row); + } + + @Deprecated public void deleteAll(Iterable rows) { appender.addAll(rows); } + @Deprecated public void delete(T row) { appender.add(row); } @@ -89,4 +97,9 @@ public DeleteFile toDeleteFile() { Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); return deleteFile; } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile()); + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index 8c5eecfb924e..12e8b73b8412 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.deletes; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DeleteFile; @@ -28,11 +27,13 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.CharSequenceSet; -public class PositionDeleteWriter implements Closeable { +public class PositionDeleteWriter implements FileWriter, DeleteWriteResult> { private final FileAppender appender; private final FileFormat format; private final String location; @@ -55,15 +56,27 @@ public PositionDeleteWriter(FileAppender appender, FileFormat format this.pathSet = CharSequenceSet.empty(); } + @Override + public void write(PositionDelete positionDelete) throws IOException { + pathSet.add(positionDelete.path()); + appender.add(positionDelete); + } + + @Deprecated public void delete(CharSequence path, long pos) { delete(path, pos, null); } + @Deprecated public void delete(CharSequence path, long pos, T row) { pathSet.add(path); appender.add(delete.set(path, pos, row)); } + public long length() { + return appender.length(); + } + @Override public void close() throws 
IOException { if (deleteFile == null) { @@ -88,4 +101,9 @@ public DeleteFile toDeleteFile() { Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); return deleteFile; } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile(), referencedDataFiles()); + } } diff --git a/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java new file mode 100644 index 000000000000..472f3c303b8b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +abstract class BaseDeltaWriter implements DeltaWriter { + + private final List dataFiles = Lists.newArrayList(); + private final List deleteFiles = Lists.newArrayList(); + private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + + private boolean closed = false; + + @Override + public Result result() { + Preconditions.checkState(closed, "Cannot obtain result from unclosed task writer"); + return new BaseDeltaTaskWriteResult(dataFiles, deleteFiles, referencedDataFiles); + } + + @Override + public void close() throws IOException { + if (!closed) { + closeWriters(); + this.closed = true; + } + } + + protected abstract void closeWriters() throws IOException; + + protected void closeDataWriter(PartitionAwareFileWriter writer) throws IOException { + writer.close(); + + DataWriteResult result = writer.result(); + dataFiles.addAll(result.dataFiles()); + } + + protected void closeDeleteWriter(PartitionAwareFileWriter deleteWriter) throws IOException { + deleteWriter.close(); + + DeleteWriteResult result = deleteWriter.result(); + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + protected static class BaseDeltaTaskWriteResult implements DeltaWriter.Result { + + private final List dataFiles; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public BaseDeltaTaskWriteResult(List dataFiles, List deleteFiles, + CharSequenceSet referencedDataFiles) { + this.dataFiles = dataFiles; + this.deleteFiles = deleteFiles; + this.referencedDataFiles = referencedDataFiles; + } + + @Override + public List dataFiles() { + return dataFiles; + } + + @Override + public List deleteFiles() { + return deleteFiles; + } + + 
@Override + public CharSequenceSet referencedDataFiles() { + return referencedDataFiles; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/CDCWriter.java b/core/src/main/java/org/apache/iceberg/io/CDCWriter.java new file mode 100644 index 000000000000..4e37bd409284 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/CDCWriter.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.StructProjection; + +public class CDCWriter extends BaseDeltaWriter { + + private final FanoutDataWriter dataWriter; + private final PartitionAwareFileWriter equalityDeleteWriter; + private final PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter; + private final StructProjection keyProjection; + private final Map insertedRows; + private final PositionDelete positionDelete; + private final Function toStructLike; + + public CDCWriter(FanoutDataWriter dataWriter, + PartitionAwareFileWriter equalityDeleteWriter, + PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter, + Schema schema, Schema deleteSchema, Function toStructLike) { + this.dataWriter = dataWriter; + this.equalityDeleteWriter = equalityDeleteWriter; + this.positionDeleteWriter = positionDeleteWriter; + this.positionDelete = new PositionDelete<>(); + this.keyProjection = StructProjection.create(schema, deleteSchema); + this.insertedRows = StructLikeMap.create(deleteSchema.asStruct()); + this.toStructLike = toStructLike; + } + + @Override + public void insert(T row, PartitionSpec spec, StructLike partition) throws IOException { + CharSequence currentPath = dataWriter.currentPath(spec, partition); + long currentPosition = dataWriter.currentPosition(spec, partition); + PartitionAwarePathOffset offset = new PartitionAwarePathOffset(spec, partition, currentPath, currentPosition); + + StructLike copiedKey = StructCopy.copy(keyProjection.wrap(toStructLike.apply(row))); + + PartitionAwarePathOffset previous = insertedRows.put(copiedKey, offset); + if (previous != null) { + // TODO: attach the previous row if has a position delete row schema + positionDelete.set(previous.path(), previous.rowOffset(), null); + positionDeleteWriter.write(positionDelete, spec, partition); + } + + dataWriter.write(row, spec, partition); + } + + @Override + public void delete(T row, PartitionSpec spec, StructLike partition) throws IOException { + StructLike key = 
keyProjection.wrap(toStructLike.apply(row)); + PartitionAwarePathOffset previous = insertedRows.remove(key); + if (previous != null) { + // TODO: attach the previous row if has a position delete row schema + positionDelete.set(previous.path(), previous.rowOffset(), null); + positionDeleteWriter.write(positionDelete, previous.spec, previous.partition); + } + + equalityDeleteWriter.write(row, spec, partition); + } + + @Override + public void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException { + throw new IllegalArgumentException(this.getClass().getName() + " does not implement explicit position delete"); + } + + @Override + protected void closeWriters() throws IOException { + if (dataWriter != null) { + closeDataWriter(dataWriter); + } + + if (equalityDeleteWriter != null) { + closeDeleteWriter(equalityDeleteWriter); + } + + if (positionDeleteWriter != null) { + closeDeleteWriter(positionDeleteWriter); + } + } + + private static class PartitionAwarePathOffset { + private final PartitionSpec spec; + private final StructLike partition; + private final CharSequence path; + private final long rowOffset; + + private PartitionAwarePathOffset(PartitionSpec spec, StructLike partition, CharSequence path, long rowOffset) { + this.spec = spec; + this.partition = partition; + this.path = path; + this.rowOffset = rowOffset; + } + + public PartitionSpec spec() { + return spec; + } + + public StructLike partition() { + return partition; + } + + public CharSequence path() { + return path; + } + + public long rowOffset() { + return rowOffset; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java new file mode 100644 index 000000000000..e24e14a7859d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A data writer capable of writing to multiple specs and partitions that requires the incoming records + * to be clustered by partition spec and partition. 
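A minimal usage sketch of the clustered data writer declared below, assuming the caller already groups incoming rows by spec and partition; `Record`, `writerFactory`, `fileFactory`, `io`, `targetFileSizeInBytes`, and `partitionKey` are placeholders for whatever the engine integration supplies, not part of this change:

```java
// Sketch only: all names other than the writer classes are assumed to exist in the caller.
List<DataFile> writeClustered(Iterable<Record> rowsGroupedBySpecAndPartition,
                              PartitionSpec spec,
                              Function<Record, StructLike> partitionKey) throws IOException {
  ClusteredDataWriter<Record> writer = new ClusteredDataWriter<>(
      writerFactory, fileFactory, io, FileFormat.PARQUET, targetFileSizeInBytes);
  try {
    for (Record row : rowsGroupedBySpecAndPartition) {
      // rows must arrive grouped by (spec, partition); writing to a partition that
      // was already completed fails with IllegalStateException
      writer.write(row, spec, partitionKey.apply(row));
    }
  } finally {
    writer.close();
  }
  return writer.result().dataFiles();  // result() is only valid after close()
}
```

Keeping at most one file open at a time is what lets this writer stay cheap on memory compared to the fanout variant.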
+ */ +public class ClusteredDataWriter extends ClusteredFileWriter { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List dataFiles; + + public ClusteredDataWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.dataFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + return new RollingDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } + + @Override + protected void addResult(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteFileWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteFileWriter.java new file mode 100644 index 000000000000..7bc014b6a521 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDeleteFileWriter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +abstract class ClusteredDeleteFileWriter extends ClusteredFileWriter { + + private final List deleteFiles = Lists.newArrayList(); + private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java new file mode 100644 index 000000000000..81770b91519f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; + +/** + * An equality delete writer capable of writing to multiple specs and partitions that requires + * the incoming delete records to be properly clustered by partition spec and partition. + */ +public class ClusteredEqualityDeleteWriter extends ClusteredDeleteFileWriter { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + + public ClusteredEqualityDeleteWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + return new RollingEqualityDeleteWriter<>( + writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredFileWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredFileWriter.java new file mode 100644 index 000000000000..0b34a78b0e00 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredFileWriter.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Set; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeSet; + +/** + * A writer capable of writing to multiple specs and partitions that requires the incoming records + * to be clustered by partition spec and partition. + *

+ * As opposed to {@link FanoutFileWriter}, this writer keeps at most one file open to reduce + * the memory consumption. + */ +abstract class ClusteredFileWriter implements PartitionAwareFileWriter { + + private final Set completedSpecs = Sets.newHashSet(); + + private PartitionSpec currentSpec = null; + private Comparator partitionComparator = null; + private Set completedPartitions = null; + private StructLike currentPartition = null; + private FileWriter currentWriter = null; + + private boolean closed = false; + + protected abstract FileWriter newWriter(PartitionSpec spec, StructLike partition); + + protected abstract void addResult(R result); + + protected abstract R aggregatedResult(); + + @Override + public void write(T row, PartitionSpec spec, StructLike partition) throws IOException { + if (!spec.equals(currentSpec)) { + if (currentSpec != null) { + closeCurrent(); + completedSpecs.add(currentSpec.specId()); + completedPartitions.clear(); + } + + if (completedSpecs.contains(spec.specId())) { + throw new IllegalStateException("Already closed files for spec: " + spec.specId()); + } + + Types.StructType partitionType = spec.partitionType(); + + currentSpec = spec; + partitionComparator = Comparators.forType(partitionType); + completedPartitions = StructLikeSet.create(partitionType); + // copy the partition key as the key object is reused + currentPartition = partition != null ? StructCopy.copy(partition) : null; + currentWriter = newWriter(currentSpec, currentPartition); + + } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) { + closeCurrent(); + completedPartitions.add(currentPartition); + + if (completedPartitions.contains(partition)) { + String path = spec.partitionToPath(partition); + throw new IllegalStateException("Already closed files for partition: " + path); + } + + // copy the partition key as the key object is reused + currentPartition = partition != null ? StructCopy.copy(partition) : null; + currentWriter = newWriter(currentSpec, currentPartition); + } + + currentWriter.write(row); + } + + @Override + public void close() throws IOException { + if (!closed) { + closeCurrent(); + this.closed = true; + } + } + + private void closeCurrent() throws IOException { + if (currentWriter != null) { + currentWriter.close(); + + R result = currentWriter.result(); + addResult(result); + + this.currentWriter = null; + } + } + + @Override + public final R result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return aggregatedResult(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java new file mode 100644 index 000000000000..3856a13f2029 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; + +/** + * A position delete writer capable of writing to multiple specs and partitions that requires + * the incoming delete records to be properly clustered by partition spec and partition. + */ +public class ClusteredPositionDeleteWriter extends ClusteredDeleteFileWriter> { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + + public ClusteredPositionDeleteWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + return new RollingPositionDeleteWriter<>( + writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java new file mode 100644 index 000000000000..5b9d56c2a538 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DataWriteResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DataFile; + +public class DataWriteResult { + private final List dataFiles; + + public DataWriteResult(DataFile dataFile) { + this.dataFiles = Collections.singletonList(dataFile); + } + + public DataWriteResult(List dataFiles) { + this.dataFiles = dataFiles; + } + + public List dataFiles() { + return dataFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 113557a26434..b545909f1527 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -19,7 +19,6 @@ package org.apache.iceberg.io; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.DataFile; @@ -31,7 +30,7 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class DataWriter implements Closeable { +public class DataWriter implements FileWriter { private final FileAppender appender; private final FileFormat format; private final String location; @@ -57,6 +56,12 @@ public DataWriter(FileAppender appender, FileFormat format, String location, this.sortOrder = sortOrder; } + @Override + public void write(T row) throws IOException { + appender.add(row); + } + + @Deprecated public void add(T row) { appender.add(row); } @@ -86,4 +91,9 @@ public DataFile toDataFile() { Preconditions.checkState(dataFile != null, "Cannot create data file from unclosed writer"); return dataFile; } + + @Override + public DataWriteResult result() { + return new DataWriteResult(toDataFile()); + } } diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java new file mode 100644 index 000000000000..275a49e39f8d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.util.CharSequenceSet; + +public class DeleteWriteResult { + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public DeleteWriteResult(DeleteFile deleteFile) { + this.deleteFiles = Collections.singletonList(deleteFile); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + public DeleteWriteResult(DeleteFile deleteFile, CharSequenceSet referencedDataFiles) { + this.deleteFiles = Collections.singletonList(deleteFile); + this.referencedDataFiles = referencedDataFiles; + } + + public DeleteWriteResult(List deleteFiles) { + this.deleteFiles = deleteFiles; + this.referencedDataFiles = CharSequenceSet.empty(); + } + + public DeleteWriteResult(List deleteFiles, CharSequenceSet referencedDataFiles) { + this.deleteFiles = deleteFiles; + this.referencedDataFiles = referencedDataFiles; + } + + public List deleteFiles() { + return deleteFiles; + } + + public CharSequenceSet referencedDataFiles() { + return referencedDataFiles; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/DeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/DeltaWriter.java new file mode 100644 index 000000000000..125b32bc4f94 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DeltaWriter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.util.CharSequenceSet; + +public interface DeltaWriter extends Closeable { + + // insert + void insert(T row, PartitionSpec spec, StructLike partition) throws IOException; + + // equality delete + void delete(T row, PartitionSpec spec, StructLike partition) throws IOException; + + // position delete with persisting row + void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException; + + // position delete without persisting row + default void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition) throws IOException { + delete(path, pos, null, spec, partition); + } + + Result result(); + + interface Result extends Serializable { + List dataFiles(); + List deleteFiles(); + CharSequenceSet referencedDataFiles(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java new file mode 100644 index 000000000000..5595f1429e48 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A data writer capable of writing to multiple specs and partitions that keeps data writers for each + * seen spec/partition pair open until this writer is closed. 
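Unlike the clustered variant, this fanout data writer caches one rolling writer per spec/partition pair it has encountered, so the input needs no particular ordering at the cost of holding more files open; a minimal sketch using the same placeholder names as the clustered example:

```java
// Sketch only: unordered input is acceptable because the writer lazily opens and
// caches one rolling writer per (specId, partition) until close().
FanoutDataWriter<Record> writer = new FanoutDataWriter<>(
    writerFactory, fileFactory, io, FileFormat.PARQUET, targetFileSizeInBytes);
try {
  for (Record row : unorderedRows) {
    writer.write(row, spec, partitionKey.apply(row));  // any partition, in any order
  }
} finally {
  writer.close();
}
List<DataFile> files = writer.result().dataFiles();
```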
+ */ +public class FanoutDataWriter extends FanoutFileWriter { + + private final WriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List dataFiles; + + public FanoutDataWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.dataFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + return new RollingDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + } + + @Override + protected void addResult(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDeleteFileWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteFileWriter.java new file mode 100644 index 000000000000..a8a22b19aa76 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutDeleteFileWriter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +abstract class FanoutDeleteFileWriter extends FanoutFileWriter { + + private final List deleteFiles = Lists.newArrayList(); + private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutFileWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutFileWriter.java new file mode 100644 index 000000000000..f3cb80127d36 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutFileWriter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.StructLikeMap; + +/** + * A writer capable of writing to multiple specs and partitions that keeps files for each + * seen spec/partition pair open until this writer is closed. + *

+ * As opposed to {@link ClusteredFileWriter}, this writer does not require the incoming records + * to be clustered by partition spec and partition. The downside is potentially larger memory + * consumption. + */ +abstract class FanoutFileWriter implements PartitionAwareFileWriter { + + private final Map>> writers = Maps.newHashMap(); + private boolean closed = false; + + protected abstract FileWriter newWriter(PartitionSpec spec, StructLike partition); + + protected abstract void addResult(R result); + + protected abstract R aggregatedResult(); + + public CharSequence currentPath(PartitionSpec spec, StructLike partition) { + return ((RollingFileWriter) writer(spec, partition)).currentPath(); + } + + public long currentPosition(PartitionSpec spec, StructLike partition) { + return ((RollingFileWriter) writer(spec, partition)).currentRows(); + } + + @Override + public void write(T row, PartitionSpec spec, StructLike partition) throws IOException { + FileWriter writer = writer(spec, partition); + writer.write(row); + } + + private FileWriter writer(PartitionSpec spec, StructLike partition) { + Map> specWriters = writers.computeIfAbsent( + spec.specId(), + id -> StructLikeMap.create(spec.partitionType())); + FileWriter writer = specWriters.get(partition); + + if (writer == null) { + // copy the partition key as the key object is reused + StructLike copiedPartition = partition != null ? StructCopy.copy(partition) : null; + writer = newWriter(spec, copiedPartition); + specWriters.put(copiedPartition, writer); + } + + return writer; + } + + @Override + public void close() throws IOException { + if (!closed) { + closeWriters(); + this.closed = true; + } + } + + private void closeWriters() throws IOException { + for (Map> specWriters : writers.values()) { + for (FileWriter writer : specWriters.values()) { + writer.close(); + + R result = writer.result(); + addResult(result); + } + + specWriters.clear(); + } + + writers.clear(); + } + + @Override + public final R result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer"); + return aggregatedResult(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java new file mode 100644 index 000000000000..f6ac1fd6f004 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutSortedPositionDeleteWriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; + +public class FanoutSortedPositionDeleteWriter extends FanoutDeleteFileWriter> { + private final FileAppenderFactory appenderFactory; + private final OutputFileFactory fileFactory; + private final FileFormat fileFormat; + + public FanoutSortedPositionDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileFormat fileFormat) { + this.appenderFactory = appenderFactory; + this.fileFactory = fileFactory; + this.fileFormat = fileFormat; + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + return new SortedPosDeleteWriter<>(appenderFactory, fileFactory, fileFormat, partition); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriter.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java new file mode 100644 index 000000000000..ffdd400e8bd0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; + +/** + * A writer capable of writing files of a single type (i.e. data/delete) to one spec/partition. + *

+ * As opposed to {@link FileAppender}, this interface should be implemented by classes that not only + * append records to files but actually produce {@link DataFile}s or {@link DeleteFile}s objects + * with Iceberg metadata. Implementations may wrap {@link FileAppender}s with extra information + * such as spec, partition, sort order ID needed to construct {@link DataFile}s or {@link DeleteFile}s. + */ +public interface FileWriter extends Closeable { + + /** + * Writes rows to a predefined spec/partition. + * + * @param rows data or delete records + * @throws IOException in case of an error during the write process + */ + default void write(Iterable rows) throws IOException { + for (T row : rows) { + write(row); + } + } + + /** + * Writes a row to a predefined spec/partition. + * + * @param row a data or delete record + * @throws IOException in case of an error during the write process + */ + void write(T row) throws IOException; + + /** + * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. + * The result is valid only after the writer is closed. + * + * @return the file writer result + */ + R result(); +} diff --git a/core/src/main/java/org/apache/iceberg/io/MixedDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/MixedDeltaWriter.java new file mode 100644 index 000000000000..cf8199ab2d8e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/MixedDeltaWriter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; + +public class MixedDeltaWriter extends BaseDeltaWriter { + + private final PartitionAwareFileWriter dataWriter; + private final PartitionAwareFileWriter equalityDeleteWriter; + private final PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter; + private final PositionDelete positionDelete; + + public MixedDeltaWriter(PartitionAwareFileWriter dataWriter, + PartitionAwareFileWriter equalityDeleteWriter, + PartitionAwareFileWriter, DeleteWriteResult> positionDeleteWriter) { + this.dataWriter = dataWriter; + this.equalityDeleteWriter = equalityDeleteWriter; + this.positionDeleteWriter = positionDeleteWriter; + this.positionDelete = new PositionDelete<>(); + } + + @Override + public void insert(T row, PartitionSpec spec, StructLike partition) throws IOException { + dataWriter.write(row, spec, partition); + } + + @Override + public void delete(T row, PartitionSpec spec, StructLike partition) throws IOException { + equalityDeleteWriter.write(row, spec, partition); + } + + @Override + public void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException { + positionDelete.set(path, pos, row); + positionDeleteWriter.write(positionDelete, spec, partition); + } + + @Override + protected void closeWriters() throws IOException { + if (dataWriter != null) { + closeDataWriter(dataWriter); + } + + if (equalityDeleteWriter != null) { + closeDeleteWriter(equalityDeleteWriter); + } + + if (positionDeleteWriter != null) { + closeDeleteWriter(positionDeleteWriter); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionAwareFileWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionAwareFileWriter.java new file mode 100644 index 000000000000..99f1d20057c1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/PartitionAwareFileWriter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; + +/** + * A writer capable of writing files of a single type (i.e. data/delete) to multiple specs and partitions. + *

+ * As opposed to {@link FileWriter}, this interface should be implemented by writers that are not + * limited to writing to a single spec/partition. Implementations may under the hood use other + * {@link FileWriter}s for writing to a single spec/partition. + */ +public interface PartitionAwareFileWriter extends Closeable { + + /** + * Writes a row to the provided spec/partition. + * + * @param row a data or delete record + * @param spec a partition spec + * @param partition a partition or null if the spec is unpartitioned + * @throws IOException in case of an error during the write process + */ + void write(T row, PartitionSpec spec, StructLike partition) throws IOException; + + /** + * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. + * The result is valid only after the writer is closed. + * + * @return the file writer result + */ + R result(); +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java new file mode 100644 index 000000000000..c68dadc773ff --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A rolling data writer that splits incoming data into multiple files within one spec/partition. 
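A minimal sketch of driving such a rolling writer for a single spec/partition; the 128 MB target and the surrounding names are illustrative:

```java
// Sketch only: bound to one spec/partition; the size check runs every 1000 rows,
// so a file rolls shortly after crossing the target size.
RollingDataWriter<Record> writer = new RollingDataWriter<>(
    writerFactory, fileFactory, io, FileFormat.PARQUET,
    128L * 1024 * 1024,  // target file size in bytes (illustrative)
    spec, partition);
try {
  for (Record row : rows) {
    writer.write(row);
  }
} finally {
  writer.close();
}
// A large input ends up as several DataFiles, all belonging to the same partition.
List<DataFile> files = writer.result().dataFiles();
```

Note that ORC output currently never rolls, because the length of an in-progress ORC file is unknown until it is closed (see the TODO in `RollingFileWriter`).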
+ */ +public class RollingDataWriter extends RollingFileWriter, DataWriteResult> { + + private final WriterFactory writerFactory; + private final List dataFiles; + + public RollingDataWriter(WriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes, + PartitionSpec spec, StructLike partition) { + super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + this.writerFactory = writerFactory; + this.dataFiles = Lists.newArrayList(); + openCurrent(); + } + + @Override + protected DataWriter newWriter(EncryptedOutputFile file) { + return writerFactory.newDataWriter(file, spec(), partition()); + } + + @Override + protected long length(DataWriter writer) { + return writer.length(); + } + + @Override + protected void addResult(DataWriteResult result) { + dataFiles.addAll(result.dataFiles()); + } + + @Override + protected DataWriteResult aggregatedResult() { + return new DataWriteResult(dataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java new file mode 100644 index 000000000000..3a939a9093cf --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingDeleteWriter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +abstract class RollingDeleteWriter> + extends RollingFileWriter { + + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + protected RollingDeleteWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat, + long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) { + super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition); + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java new file mode 100644 index 000000000000..3236b5741839 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; + +/** + * A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition. 
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
new file mode 100644
index 000000000000..3236b5741839
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+
+/**
+ * A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition.
+ */
+public class RollingEqualityDeleteWriter<T> extends RollingDeleteWriter<T, EqualityDeleteWriter<T>> {
+
+  private final WriterFactory<T> writerFactory;
+
+  public RollingEqualityDeleteWriter(WriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                     FileIO io, FileFormat fileFormat, long targetFileSizeInBytes,
+                                     PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    openCurrent();
+  }
+
+  @Override
+  protected EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+    return writerFactory.newEqualityDeleteWriter(file, spec(), partition());
+  }
+
+  @Override
+  protected long length(EqualityDeleteWriter<T> writer) {
+    return writer.length();
+  }
+}
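Reviewer note: the equality delete path works the same way; a short hedged sketch (factory objects and the delete-row iterable are placeholders assumed to exist, imports omitted). Each delete row is expected to carry the equality key columns configured on the writer factory.

// Hypothetical helper for writing equality deletes within one spec/partition.
static DeleteWriteResult writeEqDeletes(WriterFactory<InternalRow> writerFactory, OutputFileFactory fileFactory,
                                        FileIO io, PartitionSpec spec, StructLike partition,
                                        Iterable<InternalRow> deleteRows) throws IOException {
  RollingEqualityDeleteWriter<InternalRow> writer = new RollingEqualityDeleteWriter<>(
      writerFactory, fileFactory, io, FileFormat.PARQUET, 64L * 1024 * 1024, spec, partition);
  try {
    for (InternalRow deleteRow : deleteRows) {
      writer.write(deleteRow);
    }
  } finally {
    writer.close();
  }
  return writer.result(); // delete files only; equality deletes do not reference concrete data files
}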
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
new file mode 100644
index 000000000000..b9134295e42c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition,
+ * rolling to a new file once the current one reaches the target size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final FileFormat fileFormat;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private W currentWriter = null;
+  private long currentRows = 0;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, FileFormat fileFormat,
+                              long targetFileSizeInBytes, PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.fileFormat = fileFormat;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract long length(W writer);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  @Override
+  public void write(T row) throws IOException {
+    currentWriter.write(row);
+    currentRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrent();
+      openCurrent();
+    }
+  }
+
+  public CharSequence currentPath() {
+    Preconditions.checkNotNull(currentFile, "The currentFile shouldn't be null");
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentRows() {
+    return currentRows;
+  }
+
+  protected void openCurrent() {
+    if (partition == null) {
+      this.currentFile = fileFactory.newOutputFile();
+    } else {
+      this.currentFile = fileFactory.newOutputFile(partition);
+    }
+    this.currentWriter = newWriter(currentFile);
+    this.currentRows = 0;
+  }
+
+  private boolean shouldRollToNewFile() {
+    // TODO: ORC does not report the current file size until the writer is closed,
+    // so size-based rolling is not supported for ORC yet
+    return !fileFormat.equals(FileFormat.ORC) &&
+        currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSizeInBytes;
+  }
+
+  private void closeCurrent() throws IOException {
+    if (currentWriter != null) {
+      currentWriter.close();
+
+      if (currentRows == 0L) {
+        io.deleteFile(currentFile.encryptingOutputFile());
+      } else {
+        R result = currentWriter.result();
+        addResult(result);
+      }
+
+      this.currentFile = null;
+      this.currentWriter = null;
+      this.currentRows = 0;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeCurrent();
+      this.closed = true;
+    }
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+}
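Reviewer note on shouldRollToNewFile(): the length check is amortized, it only runs when currentRows is a multiple of ROWS_DIVISOR, so a file can overshoot the target by up to roughly one check interval of rows (and ORC never rolls by size for now). A rough illustration with assumed numbers:

// Back-of-the-envelope illustration; the row size is an assumption, not measured by the writer.
long targetFileSizeInBytes = 128L * 1024 * 1024; // 128 MB target
long assumedAvgRowSize = 200;                    // bytes per encoded row (assumption)
long rowsBetweenChecks = 1000;                   // ROWS_DIVISOR
long maxOvershoot = (rowsBetweenChecks - 1) * assumedAvgRowSize; // ~200 KB worst case in this example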
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
new file mode 100644
index 000000000000..324a8935aef7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+
+/**
+ * A rolling position delete writer that splits incoming deletes into multiple files within one spec/partition.
+ */
+public class RollingPositionDeleteWriter<T> extends RollingDeleteWriter<PositionDelete<T>, PositionDeleteWriter<T>> {
+
+  private final WriterFactory<T> writerFactory;
+
+  public RollingPositionDeleteWriter(WriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                     FileIO io, FileFormat fileFormat, long targetFileSizeInBytes,
+                                     PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    openCurrent();
+  }
+
+  @Override
+  protected PositionDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+    return writerFactory.newPositionDeleteWriter(file, spec(), partition());
+  }
+
+  @Override
+  protected long length(PositionDeleteWriter<T> writer) {
+    return writer.length();
+  }
+}
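Reviewer note: a hedged sketch of the position delete path (factories and the delete iterable are placeholders assumed to come from the engine, imports omitted). Each PositionDelete names a data file path, a row position, and optionally the deleted row.

// Hypothetical helper for writing position deletes within one spec/partition.
static DeleteWriteResult writePosDeletes(WriterFactory<InternalRow> writerFactory, OutputFileFactory fileFactory,
                                         FileIO io, PartitionSpec spec, StructLike partition,
                                         Iterable<PositionDelete<InternalRow>> deletes) throws IOException {
  RollingPositionDeleteWriter<InternalRow> writer = new RollingPositionDeleteWriter<>(
      writerFactory, fileFactory, io, FileFormat.PARQUET, 64L * 1024 * 1024, spec, partition);
  try {
    for (PositionDelete<InternalRow> delete : deletes) {
      writer.write(delete);
    }
  } finally {
    writer.close();
  }
  return writer.result(); // carries the delete files plus the set of referenced data file paths
}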
diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
index 0da20579a62a..8187a54ddb46 100644
--- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
@@ -19,7 +19,6 @@
 package org.apache.iceberg.io;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Comparator;
@@ -28,15 +27,17 @@
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.CharSequenceWrapper;
 
-class SortedPosDeleteWriter<T> implements Closeable {
+class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
   private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L;
 
   private final Map<CharSequenceWrapper, List<PositionDelete<T>>> posDeletes = Maps.newHashMap();
@@ -51,6 +52,7 @@ class SortedPosDeleteWriter<T> implements Closeable {
 
   private final long recordsNumThreshold;
   private int records = 0;
+  private boolean closed = false;
 
   SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,
                         OutputFileFactory fileFactory,
@@ -71,6 +73,17 @@ class SortedPosDeleteWriter<T> implements Closeable {
     this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD);
   }
 
+  @Override
+  public void write(PositionDelete<T> payload) throws IOException {
+    delete(payload.path(), payload.pos(), payload.row());
+  }
+
+  @Override
+  public DeleteWriteResult result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return new DeleteWriteResult(completedFiles, referencedDataFiles);
+  }
+
   public void delete(CharSequence path, long pos) {
     delete(path, pos, null);
   }
@@ -103,7 +116,10 @@ public CharSequenceSet referencedDataFiles() {
 
   @Override
   public void close() throws IOException {
-    flushDeletes();
+    if (!closed) {
+      flushDeletes();
+      this.closed = true;
+    }
   }
 
   private void flushDeletes() {
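Reviewer note: as the class name suggests, this writer buffers incoming positions per data file path and sorts them before flushing, so callers do not need to pre-sort. A hedged sketch (appenderFactory, fileFactory, and partition are assumed to exist in the caller's scope; paths are made up):

// Hypothetical usage: positions may arrive unsorted; each produced delete file is position-ordered.
SortedPosDeleteWriter<Record> writer = new SortedPosDeleteWriter<>(
    appenderFactory, fileFactory, FileFormat.PARQUET, partition);
writer.delete("s3://bucket/data/file-a.parquet", 42L);
writer.delete("s3://bucket/data/file-a.parquet", 7L);  // lower position, still written first after sorting
writer.delete("s3://bucket/data/file-b.parquet", 3L);
writer.close();
DeleteWriteResult result = writer.result(); // delete files + the two referenced data file paths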
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a3da366768de..cbed07f48d11 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -29,11 +29,13 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
@@ -45,9 +47,12 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
-import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.io.PartitionAwareFileWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -531,68 +536,126 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
       Table table = tableBroadcast.value();
-
-      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
-      SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
-
       PartitionSpec spec = table.spec();
       FileIO io = table.io();
 
+      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+          .format(format)
+          .build();
+      SparkWriterFactory writerFactory = SparkWriterFactory.builderFor(table)
+          .dataFileFormat(format)
+          .dataSchema(writeSchema)
+          .dataSparkType(dsSchema)
+          .build();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        ClusteredDataWriter<InternalRow> dataWriter = new ClusteredDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new UnpartitionedDataWriter(dataWriter, io, spec);
+
       } else if (partitionedFanoutEnabled) {
-        return new PartitionedFanout3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+        FanoutDataWriter<InternalRow> dataWriter = new FanoutDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema);
+
       } else {
-        return new Partitioned3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+        ClusteredDataWriter<InternalRow> dataWriter = new ClusteredDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema);
       }
     }
   }
 
-  private static class Unpartitioned3Writer extends UnpartitionedWriter<InternalRow>
-      implements DataWriter<InternalRow> {
-    Unpartitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                         OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  private static <F extends ContentFile<F>> void cleanFiles(FileIO io, List<F> files) {
+    Tasks.foreach(files)
+        .suppressFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
+    private final PartitionAwareFileWriter<InternalRow, DataWriteResult> delegate;
+    private final FileIO io;
+    private final PartitionSpec spec;
+
+    private UnpartitionedDataWriter(PartitionAwareFileWriter<InternalRow, DataWriteResult> delegate,
+                                    FileIO io, PartitionSpec spec) {
+      this.delegate = delegate;
+      this.io = io;
+      this.spec = spec;
+    }
+
+    @Override
+    public void write(InternalRow record) throws IOException {
+      delegate.write(record, spec, null);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      this.close();
+      close();
 
-      return new TaskCommit(dataFiles());
+      DataWriteResult result = delegate.result();
+      return new TaskCommit(result.dataFiles().toArray(new DataFile[0]));
    }
-  }
 
-  private static class Partitioned3Writer extends SparkPartitionedWriter implements DataWriter<InternalRow> {
-    Partitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                       OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                       Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      DataWriteResult result = delegate.result();
+      cleanFiles(io, result.dataFiles());
    }
 
     @Override
-    public WriterCommitMessage commit() throws IOException {
-      this.close();
-
-      return new TaskCommit(dataFiles());
+    public void close() throws IOException {
+      delegate.close();
    }
  }
 
-  private static class PartitionedFanout3Writer extends SparkPartitionedFanoutWriter
-      implements DataWriter<InternalRow> {
-    PartitionedFanout3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                             OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                             Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+  private static class PartitionedDataWriter implements DataWriter<InternalRow> {
+    private final PartitionAwareFileWriter<InternalRow, DataWriteResult> delegate;
+    private final FileIO io;
+    private final PartitionSpec spec;
+    private final PartitionKey partitionKey;
+    private final InternalRowWrapper internalRowWrapper;
+
+    private PartitionedDataWriter(PartitionAwareFileWriter<InternalRow, DataWriteResult> delegate,
+                                  FileIO io, PartitionSpec spec, Schema dataSchema, StructType dataSparkType) {
+      this.delegate = delegate;
+      this.io = io;
+      this.spec = spec;
+      this.partitionKey = new PartitionKey(spec, dataSchema);
+      this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+    }
+
+    @Override
+    public void write(InternalRow row) throws IOException {
+      partitionKey.partition(internalRowWrapper.wrap(row));
+      delegate.write(row, spec, partitionKey);
    }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      this.close();
+      close();
 
-      return new TaskCommit(dataFiles());
+      DataWriteResult result = delegate.result();
+      return new TaskCommit(result.dataFiles().toArray(new DataFile[0]));
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      DataWriteResult result = delegate.result();
+      cleanFiles(io, result.dataFiles());
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
    }
  }
 }
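Reviewer note: taken together, the task side now picks a ClusteredDataWriter for unpartitioned tables and for pre-clustered partitioned input, or a FanoutDataWriter when fanout is enabled (which, as the name suggests, is expected to keep a file open per partition it encounters), and wraps it in one of the two DataWriter adapters above. A hedged sketch of how the adapter is driven from a task (the real loop lives in Spark's DSv2 runtime; names here are illustrative):

// Hypothetical task-side loop; commit() and abort() both close the delegate writer themselves.
static WriterCommitMessage runTask(DataWriter<InternalRow> writer, Iterator<InternalRow> rows) throws Exception {
  try {
    while (rows.hasNext()) {
      writer.write(rows.next()); // PartitionedDataWriter recomputes the partition key for every row
    }
    return writer.commit();      // returns a TaskCommit holding the produced DataFiles
  } catch (Exception e) {
    writer.abort();              // deletes any files this task already wrote
    throw e;
  }
}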