diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index c914ad224f30..003d4c8996a1 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -43,6 +43,7 @@ public class EqualityDeleteWriter implements FileWriter private final int[] equalityFieldIds; private final SortOrder sortOrder; private DeleteFile deleteFile = null; + private long recordCount = 0; public EqualityDeleteWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata, @@ -60,6 +61,17 @@ public EqualityDeleteWriter(FileAppender appender, FileFormat format, String @Override public void write(T row) { appender.add(row); + recordCount += 1; + } + + @Override + public CharSequence location() { + return location; + } + + @Override + public long recordCount() { + return recordCount; } /** @@ -69,7 +81,7 @@ public void write(T row) { */ @Deprecated public void deleteAll(Iterable rows) { - appender.addAll(rows); + rows.forEach(this::write); } /** @@ -79,7 +91,7 @@ public void deleteAll(Iterable rows) { */ @Deprecated public void delete(T row) { - appender.add(row); + write(row); } @Override diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index a7dff07e7105..e9f0eef79f7b 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -43,6 +43,7 @@ public class PositionDeleteWriter implements FileWriter, De private final PositionDelete delete; private final CharSequenceSet referencedDataFiles; private DeleteFile deleteFile = null; + private long recordCount = 0; public PositionDeleteWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { @@ -60,6 +61,17 @@ public PositionDeleteWriter(FileAppender appender, FileFormat format public void write(PositionDelete positionDelete) { referencedDataFiles.add(positionDelete.path()); appender.add(positionDelete); + recordCount += 1; + } + + @Override + public CharSequence location() { + return location; + } + + @Override + public long recordCount() { + return recordCount; } /** @@ -69,7 +81,7 @@ public void write(PositionDelete positionDelete) { */ @Deprecated public void delete(CharSequence path, long pos) { - delete(path, pos, null); + write(delete.set(path, pos, null)); } /** @@ -79,8 +91,7 @@ public void delete(CharSequence path, long pos) { */ @Deprecated public void delete(CharSequence path, long pos, T row) { - referencedDataFiles.add(path); - appender.add(delete.set(path, pos, row)); + write(delete.set(path, pos, row)); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/BaseEqualityDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseEqualityDeltaWriter.java new file mode 100644 index 000000000000..378df85aa390 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/BaseEqualityDeltaWriter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.StructProjection; + +public class BaseEqualityDeltaWriter implements EqualityDeltaWriter { + + private final PositionDelete posDelete = PositionDelete.create(); + + private final PartitioningWriter dataWriter; + private final PartitioningWriter equalityWriter; + private final PartitioningWriter, DeleteWriteResult> positionWriter; + + private final Map insertedRowMap; + + private final Function asStructLikeKey; + private final Function keyRefFunc; + private final Function keyCopyFunc; + + private boolean closed = false; + + public BaseEqualityDeltaWriter( + PartitioningWriter dataWriter, + PartitioningWriter equalityWriter, + PartitioningWriter, DeleteWriteResult> positionWriter, + Schema schema, + Schema deleteSchema, + Function asStructLike, + Function asStructLikeKey) { + this.dataWriter = dataWriter; + this.equalityWriter = equalityWriter; + this.positionWriter = positionWriter; + + this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct()); + this.asStructLikeKey = asStructLikeKey; + + StructProjection projection = StructProjection.create(schema, deleteSchema); + this.keyRefFunc = row -> projection.wrap(asStructLike.apply(row)); + this.keyCopyFunc = row -> StructCopy.copy(keyRefFunc.apply(row)); + } + + @Override + public void insert(T row, PartitionSpec spec, StructLike partition) { + PathOffset pathOffset = dataWriter.write(row, spec, partition); + + // Create a copied key from this row. + StructLike copiedKey = keyCopyFunc.apply(row); + + // Replace the cached path-offset; if the key was already inserted in this batch, add a position delete for the previous row. + PathOffset previous = insertedRowMap.put(copiedKey, pathOffset); + if (previous != null) { + posDelete.set(previous.path(), previous.rowOffset(), null); + positionWriter.write(posDelete, spec, partition); + } + } + + /** + * Removes the key from the insertedRowMap cache and, if it was present, writes its previous path and offset to the position delete file.
+ */ + private boolean retireOldKey(StructLike key, PartitionSpec spec, StructLike partition) { + PathOffset previous = insertedRowMap.remove(key); + if (previous != null) { + posDelete.set(previous.path(), previous.rowOffset(), null); + positionWriter.write(posDelete, spec, partition); + return true; + } else { + return false; + } + } + + @Override + public void delete(T row, PartitionSpec spec, StructLike partition) { + if (!retireOldKey(keyRefFunc.apply(row), spec, partition)) { + equalityWriter.write(row, spec, partition); + } + } + + @Override + public void deleteKey(T key, PartitionSpec spec, StructLike partition) { + if (!retireOldKey(asStructLikeKey.apply(key), spec, partition)) { + equalityWriter.write(key, spec, partition); + } + } + + @Override + public WriteResult result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer."); + + DataWriteResult dataWriteResult = dataWriter.result(); + DeleteWriteResult positionWriteResult = positionWriter.result(); + DeleteWriteResult equalityWriteResult = equalityWriter.result(); + + return WriteResult.builder() + .addDataFiles(dataWriteResult.dataFiles()) + .addDeleteFiles(positionWriteResult.deleteFiles()) + .addDeleteFiles(equalityWriteResult.deleteFiles()) + .addReferencedDataFiles(positionWriteResult.referencedDataFiles()) + .build(); + } + + @Override + public void close() throws IOException { + if (!closed) { + dataWriter.close(); + positionWriter.close(); + equalityWriter.close(); + + this.closed = true; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index 386405afc27f..45349f2c769d 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -32,7 +32,6 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -130,7 +129,7 @@ public void write(T row) throws IOException { PathOffset previous = insertedRowMap.put(copiedKey, pathOffset); if (previous != null) { // TODO attach the previous row if has a positional-delete row schema in appender factory. - posDeleteWriter.delete(previous.path, previous.rowOffset, null); + posDeleteWriter.delete(previous.path(), previous.rowOffset(), null); } dataWriter.write(row); @@ -146,7 +145,7 @@ private boolean internalPosDelete(StructLike key) { if (previous != null) { // TODO attach the previous row if has a positional-delete row schema in appender factory. 
- posDeleteWriter.delete(previous.path, previous.rowOffset, null); + posDeleteWriter.delete(previous.path(), previous.rowOffset(), null); return true; } @@ -205,28 +204,6 @@ public void close() throws IOException { } } - private static class PathOffset { - private final CharSequence path; - private final long rowOffset; - - private PathOffset(CharSequence path, long rowOffset) { - this.path = path; - this.rowOffset = rowOffset; - } - - private static PathOffset of(CharSequence path, long rowOffset) { - return new PathOffset(path, rowOffset); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("path", path) - .add("row_offset", rowOffset) - .toString(); - } - } - private abstract class BaseRollingWriter implements Closeable { private static final int ROWS_DIVISOR = 1000; private final StructLike partitionKey; diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java index 61a6f9f9164d..8d981dd3d13a 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java @@ -64,7 +64,7 @@ abstract class ClusteredWriter implements PartitioningWriter { protected abstract R aggregatedResult(); @Override - public void write(T row, PartitionSpec spec, StructLike partition) { + public PathOffset write(T row, PartitionSpec spec, StructLike partition) { if (!spec.equals(currentSpec)) { if (currentSpec != null) { closeCurrentWriter(); @@ -85,7 +85,6 @@ public void write(T row, PartitionSpec spec, StructLike partition) { // copy the partition key as the key object may be reused this.currentPartition = StructCopy.copy(partition); this.currentWriter = newWriter(currentSpec, currentPartition); - } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) { closeCurrentWriter(); completedPartitions.add(currentPartition); @@ -100,7 +99,10 @@ public void write(T row, PartitionSpec spec, StructLike partition) { this.currentWriter = newWriter(currentSpec, currentPartition); } + PathOffset pathOffset = PathOffset.of(currentWriter.location(), currentWriter.recordCount()); currentWriter.write(row); + + return pathOffset; } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 090ccebfa80f..f4521c6174d0 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -39,6 +39,7 @@ public class DataWriter implements FileWriter { private final ByteBuffer keyMetadata; private final SortOrder sortOrder; private DataFile dataFile = null; + private long recordCount = 0; public DataWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { @@ -59,6 +60,17 @@ public DataWriter(FileAppender appender, FileFormat format, String location, @Override public void write(T row) { appender.add(row); + recordCount += 1; + } + + @Override + public CharSequence location() { + return location; + } + + @Override + public long recordCount() { + return recordCount; } /** diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java new file mode 100644 index 000000000000..99d26180590f --- /dev/null +++ 
b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +public class FanoutEqualityDeleteWriter extends FanoutWriter { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final long targetFileSizeInBytes; + + private final List deleteFiles; + + public FanoutEqualityDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, CharSequenceSet.empty()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java new file mode 100644 index 000000000000..7f3cc59c2784 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +public class FanoutPositionDeleteWriter extends FanoutWriter, DeleteWriteResult> { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final long targetFileSizeBytes; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public FanoutPositionDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, long targetFileSizeBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.targetFileSizeBytes = targetFileSizeBytes; + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeBytes, spec, partition); + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java index 631fc0a6d4ea..7d72675da439 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java @@ -49,9 +49,13 @@ abstract class FanoutWriter implements PartitioningWriter { protected abstract R aggregatedResult(); @Override - public void write(T row, PartitionSpec spec, StructLike partition) { + public PathOffset write(T row, PartitionSpec spec, StructLike partition) { FileWriter writer = writer(spec, partition); + + PathOffset pathOffset = PathOffset.of(writer.location(), writer.recordCount()); writer.write(row); + + return pathOffset; } private FileWriter writer(PartitionSpec spec, StructLike partition) { diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriter.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java index 6f0c4ab2194a..3205afabb7c0 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java @@ -54,6 +54,20 @@ default void write(Iterable rows) { */ void write(T row); + /** + * Returns the path of the file that is currently being written. + * + * @return the current file path. + */ + CharSequence location(); + + /** + * Returns the number of top-level records written to this file. + * + * @return the record count so far, which is also the offset of the next record. + */ + long recordCount(); + /** * Returns the number of bytes that were currently written by this writer.
* diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java b/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java index 9b57676f099d..130b5d2d056f 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java @@ -19,6 +19,7 @@ package org.apache.iceberg.io; +import java.io.Serializable; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; @@ -28,7 +29,7 @@ /** * A factory for creating data and delete writers. */ -public interface FileWriterFactory { +public interface FileWriterFactory extends Serializable { /** * Creates a new {@link DataWriter}. diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java index 4afdd2162f8b..c1be23879ead 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java @@ -46,7 +46,7 @@ public interface PartitioningWriter extends Closeable { * @param spec a partition spec * @param partition a partition or null if the spec is unpartitioned */ - void write(T row, PartitionSpec spec, StructLike partition); + PathOffset write(T row, PartitionSpec spec, StructLike partition); /** * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. diff --git a/core/src/main/java/org/apache/iceberg/io/PathOffset.java b/core/src/main/java/org/apache/iceberg/io/PathOffset.java new file mode 100644 index 000000000000..9bb095a410ad --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/PathOffset.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +public class PathOffset { + + private final CharSequence path; + private final long rowOffset; + + private PathOffset(CharSequence path, long rowOffset) { + this.path = path; + this.rowOffset = rowOffset; + } + + public static PathOffset of(CharSequence path, long rowOffset) { + return new PathOffset(path, rowOffset); + } + + public CharSequence path() { + return path; + } + + public long rowOffset() { + return rowOffset; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("path", path) + .add("row_offset", rowOffset) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java index 80a589b18095..83ab0a1c7304 100644 --- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java @@ -68,14 +68,6 @@ protected StructLike partition() { return partition; } - public CharSequence currentFilePath() { - return currentFile.encryptingOutputFile().location(); - } - - public long currentFileRows() { - return currentFileRows; - } - @Override public long length() { throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length"); @@ -92,6 +84,16 @@ public void write(T row) { } } + @Override + public CharSequence location() { + return currentFile.encryptingOutputFile().location(); + } + + @Override + public long recordCount() { + return currentFileRows; + } + private boolean shouldRollToNewFile() { return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes; } diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 36a0313a4e41..5021226427be 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -83,6 +83,16 @@ public void write(PositionDelete payload) { delete(payload.path(), payload.pos(), payload.row()); } + @Override + public CharSequence location() { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement location."); + } + + @Override + public long recordCount() { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement recordCount."); + } + public void delete(CharSequence path, long pos) { delete(path, pos, null); } diff --git a/data/src/test/java/org/apache/iceberg/io/TestEqualityDeltaWriters.java b/data/src/test/java/org/apache/iceberg/io/TestEqualityDeltaWriters.java new file mode 100644 index 000000000000..20e469c87b40 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/io/TestEqualityDeltaWriters.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public abstract class TestEqualityDeltaWriters extends WriterTestBase { + + @Parameterized.Parameters(name = "FileFormat={0}") + public static Object[] parameters() { + return new Object[] { + new Object[] {FileFormat.AVRO}, + new Object[] {FileFormat.PARQUET}, + new Object[] {FileFormat.ORC} + }; + } + + private static final int TABLE_FORMAT_VERSION = 2; + private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024; + private static final GenericRecord RECORD = GenericRecord.create(SCHEMA); + private static final Schema POS_DELETE_SCHEMA = DeleteSchemaUtil.pathPosSchema(); + private static final GenericRecord POS_RECORD = GenericRecord.create(DeleteSchemaUtil.pathPosSchema()); + + private final FileFormat fileFormat; + + private FileIO io; + private int idFieldId; + private int dataFieldId; + + private OutputFileFactory fileFactory = null; + + public TestEqualityDeltaWriters(FileFormat fileFormat) { + super(TABLE_FORMAT_VERSION); + this.fileFormat = fileFormat; + } + + @Override + @Before + public void setupTable() throws Exception { + this.tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); // created during table creation + + this.metadataDir = new File(tableDir, "metadata"); + this.table = create(SCHEMA, PartitionSpec.unpartitioned()); + this.table.updateProperties() + .set(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString().toLowerCase()) + .set(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat.toString().toLowerCase()) + .commit(); + + this.io = table.io(); + + this.idFieldId = table.schema().findField("id").fieldId(); + this.dataFieldId = table.schema().findField("data").fieldId(); + + this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build(); + } + + protected FileFormat format() { + return fileFormat; + } + + protected abstract T toKey(List keyFieldIds, 
Integer id, String data); + + protected abstract StructLikeSet toSet(Iterable records); + + public abstract StructLike asStructLike(T data); + + public abstract StructLike asStructLikeKey(List keyFieldIds, T key); + + @Test + public void testInsertOnly() throws IOException { + // Commit the first row collection. + EqualityDeltaWriter deltaWriter = createEqualityWriter(table.schema(), fullKey(), false); + + List rows = IntStream.range(0, 20) + .mapToObj(i -> toRow(i, String.format("val-%d", i))) + .collect(Collectors.toList()); + for (T row : rows) { + deltaWriter.insert(row, PartitionSpec.unpartitioned(), null); + } + + deltaWriter.close(); + WriteResult result = deltaWriter.result(); + Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length); + Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length); + commitTransaction(result); + Assert.assertEquals("Should have expected records", toSet(rows), actualRowSet("*")); + + // Commit the second row collection. + deltaWriter = createEqualityWriter(table.schema(), fullKey(), false); + + rows = IntStream.range(20, 40) + .mapToObj(i -> toRow(i, String.format("val-%d", i))) + .collect(Collectors.toList()); + for (T row : rows) { + deltaWriter.insert(row, PartitionSpec.unpartitioned(), null); + } + + deltaWriter.close(); + result = deltaWriter.result(); + Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length); + Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length); + commitTransaction(result); + + rows = IntStream.range(0, 40) + .mapToObj(i -> toRow(i, String.format("val-%d", i))) + .collect(Collectors.toList()); + Assert.assertEquals("Should have expected records", toSet(rows), actualRowSet("*")); + } + + @Test + public void testInsertDuplicatedKey() throws IOException { + EqualityDeltaWriter deltaWriter = createEqualityWriter(table.schema(), idKey(), false); + List records = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "bbb"), + toRow(3, "ccc"), + toRow(4, "ddd"), + toRow(4, "eee"), + toRow(3, "fff"), + toRow(2, "ggg"), + toRow(1, "hhh") + ); + records.forEach(row -> deltaWriter.insert(row, PartitionSpec.unpartitioned(), null)); + + deltaWriter.close(); + WriteResult result = deltaWriter.result(); + commitTransaction(result); + + Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length); + Assert.assertEquals("Should have a pos-delete file.", 1, result.deleteFiles().length); + DeleteFile posDeleteFile = result.deleteFiles()[0]; + Assert.assertEquals("Should be a pos-delete file.", FileContent.POSITION_DELETES, posDeleteFile.content()); + Assert.assertEquals(1, result.referencedDataFiles().length); + DataFile dataFile = result.dataFiles()[0]; + Assert.assertEquals("Should have the expected referenced data file", + dataFile.path(), result.referencedDataFiles()[0]); + Assert.assertEquals("Should have expected records", toSet(ImmutableList.of( + toRow(4, "eee"), + toRow(3, "fff"), + toRow(2, "ggg"), + toRow(1, "hhh") + )), actualRowSet("*")); + + // Check records in the data file. + List expectedRecords = ImmutableList.of( + toRecord(1, "aaa"), + toRecord(2, "bbb"), + toRecord(3, "ccc"), + toRecord(4, "ddd"), + toRecord(4, "eee"), + toRecord(3, "fff"), + toRecord(2, "ggg"), + toRecord(1, "hhh") + ); + Assert.assertEquals(expectedRecords, readFile(fileFormat, table.schema(), dataFile.path())); + + // Check records in the pos-delete file. 
+ Assert.assertEquals(ImmutableList.of( + toPosDelete(dataFile.path(), 3L), + toPosDelete(dataFile.path(), 2L), + toPosDelete(dataFile.path(), 1L), + toPosDelete(dataFile.path(), 0L) + ), readFile(fileFormat, POS_DELETE_SCHEMA, posDeleteFile.path())); + } + + @Test + public void testUpsertSameRow() throws IOException { + EqualityDeltaWriter deltaWriter = createEqualityWriter(table.schema(), fullKey(), false); + + T row = toRow(1, "aaa"); + deltaWriter.insert(row, PartitionSpec.unpartitioned(), null); + + // UPSERT <1, 'aaa'> to <1, 'aaa'> + deltaWriter.delete(row, PartitionSpec.unpartitioned(), null); + deltaWriter.insert(row, PartitionSpec.unpartitioned(), null); + + deltaWriter.close(); + WriteResult result = deltaWriter.result(); + Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length); + Assert.assertEquals("Should have a pos-delete file.", 1, result.deleteFiles().length); + commitTransaction(result); + Assert.assertEquals("Should have an expected record", toSet(ImmutableList.of(row)), actualRowSet("*")); + + // Check records in the data file. + DataFile dataFile = result.dataFiles()[0]; + Assert.assertEquals( + ImmutableList.of(toRecord(1, "aaa"), toRecord(1, "aaa")), + readFile(fileFormat, table.schema(), dataFile.path())); + + // Check records in the pos-delete file. + DeleteFile posDeleteFile = result.deleteFiles()[0]; + Assert.assertEquals( + ImmutableList.of(toPosDelete(dataFile.path(), 0L)), + readFile(fileFormat, POS_DELETE_SCHEMA, posDeleteFile.path())); + + // DELETE the row. + deltaWriter = createEqualityWriter(table.schema(), fullKey(), false); + deltaWriter.delete(row, PartitionSpec.unpartitioned(), null); + deltaWriter.close(); + result = deltaWriter.result(); + Assert.assertEquals("Should have 0 data files.", 0, result.dataFiles().length); + Assert.assertEquals("Should have 1 eq-delete file.", 1, result.deleteFiles().length); + Assert.assertEquals(0, result.referencedDataFiles().length); + Assert.assertEquals(FileContent.EQUALITY_DELETES, result.deleteFiles()[0].content()); + commitTransaction(result); + Assert.assertEquals("Should have no record", toSet(ImmutableList.of()), actualRowSet("*")); + Assert.assertEquals( + ImmutableList.of(toRecord(1, "aaa")), + readFile(fileFormat, table.schema(), result.deleteFiles()[0].path())); + } + + @Test + public void testUpsertMultipleRows() throws IOException { + EqualityDeltaWriter deltaWriter = createEqualityWriter(table.schema(), dataKey(), false); + + List rows = Lists.newArrayList( + toRow(1, "aaa"), + toRow(2, "bbb"), + toRow(3, "aaa"), + toRow(3, "ccc"), + toRow(4, "ccc") + ); + for (T row : rows) { + deltaWriter.insert(row, PartitionSpec.unpartitioned(), null); + } + + // Commit the 1st transaction. + deltaWriter.close(); + WriteResult result = deltaWriter.result(); + Assert.assertEquals("Should have a data file", 1, result.dataFiles().length); + Assert.assertEquals("Should have a pos-delete file for deduplication purposes", 1, result.deleteFiles().length); + Assert.assertEquals("Should be a pos-delete file", FileContent.POSITION_DELETES, result.deleteFiles()[0].content()); + Assert.assertEquals(1, result.referencedDataFiles().length); + commitTransaction(result); + + Assert.assertEquals("Should have expected records", toSet(ImmutableList.of( + toRow(2, "bbb"), + toRow(3, "aaa"), + toRow(4, "ccc") + )), actualRowSet("*")); + + // Start the 2nd transaction.
+ deltaWriter = createEqualityWriter(table.schema(), dataKey(), false); + + // UPSERT <3,'aaa'> to <5,'aaa'> - (by deleting the key) + deltaWriter.deleteKey(toKey(dataKey(), 3, "aaa"), PartitionSpec.unpartitioned(), null); + deltaWriter.insert(toRow(5, "aaa"), PartitionSpec.unpartitioned(), null); + + // UPSERT <5,'aaa'> to <6,'aaa'> - (by deleting the key) + deltaWriter.deleteKey(toKey(dataKey(), 5, "aaa"), PartitionSpec.unpartitioned(), null); + deltaWriter.insert(toRow(6, "aaa"), PartitionSpec.unpartitioned(), null); + + // UPSERT <4,'ccc'> to <7,'ccc'> - (by deleting the key) + deltaWriter.deleteKey(toKey(dataKey(), 4, "ccc"), PartitionSpec.unpartitioned(), null); + deltaWriter.insert(toRow(7, "ccc"), PartitionSpec.unpartitioned(), null); + + // DELETE <2, 'bbb'> - (by deleting the key) + deltaWriter.deleteKey(toKey(dataKey(), 2, "bbb"), PartitionSpec.unpartitioned(), null); + + // Commit the 2nd transaction. + deltaWriter.close(); + result = deltaWriter.result(); + Assert.assertEquals(1, result.dataFiles().length); + Assert.assertEquals(2, result.deleteFiles().length); + commitTransaction(result); + + Assert.assertEquals("Should have expected records", toSet(ImmutableList.of( + toRow(6, "aaa"), + toRow(7, "ccc") + )), actualRowSet("*")); + + // Check records in the data file. + DataFile dataFile = result.dataFiles()[0]; + Assert.assertEquals(ImmutableList.of( + toRecord(5, "aaa"), + toRecord(6, "aaa"), + toRecord(7, "ccc") + ), readFile(fileFormat, table.schema(), dataFile.path())); + + // Check records in position delete file. + DeleteFile posDeleteFile = result.deleteFiles()[0]; + Assert.assertEquals(FileContent.POSITION_DELETES, posDeleteFile.content()); + Assert.assertEquals(ImmutableList.of( + toPosDelete(dataFile.path(), 0L) + ), readFile(fileFormat, POS_DELETE_SCHEMA, posDeleteFile.path())); + + // Check records in the equality delete file. + DeleteFile eqDeleteFile = result.deleteFiles()[1]; + Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content()); + Schema keySchema = TypeUtil.select(table.schema(), Sets.newHashSet(dataKey())); + GenericRecord dataRecord = GenericRecord.create(keySchema.asStruct()); + Assert.assertEquals(ImmutableList.of( + dataRecord.copy("data", "aaa"), + dataRecord.copy("data", "ccc"), + dataRecord.copy("data", "bbb") + ), readFile(fileFormat, keySchema, eqDeleteFile.path())); + } + + @Test + public void testUpsertDataWithFullRowSchema() throws IOException { + EqualityDeltaWriter deltaWriter = createEqualityWriter(table.schema(), table.schema(), dataKey(), false); + + List rows = ImmutableList.of( + toRow(1, "aaa"), + toRow(2, "bbb"), + toRow(3, "aaa"), + toRow(3, "ccc"), + toRow(4, "ccc") + ); + for (T row : rows) { + deltaWriter.insert(row, PartitionSpec.unpartitioned(), null); + } + + // Commit the 1st transaction. + deltaWriter.close(); + WriteResult result = deltaWriter.result(); + Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length); + Assert.assertEquals("Should have a pos-delete file for deduplication purposes", 1, result.deleteFiles().length); + Assert.assertEquals("Should be a pos-delete file", FileContent.POSITION_DELETES, result.deleteFiles()[0].content()); + Assert.assertEquals(1, result.referencedDataFiles().length); + commitTransaction(result); + + Assert.assertEquals("Should have expected records", toSet(ImmutableList.of( + toRow(2, "bbb"), + toRow(3, "aaa"), + toRow(4, "ccc") + )), actualRowSet("*")); + + // Start the 2nd transaction.
+ deltaWriter = createEqualityWriter(table.schema(), table.schema(), dataKey(), false); + + // UPSERT <3, 'aaa'> to <5, 'aaa'> - (by deleting the entire row). + deltaWriter.delete(toRow(3, "aaa"), PartitionSpec.unpartitioned(), null); + deltaWriter.insert(toRow(5, "aaa"), PartitionSpec.unpartitioned(), null); + + // UPSERT <5, 'aaa'> to <6, 'aaa'> - (by deleting the entire row) + deltaWriter.delete(toRow(5, "aaa"), PartitionSpec.unpartitioned(), null); + deltaWriter.insert(toRow(6, "aaa"), PartitionSpec.unpartitioned(), null); + + // UPSERT <4, 'ccc'> to <7, 'ccc'> - (by deleting the entire row) + deltaWriter.delete(toRow(4, "ccc"), PartitionSpec.unpartitioned(), null); + deltaWriter.insert(toRow(7, "ccc"), PartitionSpec.unpartitioned(), null); + + // DELETE <2, 'bbb'> - (by deleting the entire row) + deltaWriter.delete(toRow(2, "bbb"), PartitionSpec.unpartitioned(), null); + + // Commit the 2nd transaction. + deltaWriter.close(); + result = deltaWriter.result(); + Assert.assertEquals(1, result.dataFiles().length); + Assert.assertEquals(2, result.deleteFiles().length); + Assert.assertEquals(1, result.referencedDataFiles().length); + commitTransaction(result); + + Assert.assertEquals("Should have expected records", toSet(ImmutableList.of( + toRow(6, "aaa"), + toRow(7, "ccc") + )), actualRowSet("*")); + + // Check records in the data file. + DataFile dataFile = result.dataFiles()[0]; + Assert.assertEquals(ImmutableList.of( + toRecord(5, "aaa"), + toRecord(6, "aaa"), + toRecord(7, "ccc") + ), readFile(fileFormat, table.schema(), dataFile.path())); + + // Check records in the pos-delete file. + DeleteFile posDeleteFile = result.deleteFiles()[0]; + Assert.assertEquals(FileContent.POSITION_DELETES, posDeleteFile.content()); + Assert.assertEquals(ImmutableList.of( + toPosDelete(dataFile.path(), 0L) + ), readFile(fileFormat, POS_DELETE_SCHEMA, posDeleteFile.path())); + + // Check records in the equality delete file. + DeleteFile eqDeleteFile = result.deleteFiles()[1]; + Assert.assertEquals(FileContent.EQUALITY_DELETES, eqDeleteFile.content()); + Assert.assertEquals(ImmutableList.of( + toRecord(3, "aaa"), + toRecord(4, "ccc"), + toRecord(2, "bbb") + ), readFile(fileFormat, table.schema(), eqDeleteFile.path())); + } + + private Record toRecord(Integer id, String data) { + return RECORD.copy("id", id, "data", data); + } + + private Record toPosDelete(CharSequence path, Long pos) { + return POS_RECORD.copy("file_path", path, "pos", pos); + } + + protected List fullKey() { + return ImmutableList.of(idFieldId, dataFieldId); + } + + protected List idKey() { + return ImmutableList.of(idFieldId); + } + + protected List dataKey() { + return ImmutableList.of(dataFieldId); + } + + private void commitTransaction(WriteResult result) { + RowDelta rowDelta = table.newRowDelta(); + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + + rowDelta.validateDeletedFiles() + .validateDataFilesExist(Lists.newArrayList(result.referencedDataFiles())) + .commit(); + } + + private EqualityDeltaWriter createEqualityWriter( + Schema schema, + List equalityFieldIds, + boolean fanoutEnabled) { + // Select the equality fields to generate the delete schema.
+ Schema deleteSchema = TypeUtil.select(schema, ImmutableSet.copyOf(equalityFieldIds)); + return createEqualityWriter(schema, deleteSchema, equalityFieldIds, fanoutEnabled); + } + + private EqualityDeltaWriter createEqualityWriter( + Schema schema, + Schema equalityDeleteSchema, + List equalityFieldIds, + boolean fanoutEnabled) { + FileWriterFactory writerFactory = newWriterFactory(schema, equalityFieldIds, equalityDeleteSchema, null); + + PartitioningWriter dataWriter; + PartitioningWriter eqWriter; + PartitioningWriter, DeleteWriteResult> posWriter; + + if (fanoutEnabled) { + dataWriter = new FanoutDataWriter<>(writerFactory, fileFactory, io, TARGET_FILE_SIZE); + eqWriter = new FanoutEqualityDeleteWriter<>(writerFactory, fileFactory, io, TARGET_FILE_SIZE); + posWriter = new FanoutPositionDeleteWriter<>(writerFactory, fileFactory, io, TARGET_FILE_SIZE); + } else { + dataWriter = new ClusteredDataWriter<>(writerFactory, fileFactory, io, TARGET_FILE_SIZE); + eqWriter = new ClusteredEqualityDeleteWriter<>(writerFactory, fileFactory, io, TARGET_FILE_SIZE); + posWriter = new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, TARGET_FILE_SIZE); + } + + return new BaseEqualityDeltaWriter<>( + dataWriter, + eqWriter, + posWriter, + schema, + TypeUtil.select(schema, ImmutableSet.copyOf(equalityFieldIds)), + this::asStructLike, + data -> this.asStructLikeKey(equalityFieldIds, data)); + } +} diff --git a/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java index 661ab642f516..a1fe62f063c8 100644 --- a/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java +++ b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java @@ -22,16 +22,25 @@ import java.io.IOException; import java.util.List; import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.StructLikeSet; public abstract class WriterTestBase extends TableTestBase { @@ -76,6 +85,43 @@ protected StructLikeSet actualRowSet(String... 
columns) throws IOException { return set; } + protected List readFile(FileFormat format, Schema schema, CharSequence path) throws IOException { + CloseableIterable iterable; + + InputFile inputFile = Files.localInput(path.toString()); + switch (format) { + case PARQUET: + iterable = Parquet.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build(); + break; + + case AVRO: + iterable = Avro.read(inputFile) + .project(schema) + .createReaderFunc(DataReader::create) + .build(); + break; + + case ORC: + iterable = ORC.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema)) + .build(); + break; + + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + + List records = Lists.newArrayList(); + try (CloseableIterable reader = iterable) { + reader.forEach(records::add); + } + return records; + } + protected DataFile writeData(FileWriterFactory writerFactory, OutputFileFactory fileFactory, List rows, PartitionSpec spec, StructLike partitionKey) throws IOException { diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java deleted file mode 100644 index 16262b22e99c..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.flink.data.RowDataProjection; -import org.apache.iceberg.io.BaseTaskWriter; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; - -abstract class BaseDeltaTaskWriter extends BaseTaskWriter { - - private final Schema schema; - private final Schema deleteSchema; - private final RowDataWrapper wrapper; - private final RowDataWrapper keyWrapper; - private final RowDataProjection keyProjection; - private final boolean upsert; - - BaseDeltaTaskWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.schema = schema; - this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); - this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); - this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct()); - this.keyProjection = RowDataProjection.create(schema, deleteSchema); - this.upsert = upsert; - } - - abstract RowDataDeltaWriter route(RowData row); - - RowDataWrapper wrapper() { - return wrapper; - } - - @Override - public void write(RowData row) throws IOException { - RowDataDeltaWriter writer = route(row); - - switch (row.getRowKind()) { - case INSERT: - case UPDATE_AFTER: - if (upsert) { - writer.deleteKey(keyProjection.wrap(row)); - } - writer.write(row); - break; - - case UPDATE_BEFORE: - if (upsert) { - break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice - } - writer.delete(row); - break; - case DELETE: - writer.delete(row); - break; - - default: - throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind()); - } - } - - protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter { - RowDataDeltaWriter(PartitionKey partition) { - super(partition, schema, deleteSchema); - } - - @Override - protected StructLike asStructLike(RowData data) { - return wrapper.wrap(data); - } - - @Override - protected StructLike asStructLikeKey(RowData data) { - return keyWrapper.wrap(data); - } - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 72f0580cf856..714a0b1c541c 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -522,7 +522,7 @@ static IcebergStreamWriter createStreamWriter(Table table, Table serializableTable = SerializableTable.copyOf(table); TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory( serializableTable, flinkRowType, 
targetFileSize, - fileFormat, equalityFieldIds, upsert); + equalityFieldIds, upsert); return new IcebergStreamWriter<>(table.name(), taskWriterFactory); } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java deleted file mode 100644 index 1eee6298e933..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.List; -import java.util.Map; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; - -class PartitionedDeltaWriter extends BaseDeltaTaskWriter { - - private final PartitionKey partitionKey; - - private final Map writers = Maps.newHashMap(); - - PartitionedDeltaWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, - upsert); - this.partitionKey = new PartitionKey(spec, schema); - } - - @Override - RowDataDeltaWriter route(RowData row) { - partitionKey.partition(wrapper().wrap(row)); - - RowDataDeltaWriter writer = writers.get(partitionKey); - if (writer == null) { - // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. 
- PartitionKey copiedKey = partitionKey.copy(); - writer = new RowDataDeltaWriter(copiedKey); - writers.put(copiedKey, writer); - } - - return writer; - } - - @Override - public void close() { - try { - Tasks.foreach(writers.values()) - .throwFailureWhenFinished() - .noRetry() - .run(RowDataDeltaWriter::close, IOException.class); - - writers.clear(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close equality delta writer", e); - } - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 5144305029f4..6d4ca78cec26 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -19,67 +19,95 @@ package org.apache.iceberg.flink.sink; +import java.io.IOException; import java.util.List; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.flink.data.RowDataProjection; +import org.apache.iceberg.io.BaseEqualityDeltaWriter; +import org.apache.iceberg.io.ClusteredDataWriter; +import org.apache.iceberg.io.ClusteredEqualityDeleteWriter; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.DataWriteResult; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.EqualityDeltaWriter; +import org.apache.iceberg.io.FanoutDataWriter; +import org.apache.iceberg.io.FanoutEqualityDeleteWriter; +import org.apache.iceberg.io.FanoutPositionDeleteWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.Tasks; public class RowDataTaskWriterFactory implements TaskWriterFactory { private final Table table; - private final Schema schema; private final RowType flinkSchema; + private final Schema schema; private final PartitionSpec spec; private final FileIO io; + + private final Schema deleteSchema; + private final long targetFileSizeBytes; - private final FileFormat format; - private final List equalityFieldIds; private final boolean upsert; - private final FileAppenderFactory appenderFactory; + private final FlinkFileWriterFactory fileWriterFactory; private transient OutputFileFactory outputFileFactory; - public RowDataTaskWriterFactory(Table table, - RowType flinkSchema, - long targetFileSizeBytes, - FileFormat format, - List equalityFieldIds, - boolean upsert) { + public RowDataTaskWriterFactory( + Table table, + RowType 
flinkSchema, + long targetFileSizeBytes, + List equalityFieldIds, + boolean upsert) { this.table = table; - this.schema = table.schema(); this.flinkSchema = flinkSchema; + this.schema = table.schema(); this.spec = table.spec(); this.io = table.io(); + this.targetFileSizeBytes = targetFileSizeBytes; - this.format = format; - this.equalityFieldIds = equalityFieldIds; this.upsert = upsert; if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { - this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); + this.deleteSchema = null; + this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table) + .dataSchema(schema) + .dataFlinkType(flinkSchema) + .build(); } else if (upsert) { // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted row // may differ from the deleted row other than the primary key fields, and the delete file must contain values // that are correct for the deleted row. Therefore, only write the equality delete fields. - this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, - ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null); + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table) + .dataSchema(schema) + .dataFlinkType(flinkSchema) + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(deleteSchema) + .build(); } else { - this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, - ArrayUtil.toIntArray(equalityFieldIds), schema, null); + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table) + .dataSchema(schema) + .dataFlinkType(flinkSchema) + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(schema) + .build(); } } @@ -90,46 +118,203 @@ public void initialize(int taskId, int attemptId) { @Override public TaskWriter create() { - Preconditions.checkNotNull(outputFileFactory, + Preconditions.checkNotNull( + outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); - if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { - // Initialize a task writer to write INSERT only. - if (spec.isUnpartitioned()) { - return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); - } else { - return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, - io, targetFileSizeBytes, schema, flinkSchema); + if (deleteSchema != null) { + return new DeltaWriter(spec.isPartitioned()); + } else { + return new InsertOnlyWriter(spec.isPartitioned()); + } + } + + private static > void cleanFiles(FileIO io, T[] files) { + Tasks.foreach(files) + .throwFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + + private abstract class FlinkBaseWriter implements TaskWriter { + private final WriteResult.Builder writeResultBuilder = WriteResult.builder(); + + @Override + public void write(RowData row) throws IOException { + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + if (upsert) { + deleteKey(row); + } + insert(row); + break; + + case UPDATE_BEFORE: + if (upsert) { + // Skip to write delete in upsert mode because UPDATE_AFTER will insert after delete. 
+ break; + } + delete(row); + break; + + case DELETE: + delete(row); + break; + + default: + throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind()); } + } + + protected abstract void delete(RowData row); + + protected abstract void deleteKey(RowData row); + + protected abstract void insert(RowData row); + + protected void aggregateResult(DataWriteResult result) { + writeResultBuilder.addDataFiles(result.dataFiles()); + } + + protected void aggregateResult(WriteResult result) { + writeResultBuilder.add(result); + } + + @Override + public void abort() throws IOException { + close(); + + WriteResult result = writeResultBuilder.build(); + cleanFiles(io, result.dataFiles()); + cleanFiles(io, result.deleteFiles()); + } + + @Override + public WriteResult complete() throws IOException { + close(); + + return writeResultBuilder.build(); + } + } + + private PartitioningWriter newDataWriter(boolean fanoutEnabled) { + if (fanoutEnabled) { + return new FanoutDataWriter<>(fileWriterFactory, outputFileFactory, io, targetFileSizeBytes); + } else { + return new ClusteredDataWriter<>(fileWriterFactory, outputFileFactory, io, targetFileSizeBytes); + } + } + + private PartitioningWriter newEqualityWriter(boolean fanoutEnabled) { + if (fanoutEnabled) { + return new FanoutEqualityDeleteWriter<>(fileWriterFactory, outputFileFactory, io, targetFileSizeBytes); } else { - // Initialize a task writer to write both INSERT and equality DELETE. - if (spec.isUnpartitioned()) { - return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); - } else { - return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); + return new ClusteredEqualityDeleteWriter<>(fileWriterFactory, outputFileFactory, io, targetFileSizeBytes); + } + } + + private PartitioningWriter, DeleteWriteResult> newPositionWriter(boolean fanoutEnabled) { + if (fanoutEnabled) { + return new FanoutPositionDeleteWriter<>(fileWriterFactory, outputFileFactory, io, targetFileSizeBytes); + } else { + return new ClusteredPositionDeleteWriter<>(fileWriterFactory, outputFileFactory, io, targetFileSizeBytes); + } + } + + private class InsertOnlyWriter extends FlinkBaseWriter { + + private final PartitionKey partitionKey; + private final RowDataWrapper rowDataWrapper; + + private final PartitioningWriter delegate; + private boolean closed = false; + + private InsertOnlyWriter(boolean fanoutEnabled) { + this.partitionKey = new PartitionKey(spec, schema); + this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + + this.delegate = newDataWriter(fanoutEnabled); + } + + @Override + protected void delete(RowData row) { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement delete."); + } + + @Override + protected void deleteKey(RowData key) { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement deleteKey."); + } + + @Override + protected void insert(RowData row) { + partitionKey.partition(rowDataWrapper.wrap(row)); + delegate.write(row, spec, partitionKey); + } + + @Override + public void close() throws IOException { + if (!closed) { + delegate.close(); + aggregateResult(delegate.result()); + + closed = true; } } } - private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { + private class DeltaWriter extends FlinkBaseWriter { private 
final PartitionKey partitionKey; private final RowDataWrapper rowDataWrapper; + private final RowDataWrapper keyWrapper; + private final RowDataProjection keyProjection; - RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema, - RowType flinkSchema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + private final EqualityDeltaWriter delegate; + private boolean closed = false; + + private DeltaWriter(boolean fanoutEnabled) { this.partitionKey = new PartitionKey(spec, schema); this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct()); + this.keyProjection = RowDataProjection.create(schema, deleteSchema); + + this.delegate = new BaseEqualityDeltaWriter<>( + newDataWriter(fanoutEnabled), + newEqualityWriter(fanoutEnabled), + newPositionWriter(fanoutEnabled), + schema, deleteSchema, + rowDataWrapper::wrap, + keyWrapper::wrap); } @Override - protected PartitionKey partition(RowData row) { + protected void delete(RowData row) { partitionKey.partition(rowDataWrapper.wrap(row)); - return partitionKey; + delegate.delete(row, spec, partitionKey); + } + + @Override + protected void deleteKey(RowData row) { + partitionKey.partition(rowDataWrapper.wrap(row)); + delegate.deleteKey(keyProjection.wrap(row), spec, partitionKey); + } + + @Override + protected void insert(RowData row) { + partitionKey.partition(rowDataWrapper.wrap(row)); + delegate.insert(row, spec, partitionKey); + } + + @Override + public void close() throws IOException { + if (!closed) { + delegate.close(); + aggregateResult(delegate.result()); + + this.closed = true; + } } } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java deleted file mode 100644 index 331ed7c78192..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; - -class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { - private final RowDataDeltaWriter writer; - - UnpartitionedDeltaWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, - upsert); - this.writer = new RowDataDeltaWriter(null); - } - - @Override - RowDataDeltaWriter route(RowData row) { - return writer; - } - - @Override - public void close() throws IOException { - writer.close(); - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 5e8837c5d47b..a5a0a83faaa8 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.List; -import java.util.Locale; import java.util.stream.Collectors; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; @@ -30,11 +29,9 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; @@ -68,15 +65,11 @@ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, Encryption this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null); this.tableName = table.name(); - String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT, - TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); - FileFormat format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); RowType flinkSchema = FlinkSchemaUtil.convert(table.schema()); this.taskWriterFactory = new RowDataTaskWriterFactory( SerializableTable.copyOf(table), flinkSchema, Long.MAX_VALUE, - format, null, false); } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java index e30412ad83cc..4d52ce8246bc 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java @@ -94,7 +94,8 @@ private void initTable(boolean partitioned) { table.updateProperties() 
.set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024)) - .defaultFormat(format) + .set(TableProperties.DEFAULT_FILE_FORMAT, format.toString().toLowerCase()) + .set(TableProperties.DELETE_DEFAULT_FILE_FORMAT, format.toString().toLowerCase()) .commit(); } @@ -340,6 +341,6 @@ private StructLikeSet actualRowSet(String... columns) throws IOException { private TaskWriterFactory createTaskWriterFactory(List equalityFieldIds) { return new RowDataTaskWriterFactory( SerializableTable.copyOf(table), FlinkSchemaUtil.convert(table.schema()), - 128 * 1024 * 1024, format, equalityFieldIds, false); + 128 * 1024 * 1024, equalityFieldIds, false); } } diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkEqualityDeleteWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkEqualityDeleteWriters.java new file mode 100644 index 000000000000..e224326e74e8 --- /dev/null +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkEqualityDeleteWriters.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.sink; + +import java.util.List; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.io.TestEqualityDeltaWriters; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeSet; + +public class TestFlinkEqualityDeleteWriters extends TestEqualityDeltaWriters { + + public TestFlinkEqualityDeleteWriters(FileFormat fileFormat) { + super(fileFormat); + } + + private boolean sameElements(List left, List right) { + if (left == right) { + return true; + } + if (left == null || right == null) { + return false; + } + return ImmutableSet.copyOf(left).equals(ImmutableSet.copyOf(right)); + } + + @Override + protected RowData toKey(List keyFieldIds, Integer id, String data) { + if (sameElements(fullKey(), keyFieldIds)) { + return GenericRowData.of(id, StringData.fromString(data)); + } else if (sameElements(idKey(), keyFieldIds)) { + return GenericRowData.of(id); + } else if (sameElements(dataKey(), keyFieldIds)) { + return GenericRowData.of(StringData.fromString(data)); + } else { + throw new UnsupportedOperationException("Unknown equality field ids: " + keyFieldIds); + } + } + + @Override + protected StructLikeSet toSet(Iterable rows) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + for (RowData row : rows) { + set.add(asStructLike(row)); + } + return set; + } + + @Override + public StructLike asStructLike(RowData data) { + RowType flinkType = FlinkSchemaUtil.convert(table.schema()); + RowDataWrapper wrapper = new RowDataWrapper(flinkType, table.schema().asStruct()); + + return wrapper.wrap(data); + } + + @Override + public StructLike asStructLikeKey(List keyFieldIds, RowData key) { + Schema deleteSchema = TypeUtil.select(table.schema(), Sets.newHashSet(keyFieldIds)); + RowType keyType = FlinkSchemaUtil.convert(deleteSchema); + RowDataWrapper wrapper = new RowDataWrapper(keyType, deleteSchema.asStruct()); + return wrapper.wrap(key); + } + + @Override + protected FileWriterFactory newWriterFactory( + Schema dataSchema, + List equalityFieldIds, + Schema equalityDeleteRowSchema, + Schema positionDeleteRowSchema) { + return FlinkFileWriterFactory.builderFor(table) + .dataSchema(dataSchema) + .dataFileFormat(format()) + .deleteFileFormat(format()) + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(equalityDeleteRowSchema) + .positionDeleteRowSchema(positionDeleteRowSchema) + .build(); + } + + @Override + protected RowData toRow(Integer id, String data) { + return SimpleDataUtil.createRowData(id, data); + } +} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java index 2595b098dfea..7c740173020f 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java +++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -88,6 +88,7 @@ public void before() throws IOException {
     // Construct the iceberg table with the specified file format.
     Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
     table = SimpleDataUtil.createTable(path, props, partitioned);
+    table.updateProperties().defaultFormat(format).commit();
   }

   @Test
@@ -235,7 +236,7 @@ public void testRandomData() throws IOException {
   private TaskWriter createTaskWriter(long targetFileSize) {
     TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(
         SerializableTable.copyOf(table), (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(),
-        targetFileSize, format, null, false);
+        targetFileSize, null, false);
     taskWriterFactory.initialize(1, 1);
     return taskWriterFactory.create();
   }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
index baccadc31f08..49782c69048c 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
@@ -74,6 +74,10 @@ private void testSkipToRemoveMetaColumn(int formatVersion) throws IOException {
     Table table = SimpleDataUtil.createTable(location,
         ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)), false);
+    table.updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, format.toString().toLowerCase())
+        .set(TableProperties.DELETE_DEFAULT_FILE_FORMAT, format.toString().toLowerCase())
+        .commit();

     List rows = Lists.newArrayList(
         SimpleDataUtil.createInsert(1, "AAA"),
@@ -170,7 +174,6 @@ private TaskWriter createTaskWriter(Table table, List equality
         SerializableTable.copyOf(table), SimpleDataUtil.ROW_TYPE,
         TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
-        format,
         equalityFieldIds, upsert);
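
Not part of the diff above: a minimal usage sketch of the reworked RowDataTaskWriterFactory, for reviewers who want to see the new call shape end to end. The class and method names in the sketch (TaskWriterUsageSketch, writeChangelog) are illustrative only; the factory, TaskWriter, and WriteResult calls are the ones exercised by the tests above. It assumes the table's DEFAULT_FILE_FORMAT / DELETE_DEFAULT_FILE_FORMAT properties are already set, since the explicit FileFormat constructor argument is removed.

import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;

class TaskWriterUsageSketch {
  static WriteResult writeChangelog(
      Table table, List<Integer> equalityFieldIds, Iterable<RowData> rows) throws IOException {
    // File formats come from the table's DEFAULT_FILE_FORMAT / DELETE_DEFAULT_FILE_FORMAT
    // properties now; there is no FileFormat constructor argument any more.
    TaskWriterFactory<RowData> factory = new RowDataTaskWriterFactory(
        SerializableTable.copyOf(table),
        FlinkSchemaUtil.convert(table.schema()),
        128L * 1024 * 1024,        // target file size, mirroring the tests above
        equalityFieldIds,          // null or empty -> insert-only writer, no delete files
        false);                    // upsert: when true, INSERT/UPDATE_AFTER first deletes by key

    factory.initialize(1, 1);      // (taskId, attemptId) used for output file naming
    TaskWriter<RowData> writer = factory.create();
    try {
      for (RowData row : rows) {
        writer.write(row);         // row kind selects insert, equality delete, or position delete
      }
      return writer.complete();    // data files plus any delete files written by this task
    } catch (IOException | RuntimeException e) {
      writer.abort();              // cleans up files already written before rethrowing
      throw e;
    }
  }
}

The same five-argument call is what RowDataRewriter and the task writer tests now use; everything format-related is resolved inside FlinkFileWriterFactory from table properties rather than threaded through the factory.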