diff --git a/build.gradle b/build.gradle index 77b15e8eb08c..544d65987462 100644 --- a/build.gradle +++ b/build.gradle @@ -216,6 +216,7 @@ project(':iceberg-core') { implementation "com.fasterxml.jackson.core:jackson-core" implementation "com.github.ben-manes.caffeine:caffeine" implementation "org.roaringbitmap:RoaringBitmap" + implementation "org.rocksdb:rocksdbjni" compileOnly("org.apache.hadoop:hadoop-client") { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.slf4j', module: 'slf4j-log4j12' diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java index 4e4d375ddd30..8f879cbf4363 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java @@ -238,6 +238,8 @@ public void write(Object s, Encoder encoder) throws IOException { encoder.writeString((Utf8) s); } else if (s instanceof String) { encoder.writeString(new Utf8((String) s)); + } else if (s instanceof CharSequence) { + encoder.writeString((CharSequence) s); } else if (s == null) { throw new IllegalArgumentException("Cannot write null to required string column"); } else { diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index c914ad224f30..959e5054ca43 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -43,6 +43,7 @@ public class EqualityDeleteWriter implements FileWriter private final int[] equalityFieldIds; private final SortOrder sortOrder; private DeleteFile deleteFile = null; + private long rowOffset = 0; public EqualityDeleteWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata, @@ -60,6 +61,17 @@ public EqualityDeleteWriter(FileAppender appender, FileFormat format, String @Override public void write(T row) { appender.add(row); + rowOffset += 1; + } + + @Override + public CharSequence location() { + return location; + } + + @Override + public long rowOffset() { + return rowOffset; } /** @@ -69,7 +81,7 @@ public void write(T row) { */ @Deprecated public void deleteAll(Iterable rows) { - appender.addAll(rows); + rows.forEach(this::write); } /** @@ -79,7 +91,7 @@ public void deleteAll(Iterable rows) { */ @Deprecated public void delete(T row) { - appender.add(row); + write(row); } @Override diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index a7dff07e7105..85991c36057d 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -43,6 +43,7 @@ public class PositionDeleteWriter implements FileWriter, De private final PositionDelete delete; private final CharSequenceSet referencedDataFiles; private DeleteFile deleteFile = null; + private long rowOffset = 0; public PositionDeleteWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { @@ -60,6 +61,17 @@ public PositionDeleteWriter(FileAppender appender, FileFormat format public void write(PositionDelete positionDelete) { referencedDataFiles.add(positionDelete.path()); appender.add(positionDelete); + 
rowOffset += 1; + } + + @Override + public CharSequence location() { + return location; + } + + @Override + public long rowOffset() { + return rowOffset; } /** @@ -69,7 +81,7 @@ public void write(PositionDelete positionDelete) { */ @Deprecated public void delete(CharSequence path, long pos) { - delete(path, pos, null); + write(delete.set(path, pos, null)); } /** @@ -79,8 +91,7 @@ public void delete(CharSequence path, long pos) { */ @Deprecated public void delete(CharSequence path, long pos, T row) { - referencedDataFiles.add(path); - appender.add(delete.set(path, pos, row)); + write(delete.set(path, pos, row)); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/BaseEqualityDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseEqualityDeltaWriter.java new file mode 100644 index 000000000000..6301d650bfb3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/BaseEqualityDeltaWriter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.relocated.com.google.common.base.Function; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.StructLikeMapUtil; +import org.apache.iceberg.util.StructProjection; + +public class BaseEqualityDeltaWriter implements EqualityDeltaWriter { + + private final ThreadLocal> posDelete = ThreadLocal.withInitial(PositionDelete::create); + + private final PartitioningWriter dataWriter; + private final PartitioningWriter equalityWriter; + private final PartitioningWriter, DeleteWriteResult> positionWriter; + + private final Map insertedRowMap; + + private final Function asStructLike; + private final Function keyRefFunc; + private final Function keyCopyFunc; + + private boolean closed = false; + + public BaseEqualityDeltaWriter( + PartitioningWriter dataWriter, + PartitioningWriter equalityWriter, + PartitioningWriter, DeleteWriteResult> positionWriter, + Schema schema, + Schema deleteSchema, + Map tableProperties, + Function asStructLike) { + this.dataWriter = dataWriter; + this.equalityWriter = equalityWriter; + this.positionWriter = positionWriter; + + this.insertedRowMap = StructLikeMapUtil.load( + deleteSchema.asStruct(), + PathOffset.schema().asStruct(), + tableProperties); + this.asStructLike = asStructLike; + + StructProjection projection = StructProjection.create(schema, deleteSchema); + this.keyRefFunc = row -> projection.wrap(asStructLike.apply(row)); + this.keyCopyFunc = row -> StructCopy.copy(keyRefFunc.apply(row)); + } + + 
private PositionDelete wrap(StructLike structLike) { + CharSequence path = structLike.get(0, CharSequence.class); + long offset = structLike.get(1, Long.class); + return posDelete.get().set(path, offset, null); + } + + @Override + public void insert(T row, PartitionSpec spec, StructLike partition) { + PathOffset pathOffset = dataWriter.write(row, spec, partition); + + // Create a copied key from this row. + StructLike copiedKey = keyCopyFunc.apply(row); + + // Adding a pos-delete to replace the old path-offset. + StructLike previous = insertedRowMap.put(copiedKey, pathOffset); + if (previous != null) { + positionWriter.write(wrap(previous), spec, partition); + } + } + + /** + * Retire the old key & position from insertedRowMap cache to position delete file. + */ + private boolean retireOldKey(StructLike key, PartitionSpec spec, StructLike partition) { + StructLike previous = insertedRowMap.remove(key); + if (previous != null) { + positionWriter.write(wrap(previous), spec, partition); + return true; + } else { + return false; + } + } + + @Override + public void delete(T row, PartitionSpec spec, StructLike partition) { + if (!retireOldKey(keyRefFunc.apply(row), spec, partition)) { + equalityWriter.write(row, spec, partition); + } + } + + @Override + public void deleteKey(T key, PartitionSpec spec, StructLike partition) { + if (!retireOldKey(asStructLike.apply(key), spec, partition)) { + equalityWriter.write(key, spec, partition); + } + } + + @Override + public WriteResult result() { + Preconditions.checkState(closed, "Cannot get result from unclosed writer."); + + DataWriteResult dataWriteResult = dataWriter.result(); + DeleteWriteResult positionWriteResult = positionWriter.result(); + DeleteWriteResult equalityWriteResult = equalityWriter.result(); + + return WriteResult.builder() + .addDataFiles(dataWriteResult.dataFiles()) + .addDeleteFiles(positionWriteResult.deleteFiles()) + .addDeleteFiles(equalityWriteResult.deleteFiles()) + .addReferencedDataFiles(positionWriteResult.referencedDataFiles()) + .build(); + } + + @Override + public void close() throws IOException { + if (!closed) { + dataWriter.close(); + positionWriter.close(); + equalityWriter.close(); + + this.closed = true; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index 46f997e4e7e1..2b58e0ea4ada 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -32,7 +32,6 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -125,7 +124,7 @@ public void write(T row) throws IOException { PathOffset previous = insertedRowMap.put(copiedKey, pathOffset); if (previous != null) { // TODO attach the previous row if has a positional-delete row schema in appender factory. 
- posDeleteWriter.delete(previous.path, previous.rowOffset, null); + posDeleteWriter.delete(previous.path(), previous.rowOffset(), null); } dataWriter.write(row); @@ -141,7 +140,7 @@ private boolean internalPosDelete(StructLike key) { if (previous != null) { // TODO attach the previous row if has a positional-delete row schema in appender factory. - posDeleteWriter.delete(previous.path, previous.rowOffset, null); + posDeleteWriter.delete(previous.path(), previous.rowOffset(), null); return true; } @@ -200,28 +199,6 @@ public void close() throws IOException { } } - private static class PathOffset { - private final CharSequence path; - private final long rowOffset; - - private PathOffset(CharSequence path, long rowOffset) { - this.path = path; - this.rowOffset = rowOffset; - } - - private static PathOffset of(CharSequence path, long rowOffset) { - return new PathOffset(path, rowOffset); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("path", path) - .add("row_offset", rowOffset) - .toString(); - } - } - private abstract class BaseRollingWriter implements Closeable { private static final int ROWS_DIVISOR = 1000; private final StructLike partitionKey; diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java index 61a6f9f9164d..3b322cb809a6 100644 --- a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java @@ -64,7 +64,7 @@ abstract class ClusteredWriter implements PartitioningWriter { protected abstract R aggregatedResult(); @Override - public void write(T row, PartitionSpec spec, StructLike partition) { + public PathOffset write(T row, PartitionSpec spec, StructLike partition) { if (!spec.equals(currentSpec)) { if (currentSpec != null) { closeCurrentWriter(); @@ -85,7 +85,6 @@ public void write(T row, PartitionSpec spec, StructLike partition) { // copy the partition key as the key object may be reused this.currentPartition = StructCopy.copy(partition); this.currentWriter = newWriter(currentSpec, currentPartition); - } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) { closeCurrentWriter(); completedPartitions.add(currentPartition); @@ -100,7 +99,10 @@ public void write(T row, PartitionSpec spec, StructLike partition) { this.currentWriter = newWriter(currentSpec, currentPartition); } + PathOffset pathOffset = PathOffset.of(currentWriter.location(), currentWriter.rowOffset()); currentWriter.write(row); + + return pathOffset; } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 090ccebfa80f..8683d7d3285f 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -39,6 +39,7 @@ public class DataWriter implements FileWriter { private final ByteBuffer keyMetadata; private final SortOrder sortOrder; private DataFile dataFile = null; + private long rowOffset = 0; public DataWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { @@ -59,6 +60,17 @@ public DataWriter(FileAppender appender, FileFormat format, String location, @Override public void write(T row) { appender.add(row); + rowOffset += 1; + } + + @Override + public CharSequence location() { + return location; + } + + @Override + 
public long rowOffset() { + return rowOffset; } /** diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java new file mode 100644 index 000000000000..48664e10583a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +public class FanoutEqualityDeleteWriter extends FanoutWriter { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + + private final List deleteFiles; + + public FanoutEqualityDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers. + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition); + } else { + return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, CharSequenceSet.empty()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java new file mode 100644 index 000000000000..48e7139ed367 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +public class FanoutPositionDeleteWriter extends FanoutWriter, DeleteWriteResult> { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeBytes; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public FanoutPositionDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeBytes = targetFileSizeBytes; + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers. 
+ if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newPositionDeleteWriter(outputFile, spec, partition); + } else { + return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java index 631fc0a6d4ea..f12553e150d6 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java @@ -49,9 +49,13 @@ abstract class FanoutWriter implements PartitioningWriter { protected abstract R aggregatedResult(); @Override - public void write(T row, PartitionSpec spec, StructLike partition) { + public PathOffset write(T row, PartitionSpec spec, StructLike partition) { FileWriter writer = writer(spec, partition); + + PathOffset pathOffset = PathOffset.of(writer.location(), writer.rowOffset()); writer.write(row); + + return pathOffset; } private FileWriter writer(PartitionSpec spec, StructLike partition) { diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriter.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java index 6f0c4ab2194a..39b3893aeb3e 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java @@ -54,6 +54,20 @@ default void write(Iterable rows) { */ void write(T row); + /** + * Returns the file path that is currently open. + * + * @return the current file path. + */ + CharSequence location(); + + /** + * Returns the row offset that is currently being written, starting from 0. + * + * @return the current row offset. + */ + long rowOffset(); + /** * Returns the number of bytes that were currently written by this writer. * diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java b/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java index 9b57676f099d..130b5d2d056f 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileWriterFactory.java @@ -19,6 +19,7 @@ package org.apache.iceberg.io; +import java.io.Serializable; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; @@ -28,7 +29,7 @@ /** * A factory for creating data and delete writers. */ -public interface FileWriterFactory { +public interface FileWriterFactory extends Serializable { /** * Creates a new {@link DataWriter}.
diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java index a49fe199cfc9..bc9ceb40cb3a 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java @@ -29,8 +29,9 @@ public abstract class PartitionedFanoutWriter extends BaseTaskWriter { private final Map writers = Maps.newHashMap(); - protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + protected PartitionedFanoutWriter( + PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); } diff --git a/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java index a551b8e2686e..d9f9df31dcaf 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java @@ -38,7 +38,7 @@ public abstract class PartitionedWriter extends BaseTaskWriter { private RollingFileWriter currentWriter = null; protected PartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + OutputFileFactory fileFactory, FileIO io, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); } diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java index 4afdd2162f8b..c1be23879ead 100644 --- a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java @@ -46,7 +46,7 @@ public interface PartitioningWriter extends Closeable { * @param spec a partition spec * @param partition a partition or null if the spec is unpartitioned */ - void write(T row, PartitionSpec spec, StructLike partition); + PathOffset write(T row, PartitionSpec spec, StructLike partition); /** * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. diff --git a/core/src/main/java/org/apache/iceberg/io/PathOffset.java b/core/src/main/java/org/apache/iceberg/io/PathOffset.java new file mode 100644 index 000000000000..f093c0dbe783 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/PathOffset.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.types.Types; + +public class PathOffset implements StructLike { + + private static final Schema PATH_OFFSET_SCHEMA = new Schema( + Types.NestedField.required(0, "path", Types.StringType.get()), + Types.NestedField.required(1, "row_offset", Types.LongType.get()) + ); + + private CharSequence path; + private long rowOffset; + + private PathOffset(CharSequence path, long rowOffset) { + this.path = path; + this.rowOffset = rowOffset; + } + + public static PathOffset of(CharSequence path, long rowOffset) { + return new PathOffset(path, rowOffset); + } + + public static Schema schema() { + return PATH_OFFSET_SCHEMA; + } + + public CharSequence path() { + return path; + } + + public long rowOffset() { + return rowOffset; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("path", path) + .add("row_offset", rowOffset) + .toString(); + } + + @Override + public int size() { + return 2; + } + + @Override + public T get(int pos, Class javaClass) { + switch (pos) { + case 0: + return javaClass.cast(path); + case 1: + return javaClass.cast(rowOffset); + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } + + @Override + public void set(int pos, T value) { + switch (pos) { + case 0: + this.path = (CharSequence) value; + break; + case 1: + this.rowOffset = (long) value; + break; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java index 80a589b18095..e5dd597a1ffd 100644 --- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java @@ -68,14 +68,6 @@ protected StructLike partition() { return partition; } - public CharSequence currentFilePath() { - return currentFile.encryptingOutputFile().location(); - } - - public long currentFileRows() { - return currentFileRows; - } - @Override public long length() { throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length"); @@ -92,6 +84,16 @@ public void write(T row) { } } + @Override + public CharSequence location() { + return currentFile.encryptingOutputFile().location(); + } + + @Override + public long rowOffset() { + return currentFileRows; + } + private boolean shouldRollToNewFile() { return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes; } diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 36a0313a4e41..7ab80c4ceb16 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -83,6 +83,16 @@ public void write(PositionDelete payload) { delete(payload.path(), payload.pos(), payload.row()); } + @Override + public CharSequence location() { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement location."); + } + + @Override + public long rowOffset() { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement rowOffset."); + } + public void delete(CharSequence path, 
long pos) { delete(path, pos, null); } diff --git a/core/src/main/java/org/apache/iceberg/types/Serializer.java b/core/src/main/java/org/apache/iceberg/types/Serializer.java new file mode 100644 index 000000000000..0b74dd247439 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/types/Serializer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.types; + +public interface Serializer { + byte[] serialize(T object); + + T deserialize(byte[] data); +} diff --git a/core/src/main/java/org/apache/iceberg/types/Serializers.java b/core/src/main/java/org/apache/iceberg/types/Serializers.java new file mode 100644 index 000000000000..f99b83f47ce5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/types/Serializers.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.function.IntFunction; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ByteBuffers; + +public class Serializers { + private static final int NULL_MARK = -1; + + private Serializers() { + } + + public static Serializer forType(Types.StructType struct) { + return new StructLikeSerializer(struct); + } + + public static Serializer> forType(Types.ListType list) { + return new ListSerializer<>(list); + } + + public static Serializer forType(Type.PrimitiveType type) { + return new PrimitiveSerializer<>(type); + } + + @SuppressWarnings("unchecked") + private static Serializer internal(Type type) { + if (type.isPrimitiveType()) { + return forType(type.asPrimitiveType()); + } else if (type.isStructType()) { + return (Serializer) forType(type.asStructType()); + } else if (type.isListType()) { + return (Serializer) forType(type.asListType()); + } + throw new UnsupportedOperationException("Cannot determine serializer for type: " + type); + } + + @SuppressWarnings("unchecked") + private static Class internalClass(Type type) { + if (type.isPrimitiveType()) { + return (Class) type.typeId().javaClass(); + } else if (type.isStructType()) { + return (Class) StructLike.class; + } else if (type.isListType()) { + return (Class) List.class; + } else if (type.isMapType()) { + return (Class) Map.class; + } + + throw new UnsupportedOperationException("Cannot determine expected class for type: " + type); + } + + private static class StructLikeSerializer implements Serializer { + private final Types.StructType struct; + private final Serializer[] serializers; + private final Class[] classes; + + private StructLikeSerializer(Types.StructType struct) { + this.struct = struct; + this.serializers = struct.fields().stream() + .map(field -> internal(field.type())) + .toArray((IntFunction[]>) Serializer[]::new); + this.classes = struct.fields().stream() + .map(field -> internalClass(field.type())) + .toArray(Class[]::new); + } + + @Override + public byte[] serialize(StructLike object) { + if (object == null) { + return null; + } + + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out)) { + + for (int i = 0; i < serializers.length; i += 1) { + Class valueClass = classes[i]; + + byte[] fieldData = serializers[i].serialize(object.get(i, valueClass)); + if (fieldData == null) { + dos.writeInt(NULL_MARK); + } else { + dos.writeInt(fieldData.length); + dos.write(fieldData); + } + } + return out.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public StructLike deserialize(byte[] data) { + if (data == null) { + return null; + } + + try (ByteArrayInputStream in = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(in)) { + + GenericRecord record = GenericRecord.create(struct); + for (int i = 0; i < serializers.length; i += 1) { + int length = dis.readInt(); + + if (length == NULL_MARK) { + record.set(i, null); + } else { + byte[] fieldData = 
new byte[length]; + int fieldDataSize = dis.read(fieldData); + Preconditions.checkState(length == fieldDataSize, "%s != %s", length, fieldDataSize); + record.set(i, serializers[i].deserialize(fieldData)); + } + } + + return record; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + private static class ListSerializer implements Serializer> { + private final Serializer elementSerializer; + + private ListSerializer(Types.ListType list) { + this.elementSerializer = internal(list.elementType()); + } + + @Override + public byte[] serialize(List object) { + if (object == null) { + return null; + } + + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(out)) { + + dos.writeInt(object.size()); + for (T elem : object) { + byte[] data = elementSerializer.serialize(elem); + + if (data == null) { + dos.writeInt(NULL_MARK); + } else { + dos.writeInt(data.length); + dos.write(data); + } + } + + return out.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public List deserialize(byte[] data) { + if (data == null) { + return null; + } + + try (ByteArrayInputStream in = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(in)) { + + int size = dis.readInt(); + List result = Lists.newArrayListWithExpectedSize(size); + for (int i = 0; i < size; i++) { + int length = dis.readInt(); + + if (length == NULL_MARK) { + result.add(null); + } else { + byte[] fieldData = new byte[length]; + int fieldDataSize = dis.read(fieldData); + Preconditions.checkState(length == fieldDataSize, "%s != %s", length, fieldDataSize); + result.add(elementSerializer.deserialize(fieldData)); + } + } + + return result; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private static class PrimitiveSerializer implements Serializer { + + private final Type.PrimitiveType type; + + private PrimitiveSerializer(Type.PrimitiveType type) { + this.type = type; + } + + @Override + public byte[] serialize(Object object) { + return ByteBuffers.toByteArray(Conversions.toByteBuffer(type, object)); + } + + @Override + public T deserialize(byte[] data) { + return Conversions.fromByteBuffer(type, data == null ? null : ByteBuffer.wrap(data)); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeMap.java b/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeMap.java new file mode 100644 index 000000000000..9d565cfba892 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/RocksDBStructLikeMap.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.util; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Serializer; +import org.apache.iceberg.types.Serializers; +import org.apache.iceberg.types.Types; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +public class RocksDBStructLikeMap extends AbstractMap implements Map { + + static { + RocksDB.loadLibrary(); + } + + public static RocksDBStructLikeMap create(String path, + Types.StructType keyType, + Types.StructType valType) { + // Create the RocksDB directory if not exists. + Path rocksDBDir = Paths.get(path); + if (!Files.exists(rocksDBDir)) { + try { + Files.createDirectory(rocksDBDir); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (!Files.isDirectory(rocksDBDir)) { + throw new ValidationException("The existing path %s is not a directory", path); + } + + return new RocksDBStructLikeMap(path, keyType, valType); + } + + private final String path; + private final WriteOptions writeOptions; + private final RocksDB db; + private final Types.StructType keyType; + private final Types.StructType valType; + + private final Serializer keySerializer; + private final Serializer valSerializer; + + // It's expensive to get the RocksDB's data size, so we maintain the size when put/delete rows. 
+ private int size = 0; + + private RocksDBStructLikeMap(String path, Types.StructType keyType, Types.StructType valType) { + this.path = path; + this.writeOptions = new WriteOptions().setDisableWAL(true); + try { + Options options = new Options().setCreateIfMissing(true); + this.db = RocksDB.open(options, path); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + this.keyType = keyType; + this.valType = valType; + this.keySerializer = Serializers.forType(keyType); + this.valSerializer = Serializers.forType(valType); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size <= 0; + } + + @Override + public boolean containsKey(Object key) { + if (key instanceof StructLike) { + byte[] keyData = keySerializer.serialize((StructLike) key); + try { + return db.get(keyData) != null; + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + return false; + } + + @Override + public boolean containsValue(Object value) { + if (value instanceof StructLike) { + byte[] valData = valSerializer.serialize((StructLike) value); + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + if (Arrays.equals(valData, iter.value())) { + return true; + } + } + } + } + return false; + } + + @Override + public StructLike get(Object key) { + if (key instanceof StructLike) { + byte[] keyData = keySerializer.serialize((StructLike) key); + try { + byte[] valData = db.get(keyData); + if (valData == null) { + return null; + } + + return valSerializer.deserialize(valData); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + return null; + } + + @Override + public StructLike put(StructLike key, StructLike value) { + byte[] keyData = keySerializer.serialize(key); + byte[] newValue = valSerializer.serialize(value); + try { + byte[] oldValue = db.get(keyData); + db.put(writeOptions, keyData, newValue); + + if (oldValue == null) { + // Add a new row into the map. + size += 1; + return null; + } else { + // Replace the old row with the new row. 
+ return valSerializer.deserialize(oldValue); + } + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + + @Override + public StructLike remove(Object key) { + if (key instanceof StructLike) { + byte[] keyData = keySerializer.serialize((StructLike) key); + try { + byte[] valData = db.get(keyData); + if (valData != null) { + db.delete(writeOptions, keyData); + + size -= 1; + return valSerializer.deserialize(valData); + } + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + return null; + } + + @Override + public void clear() { + size = 0; + db.close(); + try { + FileUtils.cleanDirectory(new File(path)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public Set keySet() { + StructLikeSet keySet = StructLikeSet.create(keyType); + + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + keySet.add(keySerializer.deserialize(iter.key())); + } + } + + return keySet; + } + + @Override + public Collection values() { + Set valueSet = Sets.newHashSet(); + + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + valueSet.add(valSerializer.deserialize(iter.value())); + } + } + + return valueSet; + } + + @Override + public Set> entrySet() { + Set> entrySet = Sets.newHashSet(); + try (RocksIterator iter = db.newIterator()) { + for (iter.seekToFirst(); iter.isValid(); iter.next()) { + StructLikeEntry entry = new StructLikeEntry( + keySerializer.deserialize(iter.key()), + valSerializer.deserialize(iter.value())); + entrySet.add(entry); + } + return entrySet; + } + } + + private class StructLikeEntry implements Entry { + + private final StructLike key; + private final StructLike value; + + private StructLikeEntry(StructLike key, StructLike value) { + this.key = key; + this.value = value; + } + + @Override + public StructLike getKey() { + return key; + } + + @Override + public StructLike getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof StructLikeEntry)) { + return false; + } else { + StructLikeEntry that = (StructLikeEntry) o; + return Objects.equals( + StructLikeWrapper.forType(keyType).set(key), + StructLikeWrapper.forType(keyType).set(that.key)) && + Objects.equals( + StructLikeWrapper.forType(valType).set(value), + StructLikeWrapper.forType(valType).set(that.value) + ); + } + } + + @Override + public int hashCode() { + int hashCode = 0; + if (key != null) { + hashCode ^= StructLikeWrapper.forType(keyType).set(key).hashCode(); + } + if (value != null) { + hashCode ^= StructLikeWrapper.forType(valType).set(value).hashCode(); + } + return hashCode; + } + + @Override + public StructLike setValue(StructLike newValue) { + throw new UnsupportedOperationException("Does not support setValue."); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java index dc6e6ab9acb0..acc443ff360d 100644 --- a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java +++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java @@ -161,7 +161,7 @@ public boolean equals(Object o) { } else if (!(o instanceof StructLikeEntry)) { return false; } else { - StructLikeEntry that = (StructLikeEntry) o; + StructLikeEntry that = (StructLikeEntry) o; return Objects.equals(getKey(), that.getKey()) && Objects.equals(getValue(), that.getValue()); } diff --git 
a/core/src/main/java/org/apache/iceberg/util/StructLikeMapUtil.java b/core/src/main/java/org/apache/iceberg/util/StructLikeMapUtil.java new file mode 100644 index 000000000000..215be843f752 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMapUtil.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.iceberg.util; + + import java.io.IOException; + import java.io.UncheckedIOException; + import java.nio.file.Files; + import java.nio.file.Paths; + import java.util.Map; + import org.apache.iceberg.StructLike; + import org.apache.iceberg.types.Types; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class StructLikeMapUtil { + + private static final Logger LOG = LoggerFactory.getLogger(StructLikeMapUtil.class); + private static final String ROCKSDB_DIR_PREFIX = "iceberg-rocksdb-"; + + public static final String IMPL = "spill-disk-impl"; + public static final String IN_MEMORY_MAP = "in-memory"; + public static final String ROCKSDB_MAP = "rocksdb"; + public static final String ROCKSDB_DIR = "rocksdb-dir"; + + private StructLikeMapUtil() { + } + + public static Map load( + Types.StructType keyType, + Types.StructType valType, + Map properties) { + String impl = properties.getOrDefault(IMPL, IN_MEMORY_MAP); + LOG.info("Loading StructLikeMap implementation: {}", impl); + + switch (impl) { + case IN_MEMORY_MAP: + return StructLikeMap.create(keyType); + case ROCKSDB_MAP: + return RocksDBStructLikeMap.create(rocksDBDir(properties), keyType, valType); + default: + throw new UnsupportedOperationException("Unknown StructLikeMap implementation: " + impl); + } + } + + private static String rocksDBDir(Map properties) { + String dir = properties.getOrDefault(ROCKSDB_DIR, System.getProperty("java.io.tmpdir")); + try { + return Files.createTempDirectory(Paths.get(dir), ROCKSDB_DIR_PREFIX).toString(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create the temporary directory", e); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java b/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java index 891a7782217a..fa921ef1020b 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java +++ b/core/src/test/java/org/apache/iceberg/util/TestStructLikeMap.java @@ -27,142 +27,219 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import
org.apache.iceberg.types.Types; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class TestStructLikeMap { - private static final Types.StructType STRUCT_TYPE = Types.StructType.of( + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + private final String mapType; + private Map map; + + @Parameterized.Parameters(name = "StructLikeType = {0}") + public static Iterable parameters() { + return Lists.newArrayList( + new Object[] {"rocksdb"}, + new Object[] {"in-memory"} + ); + } + + public TestStructLikeMap(String mapType) { + this.mapType = mapType; + } + + @Before + public void before() { + Map props = Maps.newHashMap(); + props.put(StructLikeMapUtil.IMPL, mapType); + + if (StructLikeMapUtil.ROCKSDB_MAP.equals(mapType)) { + props.put(StructLikeMapUtil.ROCKSDB_DIR, temp.getRoot().getAbsolutePath()); + } + + this.map = StructLikeMapUtil.load(KEY_TYPE, VAL_TYPE, props); + } + + @After + public void after() { + map.clear(); + } + + private static final Types.StructType KEY_TYPE = Types.StructType.of( Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "data", Types.LongType.get()) + Types.NestedField.optional(2, "data", Types.StringType.get()) + ); + + private static final Types.StructType VAL_TYPE = Types.StructType.of( + Types.NestedField.optional(1, "value", Types.StringType.get()) ); + private static GenericRecord key(int id, CharSequence data) { + return (GenericRecord) GenericRecord.create(KEY_TYPE) + .copy("id", id, "data", data); + } + + private static GenericRecord val(CharSequence value) { + return (GenericRecord) GenericRecord.create(VAL_TYPE) + .copy("value", value); + } + + private static void assertKey(StructLike key1, StructLike key2) { + StructLikeWrapper expected = StructLikeWrapper.forType(KEY_TYPE).set(key1); + StructLikeWrapper actual = StructLikeWrapper.forType(VAL_TYPE).set(key2); + Assert.assertEquals(expected, actual); + } + + private static void assertValue(StructLike val1, StructLike val2) { + StructLikeWrapper expected = StructLikeWrapper.forType(VAL_TYPE).set(val1); + StructLikeWrapper actual = StructLikeWrapper.forType(VAL_TYPE).set(val2); + Assert.assertEquals(expected, actual); + } + + private static void assertKeySet(Collection expected, Collection actual) { + StructLikeSet expectedSet = StructLikeSet.create(KEY_TYPE); + StructLikeSet actualSet = StructLikeSet.create(KEY_TYPE); + expectedSet.addAll(expected); + actualSet.addAll(actual); + Assert.assertEquals(expectedSet, actualSet); + } + + private static void assertValueSet(Collection expected, Collection actual) { + StructLikeSet expectedSet = StructLikeSet.create(VAL_TYPE); + StructLikeSet actualSet = StructLikeSet.create(VAL_TYPE); + expectedSet.addAll(expected); + actualSet.addAll(actual); + Assert.assertEquals(expectedSet, actualSet); + } + @Test public void testSingleRecord() { - Record gRecord = GenericRecord.create(STRUCT_TYPE); - Record record1 = gRecord.copy(ImmutableMap.of("id", 1, "data", "aaa")); - - Map map = StructLikeMap.create(STRUCT_TYPE); Assert.assertEquals(0, map.size()); - map.put(record1, "1-aaa"); + map.put(key(1, "aaa"), val("1-aaa")); Assert.assertEquals(1, map.size()); Assert.assertFalse(map.isEmpty()); - Assert.assertTrue(map.containsKey(record1)); - Assert.assertTrue(map.containsValue("1-aaa")); - 
Assert.assertEquals("1-aaa", map.get(record1)); + Assert.assertTrue(map.containsKey(key(1, "aaa"))); + Assert.assertTrue(map.containsValue(val("1-aaa"))); + assertValue(val("1-aaa"), map.get(key(1, "aaa"))); Set keySet = map.keySet(); Assert.assertEquals(1, keySet.size()); - Assert.assertTrue(keySet.contains(record1)); + Assert.assertTrue(keySet.contains(key(1, "aaa"))); - Collection values = map.values(); + Collection values = map.values(); Assert.assertEquals(1, values.size()); - Assert.assertEquals("1-aaa", values.toArray(new String[0])[0]); + assertValue(val("1-aaa"), values.toArray(new StructLike[0])[0]); - Set> entrySet = map.entrySet(); + Set> entrySet = map.entrySet(); Assert.assertEquals(1, entrySet.size()); - for (Map.Entry entry : entrySet) { - Assert.assertEquals(record1, entry.getKey()); - Assert.assertEquals("1-aaa", entry.getValue()); + for (Map.Entry entry : entrySet) { + assertKey(key(1, "aaa"), entry.getKey()); + assertValue(val("1-aaa"), entry.getValue()); break; } } @Test public void testMultipleRecord() { - Record gRecord = GenericRecord.create(STRUCT_TYPE); - Record record1 = gRecord.copy(ImmutableMap.of("id", 1, "data", "aaa")); - Record record2 = gRecord.copy(ImmutableMap.of("id", 2, "data", "bbb")); - Record record3 = gRecord.copy(); - record3.setField("id", 3); - record3.setField("data", null); - - Map map = StructLikeMap.create(STRUCT_TYPE); + Record record1 = key(1, "aaa"); + Record record2 = key(2, "bbb"); + Record record3 = key(3, null); + Assert.assertEquals(0, map.size()); - map.putAll(ImmutableMap.of(record1, "1-aaa", record2, "2-bbb", record3, "3-null")); + map.putAll(ImmutableMap.of(record1, val("1-aaa"), record2, val("2-bbb"), record3, val("3-null"))); Assert.assertEquals(3, map.size()); Assert.assertTrue(map.containsKey(record1)); Assert.assertTrue(map.containsKey(record2)); Assert.assertTrue(map.containsKey(record3)); - Assert.assertTrue(map.containsValue("1-aaa")); - Assert.assertTrue(map.containsValue("2-bbb")); - Assert.assertTrue(map.containsValue("3-null")); - Assert.assertEquals("1-aaa", map.get(record1)); - Assert.assertEquals("2-bbb", map.get(record2)); - Assert.assertEquals("3-null", map.get(record3)); + Assert.assertTrue(map.containsValue(val("1-aaa"))); + Assert.assertTrue(map.containsValue(val("2-bbb"))); + Assert.assertTrue(map.containsValue(val("3-null"))); + assertValue(val("1-aaa"), map.get(record1)); + assertValue(val("2-bbb"), map.get(record2)); + assertValue(val("3-null"), map.get(record3)); Set keySet = map.keySet(); Assert.assertEquals(3, keySet.size()); - Assert.assertEquals(ImmutableSet.of(record1, record2, record3), keySet); + assertKeySet(ImmutableSet.of(record1, record2, record3), keySet); - Collection values = map.values(); + Collection values = map.values(); Assert.assertEquals(3, values.size()); - Assert.assertEquals(ImmutableSet.of("1-aaa", "2-bbb", "3-null"), Sets.newHashSet(values)); + assertValueSet(ImmutableSet.of(val("1-aaa"), val("2-bbb"), val("3-null")), Sets.newHashSet(values)); - Set> entrySet = map.entrySet(); + Set> entrySet = map.entrySet(); Assert.assertEquals(3, entrySet.size()); Set structLikeSet = Sets.newHashSet(); - Set valueSet = Sets.newHashSet(); - for (Map.Entry entry : entrySet) { + Set valueSet = Sets.newHashSet(); + for (Map.Entry entry : entrySet) { structLikeSet.add(entry.getKey()); valueSet.add(entry.getValue()); } - Assert.assertEquals(ImmutableSet.of(record1, record2, record3), structLikeSet); - Assert.assertEquals(ImmutableSet.of("1-aaa", "2-bbb", "3-null"), valueSet); + 
assertKeySet(ImmutableSet.of(record1, record2, record3), structLikeSet); + assertValueSet(ImmutableSet.of(val("1-aaa"), val("2-bbb"), val("3-null")), valueSet); } @Test public void testRemove() { - Record gRecord = GenericRecord.create(STRUCT_TYPE); - Record record = gRecord.copy(ImmutableMap.of("id", 1, "data", "aaa")); + StructLike record = key(1, "aaa"); - Map map = StructLikeMap.create(STRUCT_TYPE); - map.put(record, "1-aaa"); + map.put(record, val("1-aaa")); Assert.assertEquals(1, map.size()); - Assert.assertEquals("1-aaa", map.get(record)); - Assert.assertEquals("1-aaa", map.remove(record)); + assertValue(val("1-aaa"), map.get(record)); + assertValue(val("1-aaa"), map.remove(record)); Assert.assertEquals(0, map.size()); - map.put(record, "1-aaa"); - Assert.assertEquals("1-aaa", map.get(record)); + map.put(record, val("1-aaa")); + assertValue(val("1-aaa"), map.get(record)); } @Test public void testNullKeys() { - Map map = StructLikeMap.create(STRUCT_TYPE); - Assert.assertFalse(map.containsKey(null)); + Map newMap = StructLikeMap.create(KEY_TYPE); + Assert.assertFalse(newMap.containsKey(null)); - map.put(null, "aaa"); - Assert.assertTrue(map.containsKey(null)); - Assert.assertEquals("aaa", map.get(null)); + newMap.put(null, "aaa"); + Assert.assertTrue(newMap.containsKey(null)); + Assert.assertEquals("aaa", newMap.get(null)); - String replacedValue = map.put(null, "bbb"); + String replacedValue = newMap.put(null, "bbb"); Assert.assertEquals("aaa", replacedValue); - String removedValue = map.remove(null); + String removedValue = newMap.remove(null); Assert.assertEquals("bbb", removedValue); } @Test public void testKeysWithNulls() { - Record recordTemplate = GenericRecord.create(STRUCT_TYPE); + Record recordTemplate = GenericRecord.create(KEY_TYPE); Record record1 = recordTemplate.copy("id", 1, "data", null); Record record2 = recordTemplate.copy("id", 2, "data", null); - Map map = StructLikeMap.create(STRUCT_TYPE); - map.put(record1, "aaa"); - map.put(record2, "bbb"); + Map newMap = StructLikeMap.create(KEY_TYPE); + newMap.put(record1, "aaa"); + newMap.put(record2, "bbb"); - Assert.assertEquals("aaa", map.get(record1)); - Assert.assertEquals("bbb", map.get(record2)); + Assert.assertEquals("aaa", newMap.get(record1)); + Assert.assertEquals("bbb", newMap.get(record2)); Record record3 = record1.copy(); - Assert.assertTrue(map.containsKey(record3)); - Assert.assertEquals("aaa", map.get(record3)); + Assert.assertTrue(newMap.containsKey(record3)); + Assert.assertEquals("aaa", newMap.get(record3)); - Assert.assertEquals("aaa", map.remove(record3)); + Assert.assertEquals("aaa", newMap.remove(record3)); } } diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java index f373b3a9d3d2..68161450e1fd 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java +++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.function.Function; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -49,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.StructLikeMapUtil; import org.apache.iceberg.util.StructLikeSet; import 
org.junit.Assert; import org.junit.Before; @@ -62,6 +64,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase { private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L; private final FileFormat format; + private final String structLikeMap; private final GenericRecord gRecord = GenericRecord.create(SCHEMA); private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema()); @@ -69,18 +72,22 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase { private int idFieldId; private int dataFieldId; - @Parameterized.Parameters(name = "FileFormat = {0}") + @Parameterized.Parameters(name = "FileFormat = {0}, StructLikeMap = {1}") public static Object[][] parameters() { return new Object[][] { - {"avro"}, - {"orc"}, - {"parquet"} + {"avro", StructLikeMapUtil.IN_MEMORY_MAP}, + {"avro", StructLikeMapUtil.ROCKSDB_MAP}, + {"orc", StructLikeMapUtil.IN_MEMORY_MAP}, + {"orc", StructLikeMapUtil.ROCKSDB_MAP}, + {"parquet", StructLikeMapUtil.IN_MEMORY_MAP}, + {"parquet", StructLikeMapUtil.ROCKSDB_MAP} }; } - public TestTaskEqualityDeltaWriter(String fileFormat) { + public TestTaskEqualityDeltaWriter(String fileFormat, String structLikeMap) { super(FORMAT_V2); this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + this.structLikeMap = structLikeMap; } @Override @@ -99,6 +106,7 @@ public void setupTable() throws IOException { table.updateProperties() .defaultFormat(format) + .set(StructLikeMapUtil.IMPL, structLikeMap) .commit(); } @@ -439,7 +447,7 @@ private GenericTaskDeltaWriter createTaskWriter(List equalityFieldIds, Schema deleteSchema = table.schema().select(columns); return new GenericTaskDeltaWriter(table.schema(), deleteSchema, table.spec(), format, appenderFactory, - fileFactory, table.io(), TARGET_FILE_SIZE); + fileFactory, table.io(), TARGET_FILE_SIZE, table.properties()); } private static class GenericTaskDeltaWriter extends BaseTaskWriter { @@ -447,7 +455,8 @@ private static class GenericTaskDeltaWriter extends BaseTaskWriter { private GenericTaskDeltaWriter(Schema schema, Schema deleteSchema, PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Map properties) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.deltaWriter = new GenericEqualityDeltaWriter(null, schema, deleteSchema); } diff --git a/data/src/test/java/org/apache/iceberg/types/TestSerializers.java b/data/src/test/java/org/apache/iceberg/types/TestSerializers.java new file mode 100644 index 000000000000..f3e508596cc4 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/types/TestSerializers.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.types; + +import java.util.Comparator; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestSerializers { + + private static final Types.StructType REQUIRED_PRIMITIVES = Types.StructType.of( + required(100, "id", Types.LongType.get()), + required(101, "data", Types.StringType.get()), + required(102, "b", Types.BooleanType.get()), + required(103, "i", Types.IntegerType.get()), + required(104, "l", Types.LongType.get()), + required(105, "f", Types.FloatType.get()), + required(106, "d", Types.DoubleType.get()), + required(107, "date", Types.DateType.get()), + required(108, "ts_tz", Types.TimestampType.withZone()), + required(109, "ts", Types.TimestampType.withoutZone()), + required(110, "s", Types.StringType.get()), + required(112, "fixed", Types.FixedType.ofLength(7)), + required(113, "bytes", Types.BinaryType.get()), + required(114, "dec_9_0", Types.DecimalType.of(9, 0)), + required(115, "dec_11_2", Types.DecimalType.of(11, 2)), + required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision + required(117, "time", Types.TimeType.get()) + ); + + private static final Types.StructType OPTIONAL_PRIMITIVES = Types.StructType.of( + optional(200, "_id", Types.LongType.get()), + optional(201, "_data", Types.StringType.get()), + optional(202, "_b", Types.BooleanType.get()), + optional(203, "_i", Types.IntegerType.get()), + optional(204, "_l", Types.LongType.get()), + optional(205, "_f", Types.FloatType.get()), + optional(206, "_d", Types.DoubleType.get()), + optional(207, "_date", Types.DateType.get()), + optional(208, "_ts_tz", Types.TimestampType.withZone()), + optional(209, "_ts", Types.TimestampType.withoutZone()), + optional(210, "_s", Types.StringType.get()), + optional(212, "_fixed", Types.FixedType.ofLength(7)), + optional(213, "_bytes", Types.BinaryType.get()), + optional(214, "_dec_9_0", Types.DecimalType.of(9, 0)), + optional(215, "_dec_11_2", Types.DecimalType.of(11, 2)), + optional(216, "_dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision + optional(217, "_time", Types.TimeType.get()) + ); + + @Test + public void testRequiredPrimitives() { + generateAndValidate(new Schema(REQUIRED_PRIMITIVES.fields())); + } + + @Test + public void testOptionalPrimitives() { + generateAndValidate(new Schema(OPTIONAL_PRIMITIVES.fields())); + } + + @Test + public void testListType() { + Types.StructType structType = Types.StructType.of( + required(0, "id", Types.LongType.get()), + optional(1, "optional_list", Types.ListType.ofOptional( + 2, Types.ListType.ofRequired( + 3, Types.LongType.get() + ) + )), + required(6, "required_list", Types.ListType.ofRequired( + 7, Types.ListType.ofOptional( + 8, Types.LongType.get() + ) + )) + ); + + generateAndValidate(new Schema(structType.fields())); + } + + @Test + @Ignore("The InternalRecordWrapper does not support nested ListType.") + public void testNestedList() { + Types.StructType structType = Types.StructType.of( + optional(1, "list_struct", Types.ListType.ofOptional( + 2, 
Types.ListType.ofRequired( + 3, Types.StructType.of(REQUIRED_PRIMITIVES.fields()) + ) + )), + optional(4, "struct_list", Types.StructType.of( + Types.NestedField.required(5, "inner_list", + Types.ListType.ofOptional( + 6, Types.StructType.of(OPTIONAL_PRIMITIVES.fields()) + )) + )) + ); + + generateAndValidate(new Schema(structType.fields())); + } + + @Test + public void testNestedSchema() { + Types.StructType structType = Types.StructType.of( + required(0, "id", Types.LongType.get()), + required(1, "level1", Types.StructType.of( + optional(2, "level2", Types.StructType.of( + required(3, "level3", Types.StructType.of( + optional(4, "level4", Types.StructType.of( + required(5, "level5", Types.StructType.of( + REQUIRED_PRIMITIVES.fields() + )) + )) + )) + )) + )) + ); + + generateAndValidate(new Schema(structType.fields())); + } + + private void generateAndValidate(Schema schema) { + List records = RandomGenericData.generate(schema, 100, 1_000_000_1); + + InternalRecordWrapper recordWrapper = new InternalRecordWrapper(schema.asStruct()); + Serializer serializer = Serializers.forType(schema.asStruct()); + Comparator comparator = Comparators.forType(schema.asStruct()); + + for (Record expectedRecord : records) { + StructLike expectedStructLike = recordWrapper.wrap(expectedRecord); + + byte[] expectedData = serializer.serialize(expectedStructLike); + StructLike actualStructLike = serializer.deserialize(expectedData); + + Assert.assertEquals("Should produce the equivalent StructLike", 0, + comparator.compare(expectedStructLike, actualStructLike)); + + byte[] actualData = serializer.serialize(actualStructLike); + Assert.assertArrayEquals("Should have the expected serialized data", expectedData, actualData); + } + } +} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java deleted file mode 100644 index 8415129db9a7..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.io.BaseTaskWriter; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; - -abstract class BaseDeltaTaskWriter extends BaseTaskWriter { - - private final Schema schema; - private final Schema deleteSchema; - private final RowDataWrapper wrapper; - private final boolean upsert; - - BaseDeltaTaskWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.schema = schema; - this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); - this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); - this.upsert = upsert; - } - - abstract RowDataDeltaWriter route(RowData row); - - RowDataWrapper wrapper() { - return wrapper; - } - - @Override - public void write(RowData row) throws IOException { - RowDataDeltaWriter writer = route(row); - - switch (row.getRowKind()) { - case INSERT: - case UPDATE_AFTER: - if (upsert) { - writer.delete(row); - } - writer.write(row); - break; - - case UPDATE_BEFORE: - if (upsert) { - break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice - } - writer.delete(row); - break; - case DELETE: - writer.delete(row); - break; - - default: - throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind()); - } - } - - protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter { - RowDataDeltaWriter(PartitionKey partition) { - super(partition, schema, deleteSchema); - } - - @Override - protected StructLike asStructLike(RowData data) { - return wrapper.wrap(data); - } - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java deleted file mode 100644 index 1eee6298e933..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.List; -import java.util.Map; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; - -class PartitionedDeltaWriter extends BaseDeltaTaskWriter { - - private final PartitionKey partitionKey; - - private final Map writers = Maps.newHashMap(); - - PartitionedDeltaWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, - upsert); - this.partitionKey = new PartitionKey(spec, schema); - } - - @Override - RowDataDeltaWriter route(RowData row) { - partitionKey.partition(wrapper().wrap(row)); - - RowDataDeltaWriter writer = writers.get(partitionKey); - if (writer == null) { - // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. - PartitionKey copiedKey = partitionKey.copy(); - writer = new RowDataDeltaWriter(copiedKey); - writers.put(copiedKey, writer); - } - - return writer; - } - - @Override - public void close() { - try { - Tasks.foreach(writers.values()) - .throwFailureWhenFinished() - .noRetry() - .run(RowDataDeltaWriter::close, IOException.class); - - writers.clear(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close equality delta writer", e); - } - } -} diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 2849100858a1..e3da9a68da65 100644 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -19,60 +19,88 @@ package org.apache.iceberg.flink.sink; +import java.io.IOException; import java.util.List; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.flink.RowDataWrapper; -import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.BaseEqualityDeltaWriter; +import org.apache.iceberg.io.ClusteredDataWriter; +import org.apache.iceberg.io.ClusteredEqualityDeleteWriter; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.DataWriteResult; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.EqualityDeltaWriter; +import 
org.apache.iceberg.io.FanoutDataWriter; +import org.apache.iceberg.io.FanoutEqualityDeleteWriter; +import org.apache.iceberg.io.FanoutPositionDeleteWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.Tasks; public class RowDataTaskWriterFactory implements TaskWriterFactory { private final Table table; - private final Schema schema; private final RowType flinkSchema; + private final Schema schema; private final PartitionSpec spec; private final FileIO io; + + private final Schema deleteSchema; + private final long targetFileSizeBytes; private final FileFormat format; - private final List equalityFieldIds; private final boolean upsert; - private final FileAppenderFactory appenderFactory; + private final FlinkFileWriterFactory fileWriterFactory; private transient OutputFileFactory outputFileFactory; - public RowDataTaskWriterFactory(Table table, - RowType flinkSchema, - long targetFileSizeBytes, - FileFormat format, - List equalityFieldIds, - boolean upsert) { + public RowDataTaskWriterFactory( + Table table, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + List equalityFieldIds, + boolean upsert) { this.table = table; - this.schema = table.schema(); this.flinkSchema = flinkSchema; + this.schema = table.schema(); this.spec = table.spec(); this.io = table.io(); + this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; - this.equalityFieldIds = equalityFieldIds; this.upsert = upsert; - if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { - this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); + if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) { + this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table) + .dataSchema(schema) + .dataFlinkType(flinkSchema) + .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds)) + .equalityDeleteRowSchema(schema) + .build(); + + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); } else { - // TODO provide the ability to customize the equality-delete row schema. - this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, - ArrayUtil.toIntArray(equalityFieldIds), schema, null); + this.fileWriterFactory = FlinkFileWriterFactory.builderFor(table) + .dataSchema(schema) + .dataFlinkType(flinkSchema) + .build(); + + this.deleteSchema = null; } } @@ -83,46 +111,183 @@ public void initialize(int taskId, int attemptId) { @Override public TaskWriter create() { - Preconditions.checkNotNull(outputFileFactory, + Preconditions.checkNotNull( + outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); - if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { - // Initialize a task writer to write INSERT only. 
- if (spec.isUnpartitioned()) { - return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); - } else { - return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory, - io, targetFileSizeBytes, schema, flinkSchema); - } + if (deleteSchema != null) { + return new DeltaWriter(spec.isPartitioned()); } else { - // Initialize a task writer to write both INSERT and equality DELETE. - if (spec.isUnpartitioned()) { - return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); - } else { - return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io, - targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert); + return new InsertOnlyWriter(spec.isPartitioned()); + } + } + + private static > void cleanFiles(FileIO io, T[] files) { + Tasks.foreach(files) + .throwFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + + private abstract class FlinkBaseWriter implements TaskWriter { + private final WriteResult.Builder writeResultBuilder = WriteResult.builder(); + + @Override + public void write(RowData row) throws IOException { + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + if (upsert) { + delete(row); + } + insert(row); + break; + + case UPDATE_BEFORE: + if (upsert) { + // Skip to write delete in upsert mode because UPDATE_AFTER will insert after delete. + break; + } + delete(row); + break; + + case DELETE: + delete(row); + break; + + default: + throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind()); } } + + protected void delete(RowData row) { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement delete."); + } + + protected void insert(RowData row) { + throw new UnsupportedOperationException(this.getClass().getName() + " does not implement insert."); + } + + protected void aggregateResult(DataWriteResult result) { + writeResultBuilder.addDataFiles(result.dataFiles()); + } + + protected void aggregateResult(WriteResult result) { + writeResultBuilder.add(result); + } + + @Override + public void abort() throws IOException { + close(); + + WriteResult result = writeResultBuilder.build(); + cleanFiles(io, result.dataFiles()); + cleanFiles(io, result.deleteFiles()); + } + + @Override + public WriteResult complete() throws IOException { + close(); + + return writeResultBuilder.build(); + } } - private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { + private PartitioningWriter newDataWriter(boolean fanoutEnabled) { + if (fanoutEnabled) { + return new FanoutDataWriter<>(fileWriterFactory, outputFileFactory, io, format, targetFileSizeBytes); + } else { + return new ClusteredDataWriter<>(fileWriterFactory, outputFileFactory, io, format, targetFileSizeBytes); + } + } + + private PartitioningWriter newEqualityWriter(boolean fanoutEnabled) { + if (fanoutEnabled) { + return new FanoutEqualityDeleteWriter<>(fileWriterFactory, outputFileFactory, io, format, targetFileSizeBytes); + } else { + return new ClusteredEqualityDeleteWriter<>(fileWriterFactory, outputFileFactory, io, format, targetFileSizeBytes); + } + } + + private PartitioningWriter, DeleteWriteResult> newPositionWriter(boolean fanoutEnabled) { + if (fanoutEnabled) { + return new FanoutPositionDeleteWriter<>(fileWriterFactory, outputFileFactory, io, format, targetFileSizeBytes); + } else { 
+ return new ClusteredPositionDeleteWriter<>(fileWriterFactory, outputFileFactory, io, format, targetFileSizeBytes); + } + } + + private class InsertOnlyWriter extends FlinkBaseWriter { private final PartitionKey partitionKey; private final RowDataWrapper rowDataWrapper; - RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema, - RowType flinkSchema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + private final PartitioningWriter delegate; + private boolean closed = false; + + private InsertOnlyWriter(boolean fanoutEnabled) { this.partitionKey = new PartitionKey(spec, schema); this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + + this.delegate = newDataWriter(fanoutEnabled); + } + + @Override + protected void insert(RowData row) { + partitionKey.partition(rowDataWrapper.wrap(row)); + delegate.write(row, spec, partitionKey); } @Override - protected PartitionKey partition(RowData row) { + public void close() throws IOException { + if (!closed) { + delegate.close(); + aggregateResult(delegate.result()); + + closed = true; + } + } + } + + private class DeltaWriter extends FlinkBaseWriter { + + private final PartitionKey partitionKey; + private final RowDataWrapper rowDataWrapper; + + private final EqualityDeltaWriter delegate; + private boolean closed = false; + + private DeltaWriter(boolean fanoutEnabled) { + this.partitionKey = new PartitionKey(spec, schema); + this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + + this.delegate = new BaseEqualityDeltaWriter<>( + newDataWriter(fanoutEnabled), + newEqualityWriter(fanoutEnabled), + newPositionWriter(fanoutEnabled), + schema, deleteSchema, + table.properties(), + rowDataWrapper::wrap); + } + + protected void delete(RowData row) { partitionKey.partition(rowDataWrapper.wrap(row)); - return partitionKey; + delegate.delete(row, spec, partitionKey); + } + + protected void insert(RowData row) { + partitionKey.partition(rowDataWrapper.wrap(row)); + delegate.insert(row, spec, partitionKey); + } + + @Override + public void close() throws IOException { + if (!closed) { + delegate.close(); + aggregateResult(delegate.result()); + + this.closed = true; + } } } } diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java deleted file mode 100644 index 331ed7c78192..000000000000 --- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.flink.sink; - -import java.io.IOException; -import java.util.List; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; - -class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { - private final RowDataDeltaWriter writer; - - UnpartitionedDeltaWriter(PartitionSpec spec, - FileFormat format, - FileAppenderFactory appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - RowType flinkSchema, - List equalityFieldIds, - boolean upsert) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds, - upsert); - this.writer = new RowDataDeltaWriter(null); - } - - @Override - RowDataDeltaWriter route(RowData row) { - return writer; - } - - @Override - public void close() throws IOException { - writer.close(); - } -} diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java index 68b706e2d281..875ded0ec415 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.flink.types.Row; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.BaseTable; @@ -38,7 +39,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.StructLikeMapUtil; import org.apache.iceberg.util.StructLikeSet; import org.junit.After; import org.junit.Assert; @@ -63,17 +66,21 @@ public class TestChangeLogTable extends ChangeLogTableTestBase { private static String warehouse; private final boolean partitioned; + private final String spillDiskImpl; - @Parameterized.Parameters(name = "PartitionedTable={0}") + @Parameterized.Parameters(name = "PartitionedTable={0}, SpillDiskImpl={1}") public static Iterable parameters() { return ImmutableList.of( - new Object[] {true}, - new Object[] {false} + new Object[] {true, StructLikeMapUtil.IN_MEMORY_MAP}, + new Object[] {true, StructLikeMapUtil.ROCKSDB_MAP}, + new Object[] {false, StructLikeMapUtil.IN_MEMORY_MAP}, + new Object[] {false, StructLikeMapUtil.ROCKSDB_MAP} ); } - public TestChangeLogTable(boolean partitioned) { + public TestChangeLogTable(boolean partitioned, String spillDiskImpl) { this.partitioned = partitioned; + this.spillDiskImpl = spillDiskImpl; } @BeforeClass @@ -244,8 +251,11 @@ private static Record record(int id, String data) { private Table createTable(String tableName, List key, boolean isPartitioned) { String partitionByCause = isPartitioned ? 
"PARTITIONED BY (data)" : ""; - sql("CREATE TABLE %s(id INT, data VARCHAR, PRIMARY KEY(%s) NOT ENFORCED) %s", - tableName, Joiner.on(',').join(key), partitionByCause); + Map props = Maps.newHashMap(); + props.put(StructLikeMapUtil.IMPL, spillDiskImpl); + + sql("CREATE TABLE %s(id INT, data VARCHAR, PRIMARY KEY(%s) NOT ENFORCED) %s WITH %s", + tableName, Joiner.on(',').join(key), partitionByCause, FlinkCatalogTestBase.toWithClause(props)); // Upgrade the iceberg table to format v2. CatalogLoader loader = CatalogLoader.hadoop("my_catalog", CONF, ImmutableMap.of( diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java index 348580d2ffa0..3fe9eafe800a 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java @@ -86,6 +86,7 @@ private static HiveIcebergRecordWriter writer(JobConf jc) { String tableName = jc.get(Catalogs.NAME); return new HiveIcebergRecordWriter(schema, spec, fileFormat, - new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID, tableName); + new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, table.properties(), + taskAttemptID, tableName); } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java index 2adf43d1211a..b67434c842eb 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java @@ -64,7 +64,7 @@ static Map getWriters(TaskAttemptID taskAttempt HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize, - TaskAttemptID taskAttemptID, String tableName) { + Map properties, TaskAttemptID taskAttemptID, String tableName) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.io = io; this.currentKey = new PartitionKey(spec, schema); diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java index a33b9f781472..250f815be184 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -278,7 +278,7 @@ private List writeRecords(String name, int taskNum, int attemptNum, bool .operationId(operationId) .build(); HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, fileFormat, - new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, + new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, table.properties(), TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME)); Container container = new Container<>(); diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java index d38ae2f40316..63ae26dbfa4c 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java @@ 
-35,9 +35,9 @@ public class SparkPartitionedFanoutWriter extends PartitionedFanoutWriter appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, - Schema schema, StructType sparkSchema) { + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Schema schema, StructType sparkSchema) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.partitionKey = new PartitionKey(spec, schema); this.internalRowWrapper = new InternalRowWrapper(sparkSchema); diff --git a/versions.props b/versions.props index 0dc08fbe81cc..2e161aa5d7d9 100644 --- a/versions.props +++ b/versions.props @@ -24,6 +24,7 @@ javax.activation:activation = 1.1.1 org.glassfish.jaxb:jaxb-runtime = 2.3.3 software.amazon.awssdk:* = 2.17.131 org.projectnessie:* = 0.21.2 +org.rocksdb:rocksdbjni = 6.6.4 com.google.cloud:libraries-bom = 24.1.0 org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0 org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0
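A minimal usage sketch of the new StructLikeMapUtil exercised by the tests above, mirroring TestStructLikeMap#before(): the map implementation is selected through properties, either the existing heap-based StructLikeMap or the RocksDB-backed map together with a spill directory. The constant names (IMPL, ROCKSDB_MAP, IN_MEMORY_MAP, ROCKSDB_DIR) and the load(StructType, StructType, Map) call follow this diff; the example class name, the key/value schemas, the directory path, and the generic type parameters (which the patch elides here) are illustrative assumptions only.

import java.util.Map;

import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMapUtil;

public class StructLikeMapUtilSketch {
  public static void main(String[] args) {
    // Key and value struct types for the map, matching the shapes used in TestStructLikeMap.
    Types.StructType keyType = Types.StructType.of(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    Types.StructType valType = Types.StructType.of(
        Types.NestedField.optional(1, "value", Types.StringType.get()));

    Map<String, String> props = Maps.newHashMap();
    // Select the RocksDB-backed implementation and tell it where to spill;
    // StructLikeMapUtil.IN_MEMORY_MAP selects the existing heap-based StructLikeMap instead.
    props.put(StructLikeMapUtil.IMPL, StructLikeMapUtil.ROCKSDB_MAP);
    props.put(StructLikeMapUtil.ROCKSDB_DIR, "/tmp/iceberg-struct-like-map");  // illustrative path

    // Load the map; it is used like the in-memory StructLikeMap (put/get/remove/clear),
    // which is how BaseEqualityDeltaWriter tracks inserted rows for equality deletes.
    Map<StructLike, StructLike> map = StructLikeMapUtil.load(keyType, valType, props);
    map.clear();
  }
}

In the writer paths these properties arrive through table.properties(), which is why the Flink, Hive, and MR writer constructors in this patch now take the table properties alongside the existing appender and file-factory arguments.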