From dcdaccf3ce5c566b58656cdb535f05e4a2ea880a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Fri, 11 Feb 2022 10:12:56 +0700 Subject: [PATCH 01/10] Core: add `FanoutEqualityDeleteWriter` and `FanoutPositionDeleteWriter` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../io/FanoutEqualityDeleteWriter.java | 76 ++++++++++++++++++ .../io/FanoutPositionDeleteWriter.java | 79 +++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java create mode 100644 core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java new file mode 100644 index 000000000000..fd3f6326b485 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * An equality delete writer capable of writing to multiple specs and partitions that keeps + * delete writers for each seen spec/partition pair open until this writer is closed. 
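+ * Because those writers stay open until this writer is closed, memory use grows
+ * with the number of spec/partition pairs seen; in exchange, incoming deletes do
+ * not need to be clustered by spec or partition.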
+ */ +public class FanoutEqualityDeleteWriter extends FanoutWriter { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List deleteFiles; + + public FanoutEqualityDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition); + } else { + return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files"); + deleteFiles.addAll(result.deleteFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java new file mode 100644 index 000000000000..5a85baf7051a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +/** + * A position delete writer capable of writing to multiple specs and partitions that keeps + * delete writers for each seen spec/partition pair open until this writer is closed. 
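+ * The aggregated {@link DeleteWriteResult} also reports the set of data files
+ * referenced by the written position deletes.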
+ */ +public class FanoutPositionDeleteWriter extends FanoutWriter, DeleteWriteResult> { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public FanoutPositionDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newPositionDeleteWriter(outputFile, spec, partition); + } else { + return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} From 6e6b519df87b23d8a4cefc35f641513f974e210e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 21:44:55 +0700 Subject: [PATCH 02/10] Core: make `StructCopy` public MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- core/src/main/java/org/apache/iceberg/io/StructCopy.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/StructCopy.java b/core/src/main/java/org/apache/iceberg/io/StructCopy.java index 7d4373348ccb..c475b2986ae5 100644 --- a/core/src/main/java/org/apache/iceberg/io/StructCopy.java +++ b/core/src/main/java/org/apache/iceberg/io/StructCopy.java @@ -24,8 +24,8 @@ /** * Copy the StructLike's values into a new one. It does not handle list or map values now. */ -class StructCopy implements StructLike { - static StructLike copy(StructLike struct) { +public class StructCopy implements StructLike { + public static StructLike copy(StructLike struct) { return struct != null ? 
new StructCopy(struct) : null; } From 6747c61d8548da169092a25c75d09ae32c3142d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:16:13 +0700 Subject: [PATCH 03/10] Core: make `FileWriter.write` return `PathOffset` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../iceberg/deletes/EqualityDeleteWriter.java | 6 +- .../iceberg/deletes/PositionDeleteWriter.java | 6 +- .../org/apache/iceberg/io/DataWriter.java | 5 +- .../org/apache/iceberg/io/FileWriter.java | 3 +- .../org/apache/iceberg/io/PathOffset.java | 78 +++++++++++++++++++ .../apache/iceberg/io/RollingFileWriter.java | 5 +- .../iceberg/io/SortedPosDeleteWriter.java | 3 +- 7 files changed, 99 insertions(+), 7 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/io/PathOffset.java diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index c914ad224f30..bea416014684 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -31,6 +31,7 @@ import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.io.PathOffset; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class EqualityDeleteWriter implements FileWriter { @@ -43,6 +44,7 @@ public class EqualityDeleteWriter implements FileWriter private final int[] equalityFieldIds; private final SortOrder sortOrder; private DeleteFile deleteFile = null; + private long recordCount = 0; public EqualityDeleteWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata, @@ -58,8 +60,10 @@ public EqualityDeleteWriter(FileAppender appender, FileFormat format, String } @Override - public void write(T row) { + public PathOffset write(T row) { appender.add(row); + long offset = recordCount++; + return PathOffset.of(location, offset); } /** diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index a7dff07e7105..08fd0f8989ab 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -30,6 +30,7 @@ import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.io.PathOffset; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.CharSequenceSet; @@ -43,6 +44,7 @@ public class PositionDeleteWriter implements FileWriter, De private final PositionDelete delete; private final CharSequenceSet referencedDataFiles; private DeleteFile deleteFile = null; + private long recordCount = 0; public PositionDeleteWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { @@ -57,9 +59,11 @@ public PositionDeleteWriter(FileAppender appender, FileFormat format } @Override - public void write(PositionDelete positionDelete) { + public PathOffset write(PositionDelete positionDelete) { referencedDataFiles.add(positionDelete.path()); 
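+    // rows are appended sequentially, so recordCount is the 0-based position of this row in the file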
appender.add(positionDelete); + long offset = recordCount++; + return PathOffset.of(location, offset); } /** diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 090ccebfa80f..b7c4c62c8b3f 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -39,6 +39,7 @@ public class DataWriter implements FileWriter { private final ByteBuffer keyMetadata; private final SortOrder sortOrder; private DataFile dataFile = null; + private long recordCount = 0; public DataWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { @@ -57,8 +58,10 @@ public DataWriter(FileAppender appender, FileFormat format, String location, } @Override - public void write(T row) { + public PathOffset write(T row) { appender.add(row); + long offset = recordCount++; + return PathOffset.of(location, offset); } /** diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriter.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java index 6f0c4ab2194a..261d582a9a4d 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java @@ -51,8 +51,9 @@ default void write(Iterable rows) { * Writes a row to a predefined spec/partition. * * @param row a data or delete record + * @return PathOffset of written row */ - void write(T row); + PathOffset write(T row); /** * Returns the number of bytes that were currently written by this writer. diff --git a/core/src/main/java/org/apache/iceberg/io/PathOffset.java b/core/src/main/java/org/apache/iceberg/io/PathOffset.java new file mode 100644 index 000000000000..ffae56ddee15 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/PathOffset.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.JavaHashes; + +public final class PathOffset { + private final CharSequence path; + private final long offset; + + private PathOffset(CharSequence path, long offset) { + this.path = path; + this.offset = offset; + } + + public static PathOffset of(CharSequence path, long offset) { + return new PathOffset(path, offset); + } + + public CharSequence path() { + return path; + } + + public long offset() { + return offset; + } + + public PositionDelete setTo(PositionDelete positionDelete) { + return positionDelete.set(path, offset, null); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("path", path) + .add("offset", offset) + .toString(); + } + + @Override + public int hashCode() { + return (int) (31 * offset + JavaHashes.hashCode(path)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + PathOffset that = (PathOffset) o; + return this.offset == that.offset && + Comparators.charSequences().compare(this.path, that.path) == 0; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java index 80a589b18095..207def30283b 100644 --- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java @@ -82,14 +82,15 @@ public long length() { } @Override - public void write(T row) { - currentWriter.write(row); + public PathOffset write(T row) { + PathOffset pathOffset = currentWriter.write(row); currentFileRows++; if (shouldRollToNewFile()) { closeCurrentWriter(); openCurrentWriter(); } + return pathOffset; } private boolean shouldRollToNewFile() { diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 36a0313a4e41..692de183306c 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -79,8 +79,9 @@ public long length() { } @Override - public void write(PositionDelete payload) { + public PathOffset write(PositionDelete payload) { delete(payload.path(), payload.pos(), payload.row()); + return null; } public void delete(CharSequence path, long pos) { From 343272316735e294d403317ce91106b09668ee35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:21:37 +0700 Subject: [PATCH 04/10] Core: make `PartitioningWriter.write` return `PathOffset` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../main/java/org/apache/iceberg/io/ClusteredWriter.java | 4 ++-- core/src/main/java/org/apache/iceberg/io/FanoutWriter.java | 7 ++++--- .../java/org/apache/iceberg/io/PartitioningWriter.java | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java index 61a6f9f9164d..5fd3abc64e70 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java +++ 
b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java @@ -64,7 +64,7 @@ abstract class ClusteredWriter implements PartitioningWriter { protected abstract R aggregatedResult(); @Override - public void write(T row, PartitionSpec spec, StructLike partition) { + public PathOffset write(T row, PartitionSpec spec, StructLike partition) { if (!spec.equals(currentSpec)) { if (currentSpec != null) { closeCurrentWriter(); @@ -100,7 +100,7 @@ public void write(T row, PartitionSpec spec, StructLike partition) { this.currentWriter = newWriter(currentSpec, currentPartition); } - currentWriter.write(row); + return currentWriter.write(row); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java index 631fc0a6d4ea..f4d0a9158d92 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java @@ -49,9 +49,9 @@ abstract class FanoutWriter implements PartitioningWriter { protected abstract R aggregatedResult(); @Override - public void write(T row, PartitionSpec spec, StructLike partition) { + public PathOffset write(T row, PartitionSpec spec, StructLike partition) { FileWriter writer = writer(spec, partition); - writer.write(row); + return writer.write(row); } private FileWriter writer(PartitionSpec spec, StructLike partition) { @@ -98,7 +98,8 @@ public final R result() { } protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) { - Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, "Partition must not be null when creating output file for partitioned spec"); return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition); } diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java index 4afdd2162f8b..c1be23879ead 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java @@ -46,7 +46,7 @@ public interface PartitioningWriter extends Closeable { * @param spec a partition spec * @param partition a partition or null if the spec is unpartitioned */ - void write(T row, PartitionSpec spec, StructLike partition); + PathOffset write(T row, PartitionSpec spec, StructLike partition); /** * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. 
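
A usage sketch (illustrative only, with hypothetical `dataWriter` and
`positionDeleteWriter` variables): the `PathOffset` returned by `write` records
where a row landed, so a caller can later delete that exact row by position,
which is how the Flink task writer in a later patch uses it:

    PathOffset pathOffset = dataWriter.write(row, spec, partition);
    PositionDelete<RowData> delete = PositionDelete.create();
    positionDeleteWriter.write(pathOffset.setTo(delete), spec, partition);
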
From 78ff4a0d08dd2801587f83b16fd8688078d014e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:39:07 +0700 Subject: [PATCH 05/10] Core: implement `PartitioningWriterFactory` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../io/DefaultPartitioningWriterFactory.java | 69 ++++++++++++++ .../iceberg/io/PartitioningWriterFactory.java | 91 +++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/io/DefaultPartitioningWriterFactory.java create mode 100644 core/src/main/java/org/apache/iceberg/io/PartitioningWriterFactory.java diff --git a/core/src/main/java/org/apache/iceberg/io/DefaultPartitioningWriterFactory.java b/core/src/main/java/org/apache/iceberg/io/DefaultPartitioningWriterFactory.java new file mode 100644 index 000000000000..9639babd52c0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DefaultPartitioningWriterFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.deletes.PositionDelete; + +class DefaultPartitioningWriterFactory implements PartitioningWriterFactory { + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final Type type; + + DefaultPartitioningWriterFactory( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes, Type type) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.type = type; + } + + @Override + public PartitioningWriter newDataWriter() { + return type == Type.CLUSTERED ? + new ClusteredDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes) : + new FanoutDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes); + } + + @Override + public PartitioningWriter newEqualityDeleteWriter() { + return type == Type.CLUSTERED ? + new ClusteredEqualityDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes) : + new FanoutEqualityDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes); + } + + @Override + public PartitioningWriter, DeleteWriteResult> newPositionDeleteWriter() { + return type == Type.CLUSTERED ? 
+ new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes) : + new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes); + } + + enum Type { + CLUSTERED, + FANOUT, + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriterFactory.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriterFactory.java new file mode 100644 index 000000000000..4c0081d57f1f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriterFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.DefaultPartitioningWriterFactory.Type; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Factory to create a new {@link PartitioningWriter} + */ +public interface PartitioningWriterFactory { + PartitioningWriter newDataWriter(); + + PartitioningWriter newEqualityDeleteWriter(); + + PartitioningWriter, DeleteWriteResult> newPositionDeleteWriter(); + + static Builder builder(FileWriterFactory writerFactory) { + return new Builder<>(writerFactory); + } + + class Builder { + private final FileWriterFactory writerFactory; + private OutputFileFactory fileFactory = null; + private FileIO io = null; + private FileFormat fileFormat = null; + private long targetFileSizeInBytes = 0; + + public Builder(FileWriterFactory writerFactory) { + this.writerFactory = writerFactory; + } + + private void checkArguments() { + Preconditions.checkArgument(writerFactory != null, "writerFactory is required non-null"); + Preconditions.checkArgument(fileFactory != null, "fileFactory is required non-null"); + Preconditions.checkArgument(io != null, "io is required non-null"); + Preconditions.checkArgument(fileFormat != null, "fileFormat is required non-null"); + } + + public PartitioningWriterFactory buildForClusteredPartition() { + checkArguments(); + return new DefaultPartitioningWriterFactory<>(writerFactory, fileFactory, io, fileFormat, + targetFileSizeInBytes, Type.CLUSTERED); + } + + public PartitioningWriterFactory buildForFanoutPartition() { + checkArguments(); + return new DefaultPartitioningWriterFactory<>(writerFactory, fileFactory, io, fileFormat, + targetFileSizeInBytes, Type.FANOUT); + } + + public Builder fileFactory(OutputFileFactory newFileFactory) { + this.fileFactory = newFileFactory; + return this; + } + + public Builder io(FileIO newFileIO) { + this.io = newFileIO; + return this; + } + + public Builder fileFormat(FileFormat newFileFormat) { + this.fileFormat = newFileFormat; + return this; + } + + public Builder 
targetFileSizeInBytes(long newTargetFileSizeInBytes) { + this.targetFileSizeInBytes = newTargetFileSizeInBytes; + return this; + } + } +} From f2b60cf4236e776ba2c83eb4be2928bfa8cec299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:31:48 +0700 Subject: [PATCH 06/10] Core: implement `DirectTaskWriter` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../apache/iceberg/io/DirectTaskWriter.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/io/DirectTaskWriter.java diff --git a/core/src/main/java/org/apache/iceberg/io/DirectTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/DirectTaskWriter.java new file mode 100644 index 000000000000..3e6e4f5c5a2d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/DirectTaskWriter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.function.Function; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.util.Tasks; + +public class DirectTaskWriter implements TaskWriter { + @SuppressWarnings("rawtypes") + private static final Function UNPARTITION = s -> null; + + @SuppressWarnings("unchecked") + public static Function unpartition() { + return UNPARTITION; + } + + private final PartitioningWriter writer; + private final Function partitioner; + private final PartitionSpec spec; + private final FileIO io; + + public DirectTaskWriter( + PartitioningWriterFactory partitioningWriterFactory, + Function partitioner, PartitionSpec spec, + FileIO io) { + this.writer = partitioningWriterFactory.newDataWriter(); + this.partitioner = partitioner; + this.spec = spec; + this.io = io; + } + + @Override + public void write(T row) throws IOException { + StructLike partition = partitioner.apply(row); + writer.write(row, spec, partition); + } + + @Override + public void abort() throws IOException { + close(); + + // clean up files created by this writer + WriteResult result = writeResult(); + Tasks.foreach(result.dataFiles()) + .throwFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + + @Override + public WriteResult complete() throws IOException { + close(); + return writeResult(); + } + + @Override + public void close() throws IOException { + writer.close(); + } + + private WriteResult writeResult() { + DataWriteResult result = writer.result(); + + return WriteResult.builder() + .addDataFiles(result.dataFiles()) + .build(); + } +} From 76fd2e98ff795e59fe6c2e89726f565e5506dec8 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:35:18 +0700 Subject: [PATCH 07/10] Flink: implement `FlinkTaskWriter` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../iceberg/flink/sink/FlinkTaskWriter.java | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkTaskWriter.java diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkTaskWriter.java new file mode 100644 index 000000000000..5b6c06ad8a5b --- /dev/null +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkTaskWriter.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.io.DataWriteResult; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.PartitioningWriter; +import org.apache.iceberg.io.PartitioningWriterFactory; +import org.apache.iceberg.io.PathOffset; +import org.apache.iceberg.io.StructCopy; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.StructProjection; +import org.apache.iceberg.util.Tasks; + +public class FlinkTaskWriter implements TaskWriter { + public static Function partitionerFor( + PartitionSpec spec, + Schema schema, + RowType flinkSchema) { + return new Function() { + private final RowDataWrapper wrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + private final PartitionKey partitionKey = new PartitionKey(spec, schema); + + @Override + public StructLike apply(RowData row) { + partitionKey.partition(wrapper.wrap(row)); + return partitionKey; + } + }; + } + + public static Function equalityKeyerFor( + Set equalityFieldIds, + Schema schema, + RowType flinkSchema) { + return new Function() { + private final RowDataWrapper wrapper = new 
RowDataWrapper(flinkSchema, schema.asStruct()); + private final StructProjection structProjection = StructProjection.create(schema, equalityFieldIds); + + @Override + public StructLike apply(RowData row) { + structProjection.wrap(wrapper.wrap(row)); + return structProjection; + } + }; + } + + private final PartitioningWriter insertWriter; + private final PartitioningWriter equalityDeleteWriter; + private final PartitioningWriter, DeleteWriteResult> positionDeleteWriter; + + private final PartitionSpec spec; + private final FileIO io; + private final boolean upsert; + private final Function partitioner; + private final Function equalityKeyer; + + private StructLike partition; + private StructLike equalityKey; + private final PositionDelete positionDelete; + private final Map insertedRowMap; // map row Key to PathOffset + + protected FlinkTaskWriter( + PartitioningWriterFactory partitioningWriterFactory, + Function partitioner, + Schema schema, PartitionSpec spec, Set equalityFieldIds, + FileIO io, RowType flinkSchema, boolean upsert) { + this.insertWriter = partitioningWriterFactory.newDataWriter(); + this.equalityDeleteWriter = partitioningWriterFactory.newEqualityDeleteWriter(); + this.positionDeleteWriter = partitioningWriterFactory.newPositionDeleteWriter(); + + this.spec = spec; + this.io = io; + this.upsert = upsert; + + this.insertedRowMap = StructLikeMap.create(TypeUtil.select(schema.asStruct(), equalityFieldIds)); + this.positionDelete = PositionDelete.create(); + this.partitioner = partitioner; + this.equalityKeyer = equalityKeyerFor(equalityFieldIds, schema, flinkSchema); + } + + @Override + @SuppressWarnings("DuplicateBranchesInSwitch") + public void write(RowData row) throws IOException { + partition = partitioner.apply(row); + equalityKey = equalityKeyer.apply(row); + + switch (row.getRowKind()) { + case INSERT: + if (upsert) { + delete(row); + } + insert(row); + break; + + case UPDATE_BEFORE: + if (upsert) { + break; // UPDATE_BEFORE is not necessary for UPDATE, ignore to prevent deleting twice + } + delete(row); + break; + + case UPDATE_AFTER: + if (upsert) { + delete(row); + } + insert(row); + break; + + case DELETE: + delete(row); + break; + + default: + throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind()); + } + } + + protected boolean internalPosDelete(StructLike key) { + PathOffset previous = insertedRowMap.remove(key); + if (previous != null) { + positionDeleteWriter.write(previous.setTo(positionDelete), spec, partition); + return true; + } + return false; + } + + protected void insert(RowData row) throws IOException { + StructLike copiedKey = StructCopy.copy(equalityKey); + + internalPosDelete(copiedKey); + PathOffset pathOffset = insertWriter.write(row, spec, partition); + insertedRowMap.put(copiedKey, pathOffset); + } + + protected void delete(RowData row) throws IOException { + if (!internalPosDelete(equalityKey)) { + equalityDeleteWriter.write(row, spec, partition); + } + } + + @Override + public void abort() throws IOException { + close(); + + // clean up files created by this writer + WriteResult result = writeResult(); + Tasks.foreach(ObjectArrays.concat(result.dataFiles(), result.deleteFiles(), ContentFile.class)) + .throwFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + + @Override + public WriteResult complete() throws IOException { + close(); + return writeResult(); + } + + @Override + public void close() throws IOException { + insertWriter.close(); + equalityDeleteWriter.close(); + 
positionDeleteWriter.close(); + } + + private WriteResult writeResult() { + DataWriteResult insertResult = insertWriter.result(); + DeleteWriteResult equalityDeleteResult = equalityDeleteWriter.result(); + DeleteWriteResult positionDeleteResult = positionDeleteWriter.result(); + + return WriteResult.builder() + .addDataFiles(insertResult.dataFiles()) + .addDeleteFiles(equalityDeleteResult.deleteFiles()) + .addDeleteFiles(positionDeleteResult.deleteFiles()) + .addReferencedDataFiles(positionDeleteResult.referencedDataFiles()) + .build(); + } +} From da590422cf5f45d641413cf3717799d0104cd4a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:43:59 +0700 Subject: [PATCH 08/10] Flink: re-implement `RowDataTaskWriterFactory` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit to use `DirectTaskWriter` and `FlinkTaskWriter` Signed-off-by: Đặng Minh Dũng --- .../iceberg/data/BaseFileWriterFactory.java | 3 +- .../flink/sink/RowDataTaskWriterFactory.java | 94 ++++++++----------- 2 files changed, 43 insertions(+), 54 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 3791d348a845..51a997d99921 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -20,6 +20,7 @@ package org.apache.iceberg.data; import java.io.IOException; +import java.io.Serializable; import java.io.UncheckedIOException; import java.util.Map; import org.apache.iceberg.FileFormat; @@ -43,7 +44,7 @@ /** * A base writer factory to be extended by query engine integrations. */ -public abstract class BaseFileWriterFactory implements FileWriterFactory { +public abstract class BaseFileWriterFactory implements FileWriterFactory, Serializable { private final Table table; private final FileFormat dataFileFormat; private final Schema dataSchema; diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 2849100858a1..e745d372454a 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -20,21 +20,22 @@ package org.apache.iceberg.flink.sink; import java.util.List; +import java.util.function.Function; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.DirectTaskWriter; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileWriterFactory; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.io.PartitioningWriterFactory; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import 
org.apache.iceberg.util.ArrayUtil; public class RowDataTaskWriterFactory implements TaskWriterFactory { @@ -47,16 +48,17 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { private final FileFormat format; private final List equalityFieldIds; private final boolean upsert; - private final FileAppenderFactory appenderFactory; + private final FileWriterFactory writerFactory; private transient OutputFileFactory outputFileFactory; - public RowDataTaskWriterFactory(Table table, - RowType flinkSchema, - long targetFileSizeBytes, - FileFormat format, - List equalityFieldIds, - boolean upsert) { + public RowDataTaskWriterFactory( + Table table, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + List equalityFieldIds, + boolean upsert) { this.table = table; this.schema = table.schema(); this.flinkSchema = flinkSchema; @@ -67,13 +69,19 @@ public RowDataTaskWriterFactory(Table table, this.equalityFieldIds = equalityFieldIds; this.upsert = upsert; - if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { - this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); - } else { - // TODO provide the ability to customize the equality-delete row schema. - this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, - ArrayUtil.toIntArray(equalityFieldIds), schema, null); + FlinkFileWriterFactory.Builder writerFactoryBuilder = FlinkFileWriterFactory.builderFor(table) + .dataSchema(schema) + .dataFlinkType(flinkSchema) + .dataFileFormat(format) + .deleteFileFormat(format); + + if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) { + writerFactoryBuilder + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(schema); } + + writerFactory = writerFactoryBuilder.build(); } @Override @@ -86,43 +94,23 @@ public TaskWriter create() { Preconditions.checkNotNull(outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); - if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { - // Initialize a task writer to write INSERT only. - if (spec.isUnpartitioned()) { - return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); - } else { - return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, - io, targetFileSizeBytes, schema, flinkSchema); - } - } else { - // Initialize a task writer to write both INSERT and equality DELETE. - if (spec.isUnpartitioned()) { - return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); - } else { - return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); - } - } - } - - private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { - - private final PartitionKey partitionKey; - private final RowDataWrapper rowDataWrapper; + Function partitioner = spec.isPartitioned() ? 
+ FlinkTaskWriter.partitionerFor(spec, schema, flinkSchema) : + DirectTaskWriter.unpartition(); - RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema, - RowType flinkSchema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.partitionKey = new PartitionKey(spec, schema); - this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); - } + PartitioningWriterFactory partitioningWriterFactory = + PartitioningWriterFactory.builder(writerFactory) + .fileFactory(outputFileFactory) + .io(io) + .fileFormat(format) + .targetFileSizeInBytes(targetFileSizeBytes) + .buildForFanoutPartition(); - @Override - protected PartitionKey partition(RowData row) { - partitionKey.partition(rowDataWrapper.wrap(row)); - return partitionKey; + if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { + return new DirectTaskWriter<>(partitioningWriterFactory, partitioner, spec, io); + } else { + return new FlinkTaskWriter(partitioningWriterFactory, partitioner, + schema, spec, Sets.newHashSet(equalityFieldIds), io, flinkSchema, upsert); } } } From 732ef143dcd3e035784d3a5f7aced80eefe2caaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:44:26 +0700 Subject: [PATCH 09/10] Flink: remove legacy `DeltaTaskWriter`s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../flink/sink/BaseDeltaTaskWriter.java | 107 ------------------ .../flink/sink/PartitionedDeltaWriter.java | 87 -------------- .../flink/sink/UnpartitionedDeltaWriter.java | 60 ---------- 3 files changed, 254 deletions(-) delete mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java delete mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java delete mode 100644 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java deleted file mode 100644 index 8415129db9a7..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.io.BaseTaskWriter; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; - -abstract class BaseDeltaTaskWriter extends BaseTaskWriter { - - private final Schema schema; - private final Schema deleteSchema; - private final RowDataWrapper wrapper; - private final boolean upsert; - - BaseDeltaTaskWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.schema = schema; - this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); - this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); - this.upsert = upsert; - } - - abstract RowDataDeltaWriter route(RowData row); - - RowDataWrapper wrapper() { - return wrapper; - } - - @Override - public void write(RowData row) throws IOException { - RowDataDeltaWriter writer = route(row); - - switch (row.getRowKind()) { - case INSERT: - case UPDATE_AFTER: - if (upsert) { - writer.delete(row); - } - writer.write(row); - break; - - case UPDATE_BEFORE: - if (upsert) { - break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice - } - writer.delete(row); - break; - case DELETE: - writer.delete(row); - break; - - default: - throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind()); - } - } - - protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter { - RowDataDeltaWriter(PartitionKey partition) { - super(partition, schema, deleteSchema); - } - - @Override - protected StructLike asStructLike(RowData data) { - return wrapper.wrap(data); - } - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java deleted file mode 100644 index 1eee6298e933..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.List; -import java.util.Map; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; - -class PartitionedDeltaWriter extends BaseDeltaTaskWriter { - - private final PartitionKey partitionKey; - - private final Map writers = Maps.newHashMap(); - - PartitionedDeltaWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, - upsert); - this.partitionKey = new PartitionKey(spec, schema); - } - - @Override - RowDataDeltaWriter route(RowData row) { - partitionKey.partition(wrapper().wrap(row)); - - RowDataDeltaWriter writer = writers.get(partitionKey); - if (writer == null) { - // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. - PartitionKey copiedKey = partitionKey.copy(); - writer = new RowDataDeltaWriter(copiedKey); - writers.put(copiedKey, writer); - } - - return writer; - } - - @Override - public void close() { - try { - Tasks.foreach(writers.values()) - .throwFailureWhenFinished() - .noRetry() - .run(RowDataDeltaWriter::close, IOException.class); - - writers.clear(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close equality delta writer", e); - } - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java deleted file mode 100644 index 331ed7c78192..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; - -class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { - private final RowDataDeltaWriter writer; - - UnpartitionedDeltaWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, - upsert); - this.writer = new RowDataDeltaWriter(null); - } - - @Override - RowDataDeltaWriter route(RowData row) { - return writer; - } - - @Override - public void close() throws IOException { - writer.close(); - } -} From 40354bbf9506f9ee91bad9e494b1bf3614e09578 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BA=B7ng=20Minh=20D=C5=A9ng?= Date: Tue, 15 Feb 2022 22:47:38 +0700 Subject: [PATCH 10/10] Spark: refactor to use `DirectTaskWriter` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Đặng Minh Dũng --- .../spark/source/WritersBenchmark.java | 92 ++++++++++--------- .../iceberg/spark/source/RowDataRewriter.java | 90 +++++++++++------- 2 files changed, 106 insertions(+), 76 deletions(-) diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java index 93ee5574a58d..d2f2562d17a8 100644 --- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java @@ -23,11 +23,13 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.expressions.Expressions; @@ -36,11 +38,12 @@ import org.apache.iceberg.io.ClusteredEqualityDeleteWriter; import org.apache.iceberg.io.ClusteredPositionDeleteWriter; import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.DirectTaskWriter; import org.apache.iceberg.io.FanoutDataWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitioningWriterFactory; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -152,19 +155,7 @@ public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IO @Benchmark @Threads(1) public void writeUnpartitionedLegacyDataWriter(Blackhole blackhole) throws IOException { - FileIO io = 
-    FileIO io = table().io();
-
-    OutputFileFactory fileFactory = newFileFactory();
-
-    Schema writeSchema = table().schema();
-    StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
-    SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
-        .spec(unpartitionedSpec)
-        .build();
-
-    TaskWriter<InternalRow> writer = new UnpartitionedWriter<>(
-        unpartitionedSpec, fileFormat(), appenders,
-        fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+    TaskWriter<InternalRow> writer = newTaskWriter(false, false);
 
     try (TaskWriter<InternalRow> closableWriter = writer) {
       for (InternalRow row : rows) {
@@ -207,20 +198,7 @@ public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOEx
 
   @Benchmark
   @Threads(1)
   public void writePartitionedLegacyDataWriter(Blackhole blackhole) throws IOException {
-    FileIO io = table().io();
-
-    OutputFileFactory fileFactory = newFileFactory();
-
-    Schema writeSchema = table().schema();
-    StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
-    SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
-        .spec(partitionedSpec)
-        .build();
-
-    TaskWriter<InternalRow> writer = new SparkPartitionedWriter(
-        partitionedSpec, fileFormat(), appenders,
-        fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
-        writeSchema, sparkWriteType);
+    TaskWriter<InternalRow> writer = newTaskWriter(true, false);
 
     try (TaskWriter<InternalRow> closableWriter = writer) {
       for (InternalRow row : rows) {
@@ -263,20 +241,7 @@ public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOExcep
 
   @Benchmark
   @Threads(1)
   public void writePartitionedLegacyFanoutDataWriter(Blackhole blackhole) throws IOException {
-    FileIO io = table().io();
-
-    OutputFileFactory fileFactory = newFileFactory();
-
-    Schema writeSchema = table().schema();
-    StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
-    SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
-        .spec(partitionedSpec)
-        .build();
-
-    TaskWriter<InternalRow> writer = new SparkPartitionedFanoutWriter(
-        partitionedSpec, fileFormat(), appenders,
-        fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
-        writeSchema, sparkWriteType);
+    TaskWriter<InternalRow> writer = newTaskWriter(true, true);
 
     try (TaskWriter<InternalRow> closableWriter = writer) {
       for (InternalRow row : rows) {
@@ -298,7 +263,7 @@ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) t
     SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
         .dataFileFormat(fileFormat())
         .equalityDeleteRowSchema(table().schema())
-        .equalityFieldIds(new int[]{equalityFieldId})
+        .equalityFieldIds(new int[] {equalityFieldId})
         .build();
 
     ClusteredEqualityDeleteWriter<InternalRow> writer = new ClusteredEqualityDeleteWriter<>(
@@ -351,4 +316,45 @@ private OutputFileFactory newFileFactory() {
         .format(fileFormat())
         .build();
   }
+
+  private PartitioningWriterFactory<InternalRow> newPartitioningWriterFactory(boolean fanout) {
+    Schema writeSchema = table().schema();
+    FileIO io = table().io();
+
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataSchema(writeSchema)
+        .dataFileFormat(fileFormat())
+        .deleteFileFormat(fileFormat())
+        .build();
+    PartitioningWriterFactory.Builder<InternalRow> builder =
+        PartitioningWriterFactory.builder(writerFactory)
+            .fileFactory(fileFactory)
+            .io(io)
+            .fileFormat(fileFormat())
+            .targetFileSizeInBytes(TARGET_FILE_SIZE_IN_BYTES);
+
+    return fanout ? builder.buildForFanoutPartition() : builder.buildForClusteredPartition();
+  }
+
+  private TaskWriter<InternalRow> newTaskWriter(boolean partition, boolean fanout) {
+    Schema writeSchema = table().schema();
+    PartitionSpec spec = partition ? partitionedSpec : unpartitionedSpec;
+    FileIO io = table().io();
+    StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
+
+    PartitioningWriterFactory<InternalRow> partitioningWriterFactory = newPartitioningWriterFactory(fanout);
+    Function<InternalRow, StructLike> partitioner = partition ? new Function<InternalRow, StructLike>() {
+      private final PartitionKey partitionKey = new PartitionKey(spec, writeSchema);
+      private final InternalRowWrapper wrapper = new InternalRowWrapper(sparkWriteType);
+
+      @Override
+      public StructLike apply(InternalRow row) {
+        partitionKey.partition(wrapper.wrap(row));
+        return partitionKey;
+      }
+    } : DirectTaskWriter.unpartition();
+
+    return new DirectTaskWriter<>(partitioningWriterFactory, partitioner, spec, io);
+  }
 }
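
The four legacy benchmark paths collapse into the single helper above; for reference, the flag pairs
map to the writers they replace (a sketch only — the three calls are exactly those in the hunks above):

    TaskWriter<InternalRow> unpartitioned = newTaskWriter(false, false);        // was UnpartitionedWriter
    TaskWriter<InternalRow> partitionedClustered = newTaskWriter(true, false);  // was SparkPartitionedWriter
    TaskWriter<InternalRow> partitionedFanout = newTaskWriter(true, true);      // was SparkPartitionedFanoutWriter

The (false, true) combination is never exercised: fanout only matters once rows span multiple partitions.
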
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 63cc3a466c1a..a7e53b6cc219 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -24,17 +24,21 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.DirectTaskWriter;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriterFactory;
 import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.PropertyUtil;
@@ -47,7 +51,6 @@ import org.slf4j.LoggerFactory;
 
 public class RowDataRewriter implements Serializable {
-
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
   private final Broadcast<Table> tableBroadcast;
@@ -80,35 +83,8 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
     long taskId = context.taskAttemptId();
 
     Table table = tableBroadcast.value();
-    Schema schema = table.schema();
-    Map<String, String> properties = table.properties();
-
-    RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);
-
-    StructType structType = SparkSchemaUtil.convert(schema);
-    SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, schema, structType)
-        .spec(spec)
-        .build();
-    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
-        .defaultSpec(spec)
-        .format(format)
-        .build();
-
-    TaskWriter<InternalRow> writer;
-    if (spec.isUnpartitioned()) {
-      writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, table.io(),
-          Long.MAX_VALUE);
-    } else if (PropertyUtil.propertyAsBoolean(properties,
-        TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
-        TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
-      writer = new SparkPartitionedFanoutWriter(
-          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
-          structType);
-    } else {
-      writer = new SparkPartitionedWriter(
-          spec, format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, schema,
-          structType);
-    }
+    RowDataReader dataReader = new RowDataReader(task, table, table.schema(), caseSensitive);
+    TaskWriter<InternalRow> writer = newTaskWriter(table, partitionId, taskId);
 
     try {
       while (dataReader.next()) {
@@ -121,7 +97,6 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
 
       writer.close();
       return Lists.newArrayList(writer.dataFiles());
-
     } catch (Throwable originalThrowable) {
       try {
         LOG.error("Aborting task", originalThrowable);
@@ -135,7 +110,6 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
         writer.abort();
         LOG.error("Aborted commit for partition {} (task {}, attempt {}, stage {}.{})",
             partitionId, taskId, context.taskAttemptId(), context.stageId(), context.stageAttemptNumber());
-
       } catch (Throwable inner) {
         if (originalThrowable != inner) {
           originalThrowable.addSuppressed(inner);
@@ -150,4 +124,54 @@ private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws Exceptio
     }
   }
 }
+
+  private TaskWriter<InternalRow> newTaskWriter(Table table, int partitionId, long taskId) {
+    Schema schema = table.schema();
+    Map<String, String> properties = table.properties();
+    StructType structType = SparkSchemaUtil.convert(schema);
+
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+        .defaultSpec(spec)
+        .format(format)
+        .build();
+
+    Function<InternalRow, StructLike> partitioner;
+    if (spec.isPartitioned()) {
+      partitioner = new Function<InternalRow, StructLike>() {
+        private final PartitionKey partitionKey = new PartitionKey(spec, schema);
+        private final InternalRowWrapper wrapper = new InternalRowWrapper(structType);
+
+        @Override
+        public StructLike apply(InternalRow row) {
+          partitionKey.partition(wrapper.wrap(row));
+          return partitionKey;
+        }
+      };
+    } else {
+      partitioner = DirectTaskWriter.unpartition();
+    }
+
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table)
+        .dataSchema(schema)
+        .dataFileFormat(format)
+        .deleteFileFormat(format)
+        .build();
+
+    PartitioningWriterFactory.Builder<InternalRow> partitioningWriterFactoryBuilder =
+        PartitioningWriterFactory.builder(writerFactory)
+            .fileFactory(fileFactory)
+            .io(table.io())
+            .fileFormat(format)
+            .targetFileSizeInBytes(Long.MAX_VALUE);
+
+    PartitioningWriterFactory<InternalRow> partitioningWriterFactory =
+        PropertyUtil.propertyAsBoolean(
+            properties,
+            TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
+            TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT
+        ) ? partitioningWriterFactoryBuilder.buildForFanoutPartition() :
+            partitioningWriterFactoryBuilder.buildForClusteredPartition();
+
+    return new DirectTaskWriter<>(partitioningWriterFactory, partitioner, spec, table.io());
+  }
 }
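
Usage sketch for the plumbing now shared by both files: a `PartitioningWriterFactory` chooses between
clustered and fanout partitioning writers, and `DirectTaskWriter` adapts the result to the `TaskWriter`
contract by routing each row through a partitioner function. The sketch assumes the
`PartitioningWriterFactory` and `DirectTaskWriter` APIs exactly as introduced earlier in this series;
`partitionId`, `taskId`, `format`, and `targetFileSizeInBytes` are placeholders for values in the
caller's scope:

    // A PartitioningWriterFactory bundles the file writer factory, file naming, IO, and sizing.
    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
        .defaultSpec(table.spec())
        .format(format)
        .build();
    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table)
        .dataSchema(table.schema())
        .dataFileFormat(format)
        .deleteFileFormat(format)
        .build();
    PartitioningWriterFactory<InternalRow> partitioningWriterFactory =
        PartitioningWriterFactory.builder(writerFactory)
            .fileFactory(fileFactory)
            .io(table.io())
            .fileFormat(format)
            .targetFileSizeInBytes(targetFileSizeInBytes)
            .buildForClusteredPartition(); // or buildForFanoutPartition() when rows arrive unsorted

    // DirectTaskWriter routes rows via a partitioner function; for an unpartitioned spec,
    // DirectTaskWriter.unpartition() supplies the constant "no partition" key.
    TaskWriter<InternalRow> writer = new DirectTaskWriter<>(
        partitioningWriterFactory, DirectTaskWriter.unpartition(), table.spec(), table.io());
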