diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 1c871ba30c04..60c89590ae4c 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1333,6 +1333,45 @@ acceptedBreaks: \ java.util.List)" justification: "Removing deprecations for 1.10.0" org.apache.iceberg:iceberg-parquet: + - code: "java.annotation.added" + old: "parameter org.apache.iceberg.parquet.ParquetValueReader\ + \ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List,\ + \ java.util.List>, ===org.apache.iceberg.types.Types.StructType===)" + new: "parameter org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.BaseParquetReaders::createStructReader(java.util.List>,\ + \ org.apache.iceberg.types.Types.StructType, ===java.lang.Integer===) @ org.apache.iceberg.data.parquet.GenericParquetReaders" + justification: "Changes to Internal Reader Base Implementation" + - code: "java.method.parameterTypeChanged" + old: "parameter org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List,\ + \ ===java.util.List>===,\ + \ org.apache.iceberg.types.Types.StructType)" + new: "parameter org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List>,\ + \ ===org.apache.iceberg.types.Types.StructType===, java.lang.Integer)" + justification: "Changes to Internal Reader Base Implementation" + - code: "java.method.parameterTypeChanged" + old: "parameter org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List,\ + \ java.util.List>, ===org.apache.iceberg.types.Types.StructType===)" + new: "parameter org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List>,\ + \ org.apache.iceberg.types.Types.StructType, ===java.lang.Integer===)" + justification: "Changes to Internal Reader Base Implementation" + - code: "java.method.parameterTypeChanged" + old: "parameter org.apache.iceberg.parquet.ParquetValueReader\ + \ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List,\ + \ ===java.util.List>===,\ + \ org.apache.iceberg.types.Types.StructType)" + new: "parameter org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.BaseParquetReaders::createStructReader(java.util.List>,\ + \ ===org.apache.iceberg.types.Types.StructType===, java.lang.Integer) @ org.apache.iceberg.data.parquet.GenericParquetReaders" + justification: "Changes to Internal Reader Base Implementation" + - code: "java.method.parameterTypeChanged" + old: "parameter org.apache.iceberg.parquet.ParquetValueReader\ + \ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List,\ + \ java.util.List>, ===org.apache.iceberg.types.Types.StructType===)" + new: "parameter org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.BaseParquetReaders::createStructReader(java.util.List>,\ + \ org.apache.iceberg.types.Types.StructType, ===java.lang.Integer===) @ org.apache.iceberg.data.parquet.GenericParquetReaders" + justification: "Changes to Internal Reader Base Implementation" - code: "java.method.removed" old: "method org.apache.iceberg.parquet.ParquetValueWriter\ \ org.apache.iceberg.data.parquet.InternalWriter::create(org.apache.parquet.schema.MessageType)" diff --git a/build.gradle b/build.gradle index 6bc052885fc4..2705bdbc39ea 100644 --- 
a/build.gradle +++ b/build.gradle @@ -348,6 +348,7 @@ project(':iceberg-core') { api project(':iceberg-api') implementation project(':iceberg-common') implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + testRuntimeOnly project(':iceberg-parquet') annotationProcessor libs.immutables.value compileOnly libs.immutables.value diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index a02e0eff55a2..5cfc4b636ed7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -531,7 +531,9 @@ public ByteBuffer keyMetadata() { @Override public List splitOffsets() { if (hasWellDefinedOffsets()) { - return ArrayUtil.toUnmodifiableLongList(splitOffsets); + // This list is reused as a container, so it cannot be immutable here + // Fixes TestFindFiles + return ArrayUtil.toLongList(splitOffsets); } return null; diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java index 439524deaf24..55d1a76b1f0a 100644 --- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java +++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.SnapshotChanges.changesFrom; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; @@ -71,6 +73,8 @@ public CherryPickOperation cherrypick(long snapshotId) { ValidationException.check( cherrypickSnapshot != null, "Cannot cherry-pick unknown snapshot ID: %s", snapshotId); + SnapshotChanges changes = changesFrom(cherrypickSnapshot, io, specsById); + if (cherrypickSnapshot.operation().equals(DataOperations.APPEND)) { // this property is set on target snapshot that will get published String wapId = WapUtil.validateWapPublish(current, snapshotId); @@ -82,7 +86,7 @@ public CherryPickOperation cherrypick(long snapshotId) { set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(snapshotId)); // Pick modifications from the snapshot - for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) { + for (DataFile addedFile : changes.addedDataFiles()) { add(addedFile); } @@ -114,13 +118,13 @@ public CherryPickOperation cherrypick(long snapshotId) { // copy adds from the picked snapshot this.replacedPartitions = PartitionSet.create(specsById); - for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) { + for (DataFile addedFile : changes.addedDataFiles()) { add(addedFile); replacedPartitions.add(addedFile.specId(), addedFile.partition()); } // copy deletes from the picked snapshot - for (DataFile deletedFile : cherrypickSnapshot.removedDataFiles(io)) { + for (DataFile deletedFile : changes.removedDataFiles()) { delete(deletedFile); }
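For context on the CherryPickOperation change above, here is a minimal sketch of the intended call pattern for the new SnapshotChanges helper (introduced later in this diff): load a snapshot's changes once, then iterate the cached lists instead of re-reading manifests on each addedDataFiles()/removedDataFiles() call. The class and method names are illustrative wrappers; only SnapshotChanges itself comes from this PR.

```java
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotChanges;
import org.apache.iceberg.io.FileIO;

class CherryPickSketch {
  static void applyChanges(Snapshot picked, FileIO io, Map<Integer, PartitionSpec> specsById) {
    // one manifest scan, results are cached inside the helper
    SnapshotChanges changes = SnapshotChanges.changesFrom(picked, io, specsById);
    for (DataFile added : changes.addedDataFiles()) {
      System.out.println("add: " + added.location());
    }
    for (DataFile removed : changes.removedDataFiles()) {
      System.out.println("delete: " + removed.location());
    }
  }
}
```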
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 40ea17aaa592..7c1710bc72bb 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -126,7 +127,13 @@ protected ManifestReader( if (specsById != null) { this.spec = specsById.get(specId); } else { - this.spec = readPartitionSpec(file); + if (FileFormat.fromFileName(file.location()) == FileFormat.PARQUET) { + // Avoid relying on footer information to read the partition spec + throw new UnsupportedOperationException( + "Reading partition spec from Parquet manifest files is not supported"); + } else { + this.spec = readPartitionSpec(file); + } } this.fileSchema = new Schema(DataFile.getType(spec.rawPartitionType()).fields()); @@ -146,18 +153,17 @@ private > PartitionSpec readPartitionSpec(InputFile inp } private static > Map readMetadata(InputFile inputFile) { - Map metadata; + Map metadata = Collections.emptyMap(); + FileFormat manifestFormat = FileFormat.fromFileName(inputFile.location()); + try { try (CloseableIterable> headerReader = - InternalData.read(FileFormat.AVRO, inputFile) + InternalData.read(manifestFormat, inputFile) .project(ManifestEntry.getSchema(Types.StructType.of()).select("status")) .build()) { - if (headerReader instanceof AvroIterable) { - metadata = ((AvroIterable>) headerReader).getMetadata(); - } else { - throw new RuntimeException( - "Reader does not support metadata reading: " + headerReader.getClass().getName()); + if (manifestFormat == FileFormat.AVRO) { + metadata = ((AvroIterable) headerReader).getMetadata(); } } } catch (IOException e) {
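The ManifestReader changes above hinge on inferring the manifest format from the file name rather than the footer; a small sketch of that dispatch (FileFormat.fromFileName is existing Iceberg API):

```java
import org.apache.iceberg.FileFormat;

class FormatSniffSketch {
  static boolean isParquetManifest(String location) {
    // The format is derived purely from the extension, so no file I/O is
    // needed; Avro manifests keep their key/value header metadata, while
    // Parquet manifests do not carry it.
    return FileFormat.fromFileName(location) == FileFormat.PARQUET;
  }
}
```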
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index fd560b2b83ff..0c4f39aa9acb 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -57,7 +58,7 @@ public abstract class ManifestWriter> implements FileAp private ManifestWriter( PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, Long firstRowId) { - this.file = file.encryptingOutputFile(); + this.file = outputFile(file); this.specId = spec.specId(); this.writer = newAppender(spec, this.file); this.snapshotId = snapshotId; @@ -73,6 +74,15 @@ private ManifestWriter( protected abstract FileAppender> newAppender( PartitionSpec spec, OutputFile outputFile); + /** + * Gets the actual OutputFile that will be used to write the manifest, taking encryption into + * account if needed. V3 and earlier use Avro, so whole-file encryption is invoked. V4+ use + * Parquet, so the native encryption output file is passed through if it is available. + */ + protected OutputFile outputFile(EncryptedOutputFile encryptedFile) { + return encryptedFile.encryptingOutputFile(); + } + protected ManifestContent content() { return ManifestContent.DATA; } @@ -229,6 +239,15 @@ static class V4Writer extends ManifestWriter { this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId); } + @Override + protected OutputFile outputFile(EncryptedOutputFile encryptedFile) { + if (encryptedFile instanceof NativeEncryptionOutputFile) { + return (NativeEncryptionOutputFile) encryptedFile; + } else { + return encryptedFile.encryptingOutputFile(); + } + } + @Override protected ManifestEntry prepare(ManifestEntry entry) { return entryWrapper.wrap(entry); @@ -239,7 +258,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType()); try { - return InternalData.write(FileFormat.AVRO, file) + return InternalData.write(FileFormat.PARQUET, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema())) @@ -264,6 +283,15 @@ static class V4DeleteWriter extends ManifestWriter { this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId); } + @Override + protected OutputFile outputFile(EncryptedOutputFile encryptedFile) { + if (encryptedFile instanceof NativeEncryptionOutputFile) { + return (NativeEncryptionOutputFile) encryptedFile; + } else { + return encryptedFile.encryptingOutputFile(); + } + } + @Override + protected ManifestEntry prepare(ManifestEntry entry) { return entryWrapper.wrap(entry); @@ -274,7 +302,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType()); try { - return InternalData.write(FileFormat.AVRO, file) + return InternalData.write(FileFormat.PARQUET, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema()))
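A condensed sketch of the encryption dispatch the V4 writers add above, assuming the same types the diff uses (this is the shape of the decision, not the actual Iceberg API):

```java
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
import org.apache.iceberg.io.OutputFile;

class ManifestOutputSketch {
  static OutputFile manifestOutput(int formatVersion, EncryptedOutputFile file) {
    if (formatVersion >= 4 && file instanceof NativeEncryptionOutputFile) {
      // Parquet manifests (v4+) can use Parquet's native modular encryption
      return (NativeEncryptionOutputFile) file;
    }
    // Avro manifests (v1-v3) fall back to encrypting the whole output stream
    return file.encryptingOutputFile();
  }
}
```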
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotChanges.java b/core/src/main/java/org/apache/iceberg/SnapshotChanges.java new file mode 100644 index 000000000000..5bf07305caa0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/SnapshotChanges.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +/** + * A utility class to work around the current Snapshot interface and load changes from V4 + * manifests. Needs to be discussed and changed. + */ +public class SnapshotChanges { + + private final Snapshot snapshot; + private final FileIO io; + private final Map specsById; + + // Lazy cache + private List addedDataFiles = null; + private List removedDataFiles = null; + private List addedDeleteFiles = null; + private List removedDeleteFiles = null; + + private SnapshotChanges(Snapshot snapshot, FileIO io, Map specsById) { + this.snapshot = snapshot; + this.io = io; + this.specsById = specsById; + } + + public List addedDataFiles() { + if (addedDataFiles == null) { + cacheDataFileChanges(); + } + return addedDataFiles; + } + + public List removedDataFiles() { + if (removedDataFiles == null) { + cacheDataFileChanges(); + } + return removedDataFiles; + } + + public List addedDeleteFiles() { + if (addedDeleteFiles == null) { + cacheDeleteFileChanges(); + } + return addedDeleteFiles; + } + + public List removedDeleteFiles() { + if (removedDeleteFiles == null) { + cacheDeleteFileChanges(); + } + return removedDeleteFiles; + } + + private void cacheDataFileChanges() { + ImmutableList.Builder addedDataFileBuilder = ImmutableList.builder(); + ImmutableList.Builder removedDataFileBuilder = ImmutableList.builder(); + + // read only manifests that were created by this snapshot + Iterable changedDataManifests = + Iterables.filter( + snapshot.dataManifests(io), + manifest -> Objects.equal(manifest.snapshotId(), snapshot.snapshotId())); + try (CloseableIterable> entries = + new ManifestGroup(io, changedDataManifests) + .specsById(specsById) + .ignoreExisting() + .entries()) { + for (ManifestEntry entry : entries) { + switch (entry.status()) { + case ADDED: + addedDataFileBuilder.add(entry.file().copy()); + break; + case DELETED: + removedDataFileBuilder.add(entry.file().copyWithoutStats()); + break; + default: + throw new IllegalStateException( + "Unexpected entry status, not added or deleted: " + entry); + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close entries while caching changes"); + } + + this.addedDataFiles = addedDataFileBuilder.build(); + this.removedDataFiles = removedDataFileBuilder.build(); + } + + private void cacheDeleteFileChanges() { + ImmutableList.Builder addedDeleteFilesBuilder = ImmutableList.builder(); + ImmutableList.Builder removedDeleteFilesBuilder = ImmutableList.builder(); + + Iterable changedDeleteManifests = + Iterables.filter( + snapshot.deleteManifests(io), + manifest -> Objects.equal(manifest.snapshotId(), snapshot.snapshotId())); + + for (ManifestFile manifest : changedDeleteManifests) { + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(manifest, io, specsById)) { + for (ManifestEntry entry : reader.entries()) { + switch (entry.status()) { + case ADDED: + addedDeleteFilesBuilder.add(entry.file().copy()); + break; + case DELETED: + removedDeleteFilesBuilder.add(entry.file().copyWithoutStats()); + break; + default: + // ignore existing + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to close manifest reader", e); + } + } + + this.addedDeleteFiles = addedDeleteFilesBuilder.build(); + this.removedDeleteFiles = removedDeleteFilesBuilder.build(); + } + + public static SnapshotChanges changesFrom( + Snapshot snapshot, FileIO io, Map specsById) { + return new SnapshotChanges(snapshot, io, specsById); + } +}
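One property of the class above worth calling out: the accessors are lazily populated and then memoized, so repeated calls cost no additional manifest I/O. A hypothetical check of that behavior, assuming a SnapshotChanges instance obtained as in the diff:

```java
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SnapshotChanges;

class LazyCacheSketch {
  static void check(SnapshotChanges changes) {
    // first call scans the snapshot's own manifests once and caches the result
    List<DataFile> first = changes.addedDataFiles();
    // second call returns the same immutable list instance, with no manifest I/O
    List<DataFile> second = changes.addedDataFiles();
    assert first == second;
  }
}
```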
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 77cdac8f4a29..cd435b0d5218 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -559,9 +559,12 @@ protected OutputFile manifestListPath() { } protected EncryptedOutputFile newManifestOutputFile() { + FileFormat manifestFormat = + ops.current().formatVersion() <= 3 ? FileFormat.AVRO : FileFormat.PARQUET; + String manifestFileLocation = ops.metadataFileLocation( - FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())); + manifestFormat.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())); return EncryptingFileIO.combine(ops.io(), ops.encryption()) .newEncryptingOutputFile(manifestFileLocation); } diff --git a/core/src/main/java/org/apache/iceberg/V4Metadata.java b/core/src/main/java/org/apache/iceberg/V4Metadata.java index 67478290aa10..41d55a564fe3 100644 --- a/core/src/main/java/org/apache/iceberg/V4Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V4Metadata.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import java.nio.ByteBuffer; @@ -277,13 +278,24 @@ static Schema wrapFileSchema(Types.StructType fileSchema) { required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema)); } + static final Types.StructType PLACEHOLDER_PARTITION_TYPE = + Types.StructType.of(optional(-1000, "UNPARTITIONED_PLACEHOLDER", Types.BooleanType.get())); + + static final PartitionData PLACEHOLDER_PARTITION = new PartitionData(PLACEHOLDER_PARTITION_TYPE); + static Types.StructType fileType(Types.StructType partitionType) { + Types.StructType writePartitionType = + partitionType.fields().isEmpty() ? PLACEHOLDER_PARTITION_TYPE : partitionType; + return Types.StructType.of( DataFile.CONTENT.asRequired(), DataFile.FILE_PATH, DataFile.FILE_FORMAT, required( - DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, DataFile.PARTITION_DOC), + DataFile.PARTITION_ID, + DataFile.PARTITION_NAME, + writePartitionType, + DataFile.PARTITION_DOC), DataFile.RECORD_COUNT, DataFile.FILE_SIZE, DataFile.COLUMN_SIZES, @@ -460,7 +472,7 @@ private Object get(int pos) { case 2: return wrapped.format() != null ? wrapped.format().toString() : null; case 3: - return wrapped.partition(); + return wrapped.partition() != null ? wrapped.partition() : PLACEHOLDER_PARTITION; case 4: return wrapped.recordCount(); case 5:
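On the V4Metadata placeholder above: Parquet cannot encode a group with zero fields, so an unpartitioned table's empty partition struct is presumably swapped for a one-field placeholder at write time, and the negative field id keeps it out of the real schema id space. A self-contained sketch of the substitution (field id and name copied from the diff):

```java
import org.apache.iceberg.types.Types;

class PlaceholderSketch {
  // mirrors the PLACEHOLDER_PARTITION_TYPE constant added in V4Metadata
  static final Types.StructType PLACEHOLDER =
      Types.StructType.of(
          Types.NestedField.optional(-1000, "UNPARTITIONED_PLACEHOLDER", Types.BooleanType.get()));

  static Types.StructType writePartitionType(Types.StructType partitionType) {
    // write a placeholder struct when the table is unpartitioned, since
    // Parquet groups must contain at least one field
    return partitionType.fields().isEmpty() ? PLACEHOLDER : partitionType;
  }
}
```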
diff --git a/core/src/main/java/org/apache/iceberg/data/HasMetadata.java b/core/src/main/java/org/apache/iceberg/data/HasMetadata.java new file mode 100644 index 000000000000..821dfcd98c22 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/data/HasMetadata.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.util.Map; + +public interface HasMetadata { + Map getMetadata(); +} diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index a040bd26786d..1faad69c11f6 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -20,13 +20,16 @@ import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.function.Function; import org.apache.iceberg.DataFile; import org.apache.iceberg.HistoryEntry; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotChanges; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -289,14 +292,15 @@ private static Iterable toIds(Iterable snapshots) { @Deprecated public static List newFiles( Long baseSnapshotId, long latestSnapshotId, Function lookup, FileIO io) { + List newFiles = Lists.newArrayList(); Snapshot lastSnapshot = null; + for (Snapshot currentSnapshot : ancestorsOf(latestSnapshotId, lookup)) { lastSnapshot = currentSnapshot; if (Objects.equals(currentSnapshot.snapshotId(), baseSnapshotId)) { return newFiles; } - Iterables.addAll(newFiles, currentSnapshot.addedDataFiles(io)); } @@ -309,6 +313,33 @@ public static List newFiles( return newFiles; } + public static List newFiles( + Long baseSnapshotId, + long latestSnapshotId, + Function lookup, + FileIO io, + Map specsById) { + List newFiles = Lists.newArrayList(); + Snapshot lastSnapshot = null; + for (Snapshot currentSnapshot : ancestorsOf(latestSnapshotId, lookup)) { + lastSnapshot = currentSnapshot; + if (Objects.equals(currentSnapshot.snapshotId(), baseSnapshotId)) { + return newFiles; + } + + SnapshotChanges changes = SnapshotChanges.changesFrom(currentSnapshot, io, specsById); + Iterables.addAll(newFiles, changes.addedDataFiles()); + } + + ValidationException.check( + Objects.equals(lastSnapshot.parentId(), baseSnapshotId), + "Cannot determine history between read snapshot %s and the last known ancestor %s", + baseSnapshotId, + lastSnapshot.snapshotId()); + + return newFiles; + } + public static CloseableIterable newFilesBetween( Long startSnapshotId, long endSnapshotId, Function lookup, FileIO io) { diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index 30c1fb7191fd..f47785bcd3b1 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -237,7 +237,8 @@ List listManifestFiles(File tableDirToList) { .listFiles( (dir,
name) -> !name.startsWith("snap") - && Files.getFileExtension(name).equalsIgnoreCase("avro"))); + && (Files.getFileExtension(name).equalsIgnoreCase("avro") + || Files.getFileExtension(name).equalsIgnoreCase("parquet")))); } List listManifestLists(File tableDirToList) { @@ -276,7 +277,7 @@ ManifestFile writeManifest(DataFile... files) throws IOException { } ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException { - File manifestFile = temp.resolve("input.m0.avro").toFile(); + File manifestFile = temp.resolve(manifestExtension("input.m0")).toFile(); assertThat(manifestFile).doesNotExist(); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); @@ -298,13 +299,13 @@ ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws } ManifestFile writeManifest(Long snapshotId, ManifestEntry... entries) throws IOException { - return writeManifest(snapshotId, "input.m0.avro", entries); + return writeManifest(snapshotId, manifestExtension("input.m0"), entries); } @SuppressWarnings("unchecked") > ManifestFile writeManifest( Long snapshotId, String fileName, ManifestEntry... entries) throws IOException { - File manifestFile = temp.resolve(fileName).toFile(); + File manifestFile = temp.resolve(manifestExtension(fileName)).toFile(); assertThat(manifestFile).doesNotExist(); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); @@ -334,8 +335,7 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi throws IOException { OutputFile manifestFile = org.apache.iceberg.Files.localOutput( - FileFormat.AVRO.addExtension( - temp.resolve("junit" + System.nanoTime()).toFile().toString())); + temp.resolve(manifestExtension("junit" + System.nanoTime())).toString()); ManifestWriter writer = ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, manifestFile, snapshotId); try { @@ -349,7 +349,7 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi } ManifestFile writeManifestWithName(String name, DataFile... 
files) throws IOException { - File manifestFile = temp.resolve(name + ".avro").toFile(); + File manifestFile = temp.resolve(manifestExtension(name)).toFile(); assertThat(manifestFile).doesNotExist(); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); @@ -465,7 +465,8 @@ void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile long id = snap.snapshotId(); Iterator newPaths = paths(newFiles).iterator(); - for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { + for (ManifestEntry entry : + ManifestFiles.read(manifest, FILE_IO, table.specs()).entries()) { DataFile file = entry.file(); if (sequenceNumber != null) { V1Assert.assertEquals( @@ -577,7 +578,8 @@ void validateManifest( Iterator ids, Iterator expectedFiles, Iterator statuses) { - for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { + for (ManifestEntry entry : + ManifestFiles.read(manifest, FILE_IO, table.specs()).entries()) { DataFile file = entry.file(); DataFile expected = expectedFiles.next(); @@ -603,7 +605,7 @@ void validateDeleteManifest( Iterator expectedFiles, Iterator statuses) { for (ManifestEntry entry : - ManifestFiles.readDeleteManifest(manifest, FILE_IO, null).entries()) { + ManifestFiles.readDeleteManifest(manifest, FILE_IO, table.specs()).entries()) { DeleteFile file = entry.file(); DeleteFile expected = expectedFiles.next(); @@ -750,6 +752,14 @@ protected void withUnavailableLocations(Iterable locations, Action actio } } + protected String manifestExtension(String filename) { + if (formatVersion >= 4) { + return FileFormat.PARQUET.addExtension(filename); + } else { + return FileFormat.AVRO.addExtension(filename); + } + } + private void move(String location, String newLocation) { Path path = Paths.get(location); Path tempPath = Paths.get(newLocation); @@ -765,8 +775,9 @@ static void validateManifestEntries( ManifestFile manifest, Iterator ids, Iterator expectedFiles, - Iterator expectedStatuses) { - for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { + Iterator expectedStatuses, + Map specs) { + for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO, specs).entries()) { DataFile file = entry.file(); DataFile expected = expectedFiles.next(); final ManifestEntry.Status expectedStatus = expectedStatuses.next(); @@ -804,8 +815,8 @@ static Iterator files(DeleteFile... files) { return Iterators.forArray(files); } - static Iterator files(ManifestFile manifest) { - return ManifestFiles.read(manifest, FILE_IO).iterator(); + static Iterator files(ManifestFile manifest, Map specs) { + return ManifestFiles.read(manifest, FILE_IO, specs).iterator(); } static long recordCount(ContentFile... 
files) { diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index cad294f97a28..854cc48410ae 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -105,7 +105,8 @@ public void testMultipleDeletes() { delete1.allManifests(table.io()).get(0), ids(delete1.snapshotId(), append.snapshotId(), append.snapshotId()), files(FILE_A, FILE_B, FILE_C), - statuses(Status.DELETED, Status.EXISTING, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING, Status.EXISTING), + table.specs()); Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch); assertThat(version()).isEqualTo(3); @@ -114,7 +115,8 @@ public void testMultipleDeletes() { delete2.allManifests(FILE_IO).get(0), ids(delete2.snapshotId(), append.snapshotId()), files(FILE_B, FILE_C), - statuses(Status.DELETED, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING), + table.specs()); } @TestTemplate @@ -167,7 +169,8 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() { initialSnapshot.allManifests(FILE_IO).get(0), ids(initialSnapshot.snapshotId(), initialSnapshot.snapshotId()), files(firstDataFile, secondDataFile), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); // delete the first data file Snapshot deleteSnapshot = commit(table, table.newDelete().deleteFile(firstDataFile), branch); @@ -176,7 +179,8 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() { deleteSnapshot.allManifests(FILE_IO).get(0), ids(deleteSnapshot.snapshotId(), initialSnapshot.snapshotId()), files(firstDataFile, secondDataFile), - statuses(Status.DELETED, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING), + table.specs()); // delete the second data file using a row filter // the commit should succeed as there is only one live data file @@ -188,7 +192,8 @@ public void testAlreadyDeletedFilesAreIgnoredDuringDeletesByRowFilter() { finalSnapshot.allManifests(FILE_IO).get(0), ids(finalSnapshot.snapshotId()), files(secondDataFile), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); } @TestTemplate @@ -208,7 +213,8 @@ public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() { initialSnapshot.allManifests(FILE_IO).get(0), ids(initialSnapshot.snapshotId(), initialSnapshot.snapshotId()), files(DATA_FILE_BUCKET_0_IDS_0_2, DATA_FILE_BUCKET_0_IDS_8_10), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); // delete the second one using a metrics filter (no partition filter) Snapshot deleteSnapshot = @@ -220,7 +226,8 @@ public void testDeleteSomeFilesByRowFilterWithoutPartitionPredicates() { deleteSnapshot.allManifests(FILE_IO).get(0), ids(initialSnapshot.snapshotId(), deleteSnapshot.snapshotId()), files(DATA_FILE_BUCKET_0_IDS_0_2, DATA_FILE_BUCKET_0_IDS_8_10), - statuses(Status.EXISTING, Status.DELETED)); + statuses(Status.EXISTING, Status.DELETED), + table.specs()); } @TestTemplate @@ -240,7 +247,8 @@ public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() { initialSnapshot.allManifests(FILE_IO).get(0), ids(initialSnapshot.snapshotId(), initialSnapshot.snapshotId()), files(DATA_FILE_BUCKET_0_IDS_0_2, DATA_FILE_BUCKET_0_IDS_8_10), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); // delete the second one using a filter that relies on metrics 
and partition data Expression partPredicate = Expressions.equal(Expressions.bucket("data", 16), 0); @@ -253,7 +261,8 @@ public void testDeleteSomeFilesByRowFilterWithCombinedPredicates() { deleteSnapshot.allManifests(FILE_IO).get(0), ids(initialSnapshot.snapshotId(), deleteSnapshot.snapshotId()), files(DATA_FILE_BUCKET_0_IDS_0_2, DATA_FILE_BUCKET_0_IDS_8_10), - statuses(Status.EXISTING, Status.DELETED)); + statuses(Status.EXISTING, Status.DELETED), + table.specs()); } @TestTemplate @@ -317,7 +326,8 @@ public void testDeleteCaseSensitivity() { deleteSnapshot.allManifests(FILE_IO).get(0), ids(deleteSnapshot.snapshotId()), files(DATA_FILE_BUCKET_0_IDS_0_2), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); } @TestTemplate @@ -338,14 +348,16 @@ public void testDeleteFilesOnIndependentBranches() { Iterables.getOnlyElement(testBranchTip.allManifests(FILE_IO)), ids(testBranchTip.snapshotId(), initialSnapshot.snapshotId(), initialSnapshot.snapshotId()), files(FILE_A, FILE_B, FILE_C), - statuses(Status.DELETED, Status.EXISTING, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING, Status.EXISTING), + table.specs()); // Verify A on main validateManifestEntries( Iterables.getOnlyElement(delete2.allManifests(FILE_IO)), ids(initialSnapshot.snapshotId(), delete2.snapshotId(), delete2.snapshotId()), files(FILE_A, FILE_B, FILE_C), - statuses(Status.EXISTING, Status.DELETED, Status.DELETED)); + statuses(Status.EXISTING, Status.DELETED, Status.DELETED), + table.specs()); } @TestTemplate @@ -407,7 +419,8 @@ public void testDeleteValidateFileExistence() { Iterables.getOnlyElement(delete.allManifests(FILE_IO)), ids(delete.snapshotId()), files(FILE_B), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); assertThatThrownBy( () -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch)) @@ -435,7 +448,8 @@ public void testDeleteFilesNoValidation() { Iterables.getOnlyElement(delete1.allManifests(FILE_IO)), ids(delete1.snapshotId()), files(FILE_B), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); Snapshot delete2 = commit(table, table.newDelete().deleteFile(FILE_B), branch); assertThat(delete2.allManifests(FILE_IO)).isEmpty(); diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index 384e0edaaadd..432743ece0ba 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -496,7 +496,8 @@ public void testAppendManifestWithSnapshotIdInheritance() throws IOException { manifests.get(0), ids(snapshot.snapshotId(), snapshot.snapshotId()), files(FILE_A, FILE_B), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); // validate that the metadata summary is correct when using appendManifest assertThat(snapshot.summary()) diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index fe4e4a74d1c4..a1189e6ce505 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -51,7 +51,7 @@ public class TestManifestReader extends TestBase { @TestTemplate public void testManifestReaderWithEmptyInheritableMetadata() throws IOException { ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 1000L, FILE_A)); - try (ManifestReader reader = 
ManifestFiles.read(manifest, FILE_IO)) { + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { ManifestEntry entry = Iterables.getOnlyElement(reader.entries()); assertThat(entry.status()).isEqualTo(Status.EXISTING); assertThat(entry.file().location()).isEqualTo(FILE_A.location()); @@ -63,7 +63,8 @@ public void testReaderWithFilterWithoutSelect() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO).filterRows(Expressions.equal("id", 0))) { + ManifestFiles.read(manifest, FILE_IO, table.specs()) + .filterRows(Expressions.equal("id", 0))) { List files = Streams.stream(reader).collect(Collectors.toList()); // note that all files are returned because the reader returns data files that may match, and @@ -85,8 +86,9 @@ public void testInvalidUsage() throws IOException { @TestTemplate public void testManifestReaderWithPartitionMetadata() throws IOException { + assumeThat(formatVersion).as("Parquet manifests in V4+ do not have metadata").isLessThanOrEqualTo(3); ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A)); - try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { ManifestEntry entry = Iterables.getOnlyElement(reader.entries()); assertThat(entry.snapshotId()).isEqualTo(123L); @@ -106,7 +108,7 @@ public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IO table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(spec)); ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A)); - try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { ManifestEntry entry = Iterables.getOnlyElement(reader.entries()); assertThat(entry.snapshotId()).isEqualTo(123L); @@ -126,7 +128,7 @@ public void testManifestReaderWithUpdatedPartitionMetadataForV1Table() throws IO @TestTemplate public void testDataFilePositions() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C); - try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { long expectedPos = 0L; for (DataFile file : reader) { assertThat(file.pos()).as("Position should match").isEqualTo(expectedPos); @@ -138,7 +140,7 @@ public void testDataFilePositions() throws IOException { @TestTemplate public void testDataFileManifestPaths() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C); - try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { for (DataFile file : reader) { assertThat(file.manifestLocation()).isEqualTo(manifest.path()); } @@ -168,7 +170,7 @@ public void testDeleteFileManifestPaths() throws IOException { ManifestFile manifest = writeDeleteManifest(formatVersion, 1000L, FILE_A_DELETES, FILE_B_DELETES); try (ManifestReader reader = - ManifestFiles.readDeleteManifest(manifest, FILE_IO, null)) { + ManifestFiles.readDeleteManifest(manifest, FILE_IO, table.specs())) { for (DeleteFile file : reader) { assertThat(file.manifestLocation()).isEqualTo(manifest.path()); } @@ -227,7
+229,7 @@ public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException { .withSplitOffsets(ImmutableList.of(2L, 1000L)) // Offset 1000 is out of bounds .build(); ManifestFile manifest = writeManifest(1000L, invalidOffset); - try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { DataFile file = Iterables.getOnlyElement(reader); assertThat(file.splitOffsets()).isNull(); } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java index 0e2f4c0ebec3..483910d3000b 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java @@ -59,7 +59,7 @@ public class TestManifestReaderStats extends TestBase { @TestTemplate public void testReadIncludesFullStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); - try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) { + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO, table.specs())) { CloseableIterable> entries = reader.entries(); ManifestEntry entry = entries.iterator().next(); assertFullStats(entry.file()); @@ -70,7 +70,8 @@ public void testReadIncludesFullStats() throws IOException { public void testReadEntriesWithFilterIncludesFullStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO).filterRows(Expressions.equal("id", 3))) { + ManifestFiles.read(manifest, FILE_IO, table.specs()) + .filterRows(Expressions.equal("id", 3))) { CloseableIterable> entries = reader.entries(); ManifestEntry entry = entries.iterator().next(); assertFullStats(entry.file()); @@ -81,7 +82,8 @@ public void testReadEntriesWithFilterIncludesFullStats() throws IOException { public void testReadIteratorWithFilterIncludesFullStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO).filterRows(Expressions.equal("id", 3))) { + ManifestFiles.read(manifest, FILE_IO, table.specs()) + .filterRows(Expressions.equal("id", 3))) { DataFile entry = reader.iterator().next(); assertFullStats(entry); } @@ -91,7 +93,7 @@ public void testReadIteratorWithFilterIncludesFullStats() throws IOException { public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO) + ManifestFiles.read(manifest, FILE_IO, table.specs()) .select(ImmutableList.of("file_path")) .filterRows(Expressions.equal("id", 3))) { CloseableIterable> entries = reader.entries(); @@ -104,7 +106,7 @@ public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOExcep public void testReadIteratorWithFilterAndSelectDropsStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO) + ManifestFiles.read(manifest, FILE_IO, table.specs()) .select(ImmutableList.of("file_path")) .filterRows(Expressions.equal("id", 3))) { DataFile entry = reader.iterator().next(); @@ -116,7 +118,7 @@ public void testReadIteratorWithFilterAndSelectDropsStats() throws IOException { public void testReadIteratorWithFilterAndSelectRecordCountDropsStats() throws IOException { ManifestFile manifest = 
writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO) + ManifestFiles.read(manifest, FILE_IO, table.specs()) .select(ImmutableList.of("file_path", "record_count")) .filterRows(Expressions.equal("id", 3))) { DataFile entry = reader.iterator().next(); @@ -128,7 +130,7 @@ public void testReadIteratorWithFilterAndSelectRecordCountDropsStats() throws IO public void testReadIteratorWithFilterAndSelectStatsIncludesFullStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO) + ManifestFiles.read(manifest, FILE_IO, table.specs()) .select(ImmutableList.of("file_path", "value_counts")) .filterRows(Expressions.equal("id", 3))) { DataFile entry = reader.iterator().next(); @@ -143,7 +145,7 @@ public void testReadIteratorWithFilterAndSelectStatsIncludesFullStats() throws I public void testReadIteratorWithProjectStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO) + ManifestFiles.read(manifest, FILE_IO, table.specs()) .project(new Schema(ImmutableList.of(DataFile.FILE_PATH, DataFile.VALUE_COUNTS)))) { DataFile entry = reader.iterator().next(); @@ -162,7 +164,8 @@ public void testReadIteratorWithProjectStats() throws IOException { public void testReadEntriesWithSelectNotProjectStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO).select(ImmutableList.of("file_path"))) { + ManifestFiles.read(manifest, FILE_IO, table.specs()) + .select(ImmutableList.of("file_path"))) { CloseableIterable> entries = reader.entries(); ManifestEntry entry = entries.iterator().next(); DataFile dataFile = entry.file(); @@ -185,7 +188,7 @@ public void testReadEntriesWithSelectNotProjectStats() throws IOException { public void testReadEntriesWithSelectCertainStatNotProjectStats() throws IOException { ManifestFile manifest = writeManifest(1000L, FILE); try (ManifestReader reader = - ManifestFiles.read(manifest, FILE_IO) + ManifestFiles.read(manifest, FILE_IO, table.specs()) .select(ImmutableList.of("file_path", "value_counts"))) { DataFile dataFile = reader.iterator().next(); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java index 5d682421d541..3123f270ff08 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java @@ -92,7 +92,7 @@ public void testManifestPartitionStats() throws IOException { @TestTemplate public void testWriteManifestWithSequenceNumber() throws IOException { assumeThat(formatVersion).isGreaterThan(1); - File manifestFile = temp.resolve("manifest" + System.nanoTime() + ".avro").toFile(); + File manifestFile = temp.resolve(manifestExtension("manifest" + System.nanoTime())).toFile(); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, 1L); @@ -100,7 +100,8 @@ public void testWriteManifestWithSequenceNumber() throws IOException { writer.close(); ManifestFile manifest = writer.toManifestFile(); assertThat(manifest.sequenceNumber()).isEqualTo(-1); - ManifestReader manifestReader = ManifestFiles.read(manifest, table.io()); + ManifestReader manifestReader = + ManifestFiles.read(manifest, table.io(), 
table.specs()); for (ManifestEntry entry : manifestReader.entries()) { assertThat(entry.dataSequenceNumber()).isEqualTo(1000); assertThat(entry.fileSequenceNumber()).isEqualTo(ManifestWriter.UNASSIGNED_SEQ); @@ -118,7 +119,7 @@ public void testCommitManifestWithExplicitDataSequenceNumber() throws IOExceptio ManifestFile manifest = writeManifest( - "manifest.avro", + manifestExtension("manifest"), manifestEntry(Status.ADDED, null, dataSequenceNumber, null, file1), manifestEntry(Status.ADDED, null, dataSequenceNumber, null, file2)); @@ -160,7 +161,7 @@ public void testCommitManifestWithExistingEntriesWithoutFileSequenceNumber() thr ManifestFile newManifest = writeManifest( - "manifest.avro", + manifestExtension("manifest"), manifestEntry(Status.EXISTING, appendSnapshotId, appendSequenceNumber, null, file1), manifestEntry(Status.EXISTING, appendSnapshotId, appendSequenceNumber, null, file2)); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index 4aa1acfd5f96..94ae788cd43e 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.List; +import java.util.Map; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.encryption.EncryptionManager; @@ -476,7 +477,7 @@ private ManifestEntry readManifest(ManifestFile manifest) throws IOExc } private ManifestFile writeDeleteManifest(int formatVersion) throws IOException { - String filename = FileFormat.AVRO.addExtension("manifest"); + String filename = manifestFormat(formatVersion).addExtension("manifest"); EncryptedOutputFile manifestFile = encryptionManager().encrypt(io.newOutputFile(filename)); ManifestWriter writer = ManifestFiles.writeDeleteManifest(formatVersion, SPEC, manifestFile, SNAPSHOT_ID); @@ -490,10 +491,18 @@ private ManifestFile writeDeleteManifest(int formatVersion) throws IOException { private ManifestEntry readDeleteManifest(ManifestFile manifest) throws IOException { try (CloseableIterable> reader = - ManifestFiles.readDeleteManifest(manifest, io, null).entries()) { + ManifestFiles.readDeleteManifest(manifest, io, Map.of(0, SPEC)).entries()) { List> entries = Lists.newArrayList(reader); assertThat(entries).hasSize(1); return entries.get(0); } } + + private FileFormat manifestFormat(int formatVersion) { + if (formatVersion <= 3) { + return FileFormat.AVRO; + } else { + return FileFormat.PARQUET; + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index b4dab67cde70..c38f01e9f9e1 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -405,7 +405,7 @@ public void testMergeWithExistingManifest() { dataSeqs(2L, 2L, 1L, 1L), fileSeqs(2L, 2L, 1L, 1L), ids(snapshotId, snapshotId, baseId, baseId), - concat(files(FILE_C, FILE_D), files(initialManifest)), + concat(files(FILE_C, FILE_D), files(initialManifest, table.specs())), statuses(Status.ADDED, Status.ADDED, Status.EXISTING, Status.EXISTING)); } @@ -561,7 +561,7 @@ public void testManifestsMergeIntoOne() throws IOException { .newAppend() .appendManifest( writeManifest( - "input-m0.avro", manifestEntry(ManifestEntry.Status.ADDED, null, 
FILE_C))), + "input-m0", manifestEntry(ManifestEntry.Status.ADDED, null, FILE_C))), branch); base = readMetadata(); @@ -761,7 +761,8 @@ public void testMergeWithExistingManifestAfterDelete() { newManifest, ids(snapshotId, snapshotId, baseId), files(FILE_C, FILE_D, FILE_B), - statuses(Status.ADDED, Status.ADDED, Status.EXISTING)); + statuses(Status.ADDED, Status.ADDED, Status.EXISTING), + table.specs()); } @TestTemplate @@ -863,7 +864,7 @@ public void testMergeSizeTargetWithExistingManifest() { dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(baseId, baseId), - files(initialManifest), + files(initialManifest, table.specs()), statuses(Status.ADDED, Status.ADDED)); } @@ -1030,7 +1031,7 @@ public void testFailure() { validateManifest( newManifest, ids(pending.snapshotId(), baseId), - concat(files(FILE_B), files(initialManifest))); + concat(files(FILE_B), files(initialManifest, table.specs()))); assertThatThrownBy(() -> commit(table, append, branch)) .isInstanceOf(CommitFailedException.class) @@ -1047,7 +1048,7 @@ public void testFailure() { dataSeqs(1L), fileSeqs(1L), ids(baseId), - files(initialManifest), + files(initialManifest, table.specs()), statuses(Status.ADDED)); assertThat(new File(newManifest.path())).doesNotExist(); @@ -1120,7 +1121,7 @@ public void testRecovery() { validateManifest( newManifest, ids(pending.snapshotId(), baseId), - concat(files(FILE_B), files(initialManifest))); + concat(files(FILE_B), files(initialManifest, table.specs()))); V2Assert.assertEquals( "Snapshot sequence number should be 1", 1, latestSnapshot(table, branch).sequenceNumber()); @@ -1199,7 +1200,7 @@ public void testMergedAppendManifestCleanupWithSnapshotIdInheritance() throws IO table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1").commit(); - ManifestFile manifest1 = writeManifestWithName("manifest-file-1.avro", FILE_A, FILE_B); + ManifestFile manifest1 = writeManifestWithName("manifest-file-1", FILE_A, FILE_B); Snapshot snap1 = commit(table, table.newAppend().appendManifest(manifest1), branch); long commitId1 = snap1.snapshotId(); @@ -1215,7 +1216,7 @@ public void testMergedAppendManifestCleanupWithSnapshotIdInheritance() throws IO statuses(Status.ADDED, Status.ADDED)); assertThat(new File(manifest1.path())).exists(); - ManifestFile manifest2 = writeManifestWithName("manifest-file-2.avro", FILE_C, FILE_D); + ManifestFile manifest2 = writeManifestWithName("manifest-file-2", FILE_C, FILE_D); Snapshot snap2 = commit(table, table.newAppend().appendManifest(manifest2), branch); long commitId2 = snap2.snapshotId(); @@ -1398,7 +1399,8 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { // field ids of manifest entries in two manifests with different specs of the same source field // should be different ManifestEntry entry = - ManifestFiles.read(committedSnapshot.allManifests(table.io()).get(0), FILE_IO) + ManifestFiles.read( + committedSnapshot.allManifests(table.io()).get(0), FILE_IO, table.specs()) .entries() .iterator() .next(); @@ -1411,7 +1413,8 @@ public void testManifestEntryFieldIdsForChangedPartitionSpecForV1Table() { assertThat(field.name()).isEqualTo("data_bucket"); entry = - ManifestFiles.read(committedSnapshot.allManifests(table.io()).get(1), FILE_IO) + ManifestFiles.read( + committedSnapshot.allManifests(table.io()).get(1), FILE_IO, table.specs()) .entries() .iterator() .next(); diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java index c853cf69ea08..4b753c718ee8 100644 --- 
a/core/src/test/java/org/apache/iceberg/TestOverwrite.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java @@ -183,7 +183,8 @@ public void testOverwriteWithoutAppend() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(overwriteId, baseId), files(FILE_0_TO_4, FILE_5_TO_9), - statuses(Status.DELETED, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING), + table.specs()); } @TestTemplate @@ -229,13 +230,15 @@ public void testOverwriteWithAppendOutsideOfDelete() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(overwriteId), files(FILE_10_TO_14), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(overwriteId, baseId), files(FILE_0_TO_4, FILE_5_TO_9), - statuses(Status.DELETED, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING), + table.specs()); } @TestTemplate @@ -265,7 +268,8 @@ public void testOverwriteWithMergedAppendOutsideOfDelete() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(overwriteId, overwriteId, baseId), files(FILE_10_TO_14, FILE_0_TO_4, FILE_5_TO_9), - statuses(Status.ADDED, Status.DELETED, Status.EXISTING)); + statuses(Status.ADDED, Status.DELETED, Status.EXISTING), + table.specs()); } @TestTemplate diff --git a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java index 29daeb995cc5..7163c386c6c7 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java +++ b/core/src/test/java/org/apache/iceberg/TestReplacePartitions.java @@ -130,13 +130,15 @@ public void testReplaceOnePartition() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(replaceId), files(FILE_E), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(replaceId, baseId), files(FILE_A, FILE_B), - statuses(Status.DELETED, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING), + table.specs()); } @TestTemplate @@ -159,7 +161,8 @@ public void testReplaceAndMergeOnePartition() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(replaceId, replaceId, baseId), files(FILE_E, FILE_A, FILE_B), - statuses(Status.ADDED, Status.DELETED, Status.EXISTING)); + statuses(Status.ADDED, Status.DELETED, Status.EXISTING), + table.specs()); } @TestTemplate @@ -189,13 +192,15 @@ public void testReplaceWithUnpartitionedTable() throws IOException { latestSnapshot(replaceMetadata, branch).allManifests(unpartitioned.io()).get(0), ids(replaceId), files(FILE_B), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(replaceMetadata, branch).allManifests(unpartitioned.io()).get(1), ids(replaceId), files(FILE_A), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); } @TestTemplate @@ -230,7 +235,8 @@ public void testReplaceAndMergeWithUnpartitionedTable() throws IOException { latestSnapshot(replaceMetadata, branch).allManifests(unpartitioned.io()).get(0), ids(replaceId, replaceId), files(FILE_B, FILE_A), - statuses(Status.ADDED, Status.DELETED)); + statuses(Status.ADDED, Status.DELETED), + table.specs()); } @TestTemplate @@ -268,13 +274,15 @@ public void testValidationSuccess() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(replaceId), files(FILE_G), - statuses(Status.ADDED)); + 
statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(baseId, baseId), files(FILE_A, FILE_B), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); } @TestTemplate @@ -306,12 +314,14 @@ public void testValidationNotInvoked() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(replaceId, replaceId), files(FILE_A, FILE_B), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(replaceId), files(FILE_E), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); } @TestTemplate @@ -437,12 +447,14 @@ public void testConcurrentReplaceNoConflict() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(id3), files(FILE_B), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(id2), files(FILE_A), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); } @TestTemplate @@ -531,17 +543,20 @@ public void testAppendReplaceNoConflict() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(id3), files(FILE_A), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(id2), files(FILE_B), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(2), ids(id3), files(FILE_A), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); } @TestTemplate @@ -672,12 +687,14 @@ public void testDeleteReplaceNoConflict() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(id3), files(FILE_B), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(id1), files(FILE_A), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateDeleteManifest( latestSnapshot(table, branch).allManifests(table.io()).get(2), dataSeqs(2L), @@ -744,12 +761,14 @@ public void testOverwriteReplaceNoConflict() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(finalId), files(FILE_B), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(finalId), files(FILE_B), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); } @TestTemplate @@ -808,17 +827,20 @@ public void testValidateOnlyDeletes() { latestSnapshot(table, branch).allManifests(table.io()).get(0), ids(finalId), files(FILE_B), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(1), ids(finalId), files(FILE_B), - statuses(Status.DELETED)); + statuses(Status.DELETED), + table.specs()); validateManifestEntries( latestSnapshot(table, branch).allManifests(table.io()).get(2), ids(baseId), files(FILE_A), - statuses(Status.ADDED)); + statuses(Status.ADDED), + table.specs()); } @TestTemplate diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 701fdc97f2cf..d7bbf4e873b0 
100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -193,13 +193,18 @@ public void testDeleteWithDuplicateEntriesInManifest() { long pendingId = pending.snapshotId(); validateManifestEntries( - pending.allManifests(table.io()).get(0), ids(pendingId), files(FILE_C), statuses(ADDED)); + pending.allManifests(table.io()).get(0), + ids(pendingId), + files(FILE_C), + statuses(ADDED), + table.specs()); validateManifestEntries( pending.allManifests(table.io()).get(1), ids(pendingId, baseSnapshotId), files(FILE_A, FILE_B), - statuses(DELETED, EXISTING)); + statuses(DELETED, EXISTING), + table.specs()); // We should only get the 3 manifests that this test is expected to add. assertThat(listManifestFiles()).hasSize(3); @@ -225,13 +230,18 @@ public void testAddAndDelete() { long pendingId = pending.snapshotId(); validateManifestEntries( - pending.allManifests(table.io()).get(0), ids(pendingId), files(FILE_C), statuses(ADDED)); + pending.allManifests(table.io()).get(0), + ids(pendingId), + files(FILE_C), + statuses(ADDED), + table.specs()); validateManifestEntries( pending.allManifests(table.io()).get(1), ids(pendingId, baseSnapshotId), files(FILE_A, FILE_B), - statuses(DELETED, EXISTING)); + statuses(DELETED, EXISTING), + table.specs()); // We should only get the 3 manifests that this test is expected to add. assertThat(listManifestFiles()).hasSize(3); @@ -265,7 +275,8 @@ public void testRewriteDataAndDeleteFiles() { initialManifests.get(0), ids(baseSnapshotId, baseSnapshotId, baseSnapshotId), files(FILE_A, FILE_B, FILE_C), - statuses(ADDED, ADDED, ADDED)); + statuses(ADDED, ADDED, ADDED), + table.specs()); validateDeleteManifest( initialManifests.get(1), dataSeqs(1L, 1L), @@ -292,13 +303,18 @@ public void testRewriteDataAndDeleteFiles() { long pendingId = pending.snapshotId(); validateManifestEntries( - pending.allManifests(table.io()).get(0), ids(pendingId), files(FILE_D), statuses(ADDED)); + pending.allManifests(table.io()).get(0), + ids(pendingId), + files(FILE_D), + statuses(ADDED), + table.specs()); validateManifestEntries( pending.allManifests(table.io()).get(1), ids(pendingId, baseSnapshotId, baseSnapshotId), files(FILE_A, FILE_B, FILE_C), - statuses(DELETED, EXISTING, EXISTING)); + statuses(DELETED, EXISTING, EXISTING), + table.specs()); validateDeleteManifest( pending.allManifests(table.io()).get(2), @@ -340,7 +356,8 @@ public void testRewriteDataAndAssignOldSequenceNumber() { initialManifests.get(0), ids(baseSnapshotId, baseSnapshotId, baseSnapshotId), files(FILE_A, FILE_B, FILE_C), - statuses(ADDED, ADDED, ADDED)); + statuses(ADDED, ADDED, ADDED), + table.specs()); validateDeleteManifest( initialManifests.get(1), dataSeqs(1L, 1L), @@ -364,8 +381,9 @@ public void testRewriteDataAndAssignOldSequenceNumber() { long pendingId = pending.snapshotId(); ManifestFile newManifest = pending.allManifests(table.io()).get(0); - validateManifestEntries(newManifest, ids(pendingId), files(FILE_D), statuses(ADDED)); - assertThat(ManifestFiles.read(newManifest, FILE_IO).entries()) + validateManifestEntries( + newManifest, ids(pendingId), files(FILE_D), statuses(ADDED), table.specs()); + assertThat(ManifestFiles.read(newManifest, FILE_IO, table.specs()).entries()) .allSatisfy(entry -> assertThat(entry.dataSequenceNumber()).isEqualTo(oldSequenceNumber)); assertThat(newManifest.sequenceNumber()).isEqualTo(oldSequenceNumber + 1); @@ -373,7 +391,8 @@ public void testRewriteDataAndAssignOldSequenceNumber() { 
pending.allManifests(table.io()).get(1), ids(pendingId, baseSnapshotId, baseSnapshotId), files(FILE_A, FILE_B, FILE_C), - statuses(DELETED, EXISTING, EXISTING)); + statuses(DELETED, EXISTING, EXISTING), + table.specs()); validateDeleteManifest( pending.allManifests(table.io()).get(2), @@ -401,8 +420,10 @@ public void testFailure() { ManifestFile manifest1 = pending.allManifests(table.io()).get(0); ManifestFile manifest2 = pending.allManifests(table.io()).get(1); - validateManifestEntries(manifest1, ids(pending.snapshotId()), files(FILE_B), statuses(ADDED)); - validateManifestEntries(manifest2, ids(pending.snapshotId()), files(FILE_A), statuses(DELETED)); + validateManifestEntries( + manifest1, ids(pending.snapshotId()), files(FILE_B), statuses(ADDED), table.specs()); + validateManifestEntries( + manifest2, ids(pending.snapshotId()), files(FILE_A), statuses(DELETED), table.specs()); assertThatThrownBy(() -> commit(table, rewrite, branch)) .isInstanceOf(CommitFailedException.class) @@ -455,13 +476,15 @@ public void testFailureWhenRewriteBothDataAndDeleteFiles() { pending.allManifests(table.io()).get(0), ids(pending.snapshotId()), files(FILE_D), - statuses(ADDED)); + statuses(ADDED), + table.specs()); validateManifestEntries( pending.allManifests(table.io()).get(1), ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId), files(FILE_A, FILE_B, FILE_C), - statuses(DELETED, EXISTING, EXISTING)); + statuses(DELETED, EXISTING, EXISTING), + table.specs()); validateDeleteManifest( pending.allManifests(table.io()).get(2), @@ -497,8 +520,10 @@ public void testRecovery() { ManifestFile manifest1 = pending.allManifests(table.io()).get(0); ManifestFile manifest2 = pending.allManifests(table.io()).get(1); - validateManifestEntries(manifest1, ids(pending.snapshotId()), files(FILE_B), statuses(ADDED)); - validateManifestEntries(manifest2, ids(pending.snapshotId()), files(FILE_A), statuses(DELETED)); + validateManifestEntries( + manifest1, ids(pending.snapshotId()), files(FILE_B), statuses(ADDED), table.specs()); + validateManifestEntries( + manifest2, ids(pending.snapshotId()), files(FILE_A), statuses(DELETED), table.specs()); commit(table, rewrite, branch); @@ -548,13 +573,15 @@ public void testRecoverWhenRewriteBothDataAndDeleteFiles() { ManifestFile manifest2 = pending.allManifests(table.io()).get(1); ManifestFile manifest3 = pending.allManifests(table.io()).get(2); - validateManifestEntries(manifest1, ids(pending.snapshotId()), files(FILE_D), statuses(ADDED)); + validateManifestEntries( + manifest1, ids(pending.snapshotId()), files(FILE_D), statuses(ADDED), table.specs()); validateManifestEntries( manifest2, ids(pending.snapshotId(), baseSnapshotId, baseSnapshotId), files(FILE_A, FILE_B, FILE_C), - statuses(DELETED, EXISTING, EXISTING)); + statuses(DELETED, EXISTING, EXISTING), + table.specs()); validateDeleteManifest( manifest3, @@ -604,7 +631,8 @@ public void testReplaceEqualityDeletesWithPositionDeletes() { ManifestFile manifest2 = pending.allManifests(table.io()).get(1); ManifestFile manifest3 = pending.allManifests(table.io()).get(2); - validateManifestEntries(manifest1, ids(baseSnapshotId), files(FILE_A2), statuses(ADDED)); + validateManifestEntries( + manifest1, ids(baseSnapshotId), files(FILE_A2), statuses(ADDED), table.specs()); validateDeleteManifest( manifest2, @@ -659,7 +687,8 @@ public void testRemoveAllDeletes() { ManifestFile manifest1 = pending.allManifests(table.io()).get(0); ManifestFile manifest2 = pending.allManifests(table.io()).get(1); - validateManifestEntries(manifest1, 
ids(pending.snapshotId()), files(FILE_A), statuses(DELETED)); + validateManifestEntries( + manifest1, ids(pending.snapshotId()), files(FILE_A), statuses(DELETED), table.specs()); validateDeleteManifest( manifest2, @@ -722,13 +751,18 @@ public void testAlreadyDeletedFile() { long pendingId = pending.snapshotId(); validateManifestEntries( - pending.allManifests(table.io()).get(0), ids(pendingId), files(FILE_B), statuses(ADDED)); + pending.allManifests(table.io()).get(0), + ids(pendingId), + files(FILE_B), + statuses(ADDED), + table.specs()); validateManifestEntries( pending.allManifests(table.io()).get(1), ids(pendingId, latestSnapshot(table, branch).snapshotId()), files(FILE_A), - statuses(DELETED)); + statuses(DELETED), + table.specs()); commit(table, rewrite, branch); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index 0d5414eaf0d6..64c72f93ec16 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -68,7 +68,11 @@ public void testRewriteManifestsAppendedDirectly() throws IOException { assertThat(manifests).hasSize(1); validateManifestEntries( - manifests.get(0), ids(appendId), files(FILE_A), statuses(ManifestEntry.Status.EXISTING)); + manifests.get(0), + ids(appendId), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -133,7 +137,8 @@ public void testRewriteManifestsGeneratedAndAppendedDirectly() throws IOExceptio // get the correct file order List files; List ids; - try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { + try (ManifestReader reader = + ManifestFiles.read(manifests.get(0), table.io(), table.specs())) { if (reader.iterator().next().location().equals(FILE_A.location())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(manifestAppendId, fileAppendId); @@ -147,7 +152,8 @@ public void testRewriteManifestsGeneratedAndAppendedDirectly() throws IOExceptio manifests.get(0), ids.iterator(), files.iterator(), - statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -167,9 +173,17 @@ public void testReplaceManifestsSeparate() { manifests.sort(Comparator.comparing(ManifestFile::path)); validateManifestEntries( - manifests.get(0), ids(appendId), files(FILE_A), statuses(ManifestEntry.Status.EXISTING)); + manifests.get(0), + ids(appendId), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( - manifests.get(1), ids(appendId), files(FILE_B), statuses(ManifestEntry.Status.EXISTING)); + manifests.get(1), + ids(appendId), + files(FILE_B), + statuses(ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -193,7 +207,8 @@ public void testReplaceManifestsConsolidate() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { + try (ManifestReader reader = + ManifestFiles.read(manifests.get(0), table.io(), table.specs())) { if (reader.iterator().next().location().equals(FILE_A.location())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(appendIdA, appendIdB); @@ -207,7 +222,8 @@ public void testReplaceManifestsConsolidate() throws IOException { manifests.get(0), ids.iterator(), files.iterator(), - 
statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -232,7 +248,8 @@ public void testReplaceManifestsWithFilter() throws IOException { .clusterBy(file -> "file") .rewriteIf( manifest -> { - try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) { + try (ManifestReader reader = + ManifestFiles.read(manifest, table.io(), table.specs())) { return !reader.iterator().next().location().equals(FILE_A.location()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -246,7 +263,8 @@ public void testReplaceManifestsWithFilter() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { + try (ManifestReader reader = + ManifestFiles.read(manifests.get(0), table.io(), table.specs())) { if (reader.iterator().next().location().equals(FILE_B.location())) { files = Arrays.asList(FILE_B, FILE_C); ids = Arrays.asList(appendIdB, appendIdC); @@ -260,9 +278,14 @@ public void testReplaceManifestsWithFilter() throws IOException { manifests.get(0), ids.iterator(), files.iterator(), - statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( - manifests.get(1), ids(appendIdA), files(FILE_A), statuses(ManifestEntry.Status.ADDED)); + manifests.get(1), + ids(appendIdA), + files(FILE_A), + statuses(ManifestEntry.Status.ADDED), + table.specs()); } @TestTemplate @@ -284,9 +307,17 @@ public void testReplaceManifestsMaxSize() { manifests.sort(Comparator.comparing(ManifestFile::path)); validateManifestEntries( - manifests.get(0), ids(appendId), files(FILE_A), statuses(ManifestEntry.Status.EXISTING)); + manifests.get(0), + ids(appendId), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( - manifests.get(1), ids(appendId), files(FILE_B), statuses(ManifestEntry.Status.EXISTING)); + manifests.get(1), + ids(appendId), + files(FILE_B), + statuses(ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -307,7 +338,8 @@ public void testConcurrentRewriteManifest() throws IOException { .clusterBy(file -> "file") .rewriteIf( manifest -> { - try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) { + try (ManifestReader reader = + ManifestFiles.read(manifest, table.io(), table.specs())) { return !reader.iterator().next().location().equals(FILE_A.location()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -327,7 +359,8 @@ public void testConcurrentRewriteManifest() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) { + try (ManifestReader reader = + ManifestFiles.read(manifests.get(0), table.io(), table.specs())) { if (reader.iterator().next().location().equals(FILE_A.location())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(appendIdA, appendIdB); @@ -341,7 +374,8 @@ public void testConcurrentRewriteManifest() throws IOException { manifests.get(0), ids.iterator(), files.iterator(), - statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -371,9 +405,17 @@ public void testAppendDuringRewriteManifest() { 
assertThat(manifests).hasSize(2); validateManifestEntries( - manifests.get(0), ids(appendIdA), files(FILE_A), statuses(ManifestEntry.Status.EXISTING)); + manifests.get(0), + ids(appendIdA), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( - manifests.get(1), ids(appendIdB), files(FILE_B), statuses(ManifestEntry.Status.ADDED)); + manifests.get(1), + ids(appendIdB), + files(FILE_B), + statuses(ManifestEntry.Status.ADDED), + table.specs()); } @TestTemplate @@ -401,9 +443,17 @@ public void testRewriteManifestDuringAppend() { // last append should be the first in the list validateManifestEntries( - manifests.get(0), ids(appendIdB), files(FILE_B), statuses(ManifestEntry.Status.ADDED)); + manifests.get(0), + ids(appendIdB), + files(FILE_B), + statuses(ManifestEntry.Status.ADDED), + table.specs()); validateManifestEntries( - manifests.get(1), ids(appendIdA), files(FILE_A), statuses(ManifestEntry.Status.EXISTING)); + manifests.get(1), + ids(appendIdA), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -422,11 +472,11 @@ public void testBasicManifestReplacement() throws IOException { ManifestFile firstNewManifest = writeManifest( - "manifest-file-1.avro", + "manifest-file-1", manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A)); ManifestFile secondNewManifest = writeManifest( - "manifest-file-2.avro", + "manifest-file-2", manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B)); RewriteManifests rewriteManifests = table.rewriteManifests(); @@ -453,19 +503,22 @@ public void testBasicManifestReplacement() throws IOException { manifests.get(0), ids(firstSnapshot.snapshotId()), files(FILE_A), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(1), ids(firstSnapshot.snapshotId()), files(FILE_B), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(2), ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()), files(FILE_C, FILE_D), - statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED)); + statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED), + table.specs()); } @TestTemplate @@ -512,19 +565,22 @@ public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOExc manifests.get(0), ids(firstSnapshot.snapshotId()), files(FILE_A), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(1), ids(firstSnapshot.snapshotId()), files(FILE_B), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(2), ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()), files(FILE_C, FILE_D), - statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED)); + statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED), + table.specs()); // validate that any subsequent operation does not fail table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); @@ -702,19 +758,22 @@ public void testManifestReplacementConcurrentAppend() throws IOException { manifests.get(0), ids(firstSnapshot.snapshotId()), files(FILE_A), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( 
manifests.get(1), ids(firstSnapshot.snapshotId()), files(FILE_B), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(2), ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()), files(FILE_C, FILE_D), - statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED)); + statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED), + table.specs()); } @TestTemplate @@ -762,19 +821,22 @@ public void testManifestReplacementConcurrentDelete() throws IOException { manifests.get(0), ids(firstSnapshot.snapshotId()), files(FILE_A), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(1), ids(firstSnapshot.snapshotId()), files(FILE_B), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(2), ids(thirdSnapshotId, secondSnapshotId), files(FILE_C, FILE_D), - statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate @@ -845,7 +907,8 @@ public void testManifestReplacementCombinedWithRewrite() throws IOException { .clusterBy(dataFile -> "const-value") .rewriteIf( manifest -> { - try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) { + try (ManifestReader reader = + ManifestFiles.read(manifest, table.io(), table.specs())) { return !reader.iterator().next().location().equals(FILE_B.location()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -863,13 +926,15 @@ public void testManifestReplacementCombinedWithRewrite() throws IOException { manifests.get(1), ids(firstSnapshot.snapshotId()), files(FILE_A), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(2), ids(secondSnapshot.snapshotId()), files(FILE_B), - statuses(ManifestEntry.Status.ADDED)); + statuses(ManifestEntry.Status.ADDED), + table.specs()); } @TestTemplate @@ -922,13 +987,15 @@ public void testManifestReplacementCombinedWithRewriteConcurrentDelete() throws manifests.get(0), ids(secondSnapshot.snapshotId()), files(FILE_B), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); validateManifestEntries( manifests.get(1), ids(firstSnapshot.snapshotId()), files(FILE_A), - statuses(ManifestEntry.Status.EXISTING)); + statuses(ManifestEntry.Status.EXISTING), + table.specs()); } @TestTemplate diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshot.java b/core/src/test/java/org/apache/iceberg/TestSnapshot.java index 166db7e3e6a1..34180782a1c0 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshot.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshot.java @@ -35,14 +35,15 @@ public void testAppendFilesFromTable() { table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); // collect data files from deserialization - Iterable filesToAdd = table.currentSnapshot().addedDataFiles(table.io()); + SnapshotChanges changes = + SnapshotChanges.changesFrom(table.currentSnapshot(), table.io(), table.specs()); table.newDelete().deleteFile(FILE_A).deleteFile(FILE_B).commit(); Snapshot oldSnapshot = table.currentSnapshot(); AppendFiles fastAppend = table.newFastAppend(); - for (DataFile file : filesToAdd) { + for (DataFile file : changes.addedDataFiles()) { 
fastAppend.appendFile(file); } @@ -88,8 +89,10 @@ public void testCachedDataFiles() { table.newOverwrite().deleteFile(FILE_A).addFile(thirdSnapshotDataFile).commit(); Snapshot thirdSnapshot = table.currentSnapshot(); + SnapshotChanges thirdChanges = + SnapshotChanges.changesFrom(thirdSnapshot, FILE_IO, table.specs()); - Iterable removedDataFiles = thirdSnapshot.removedDataFiles(FILE_IO); + Iterable removedDataFiles = thirdChanges.removedDataFiles(); assertThat(removedDataFiles).as("Must have 1 removed data file").hasSize(1); DataFile removedDataFile = Iterables.getOnlyElement(removedDataFiles); @@ -97,7 +100,7 @@ public void testCachedDataFiles() { assertThat(removedDataFile.specId()).isEqualTo(FILE_A.specId()); assertThat(removedDataFile.partition()).isEqualTo(FILE_A.partition()); - Iterable addedDataFiles = thirdSnapshot.addedDataFiles(FILE_IO); + Iterable addedDataFiles = thirdChanges.addedDataFiles(); assertThat(addedDataFiles).as("Must have 1 added data file").hasSize(1); DataFile addedDataFile = Iterables.getOnlyElement(addedDataFiles); @@ -134,8 +137,10 @@ public void testCachedDeleteFiles() { .commit(); Snapshot thirdSnapshot = table.currentSnapshot(); + SnapshotChanges thirdChanges = + SnapshotChanges.changesFrom(thirdSnapshot, FILE_IO, table.specs()); - Iterable removedDeleteFiles = thirdSnapshot.removedDeleteFiles(FILE_IO); + Iterable removedDeleteFiles = thirdChanges.removedDeleteFiles(); assertThat(removedDeleteFiles).as("Must have 1 removed delete file").hasSize(1); DeleteFile removedDeleteFile = Iterables.getOnlyElement(removedDeleteFiles); @@ -143,7 +148,7 @@ public void testCachedDeleteFiles() { assertThat(removedDeleteFile.specId()).isEqualTo(secondSnapshotDeleteFile.specId()); assertThat(removedDeleteFile.partition()).isEqualTo(secondSnapshotDeleteFile.partition()); - Iterable addedDeleteFiles = thirdSnapshot.addedDeleteFiles(FILE_IO); + Iterable addedDeleteFiles = thirdChanges.addedDeleteFiles(); assertThat(addedDeleteFiles).as("Must have 1 added delete file").hasSize(1); DeleteFile addedDeleteFile = Iterables.getOnlyElement(addedDeleteFiles); @@ -172,7 +177,8 @@ private void runAddedDataFileSequenceNumberTest(long expectedSequenceNumber) { table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); Snapshot snapshot = table.currentSnapshot(); - Iterable addedDataFiles = snapshot.addedDataFiles(table.io()); + Iterable addedDataFiles = + SnapshotChanges.changesFrom(snapshot, table.io(), table.specs()).addedDataFiles(); assertThat(snapshot.sequenceNumber()) .as("Sequence number mismatch in Snapshot") @@ -218,7 +224,8 @@ private void runRemovedDataFileSequenceNumberTest( table.newDelete().deleteFile(fileToRemove).commit(); Snapshot snapshot = table.currentSnapshot(); - Iterable removedDataFiles = snapshot.removedDataFiles(table.io()); + SnapshotChanges changes = SnapshotChanges.changesFrom(snapshot, table.io(), table.specs()); + Iterable removedDataFiles = changes.removedDataFiles(); assertThat(removedDataFiles).as("Must have 1 removed data file").hasSize(1); DataFile removedDataFile = Iterables.getOnlyElement(removedDataFiles); @@ -250,7 +257,8 @@ private void runAddedDeleteFileSequenceNumberTest( table.newRowDelta().addDeletes(deleteFileToAdd).commit(); Snapshot snapshot = table.currentSnapshot(); - Iterable addedDeleteFiles = snapshot.addedDeleteFiles(table.io()); + SnapshotChanges changes = SnapshotChanges.changesFrom(snapshot, table.io(), table.specs()); + Iterable addedDeleteFiles = changes.addedDeleteFiles(); assertThat(addedDeleteFiles).as("Must have 1 
added delete file").hasSize(1); DeleteFile addedDeleteFile = Iterables.getOnlyElement(addedDeleteFiles); diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSelection.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSelection.java index 765b93574210..7104504a2d7c 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotSelection.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSelection.java @@ -67,7 +67,8 @@ public void testSnapshotStatsForAddedFiles() { table.newFastAppend().appendFile(fileWithStats).commit(); Snapshot snapshot = table.currentSnapshot(); - Iterable addedFiles = snapshot.addedDataFiles(table.io()); + SnapshotChanges changes = SnapshotChanges.changesFrom(snapshot, table.io(), table.specs()); + Iterable addedFiles = changes.addedDataFiles(); assertThat(addedFiles).hasSize(1); DataFile dataFile = Iterables.getOnlyElement(addedFiles); assertThat(dataFile.valueCounts()).isNotNull(); diff --git a/core/src/test/java/org/apache/iceberg/TestTimestampPartitions.java b/core/src/test/java/org/apache/iceberg/TestTimestampPartitions.java index 1d6b7dd3b751..9945c6842be7 100644 --- a/core/src/test/java/org/apache/iceberg/TestTimestampPartitions.java +++ b/core/src/test/java/org/apache/iceberg/TestTimestampPartitions.java @@ -59,6 +59,7 @@ public void testPartitionAppend() throws IOException { table.currentSnapshot().allManifests(table.io()).get(0), ids(id), files(dataFile), - statuses(ManifestEntry.Status.ADDED)); + statuses(ManifestEntry.Status.ADDED), + table.specs()); } } diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 7715c045bd9e..d468b51ad0c1 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -116,14 +116,16 @@ public void testMultipleOperationTransaction() { readMetadata().currentSnapshot().allManifests(table.io()).get(0), ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()), files(FILE_A, FILE_B), - statuses(Status.DELETED, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING), + table.specs()); assertThat(readMetadata().snapshots()).hasSize(3); validateManifestEntries( readMetadata().snapshots().get(1).allManifests(table.io()).get(0), ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()), files(FILE_A, FILE_B), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); assertThat(table.history()).containsAll(initialHistory); } @@ -162,14 +164,16 @@ public void testMultipleOperationTransactionFromTable() { readMetadata().currentSnapshot().allManifests(table.io()).get(0), ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()), files(FILE_A, FILE_B), - statuses(Status.DELETED, Status.EXISTING)); + statuses(Status.DELETED, Status.EXISTING), + table.specs()); assertThat(readMetadata().snapshots()).hasSize(2); validateManifestEntries( readMetadata().snapshots().get(0).allManifests(table.io()).get(0), ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()), files(FILE_A, FILE_B), - statuses(Status.ADDED, Status.ADDED)); + statuses(Status.ADDED, Status.ADDED), + table.specs()); } @TestTemplate @@ -682,7 +686,8 @@ public void testTransactionRewriteManifestsAppendedDirectly() throws IOException statuses( ManifestEntry.Status.ADDED, ManifestEntry.Status.EXISTING, - ManifestEntry.Status.EXISTING)); + ManifestEntry.Status.EXISTING), + table.specs()); 
table.expireSnapshots().expireOlderThan(finalSnapshotTimestamp + 1).retainLast(1).commit(); diff --git a/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java index 02d42c81a196..d475ff99b8fc 100644 --- a/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java +++ b/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java @@ -110,10 +110,12 @@ public void testCurrentSnapshotOperation() { table.manageSnapshots().setCurrentSnapshot(wapSnapshot.snapshotId()).commit(); base = readMetadata(); + SnapshotChanges changes = + SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); assertThat(base.currentSnapshot().snapshotId()).isEqualTo(wapSnapshot.snapshotId()); assertThat(base.snapshots()).hasSize(2); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(2); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") .hasSize(2); @@ -133,10 +135,12 @@ public void testSetCurrentSnapshotNoWAP() { table.manageSnapshots().setCurrentSnapshot(firstSnapshotId).commit(); base = readMetadata(); + SnapshotChanges changes = + SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); assertThat(base.currentSnapshot().snapshotId()).isEqualTo(firstSnapshotId); assertThat(base.snapshots()).hasSize(2); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(1); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") .hasSize(3); @@ -169,10 +173,12 @@ public void testRollbackOnInvalidNonAncestor() { .hasMessage("Cannot roll back to snapshot, not an ancestor of the current state: 2"); base = readMetadata(); + SnapshotChanges changes = + SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); assertThat(base.currentSnapshot().snapshotId()).isEqualTo(firstSnapshotId); assertThat(base.snapshots()).hasSize(2); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(1); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") .hasSize(1); @@ -264,12 +270,15 @@ public void testWithCherryPicking() { table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit(); base = readMetadata(); + SnapshotChanges changes = + SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); + // check if the effective current snapshot is set to the new snapshot created // as a result of the cherry-pick operation assertThat(base.currentSnapshot().snapshotId()).isEqualTo(wapSnapshot.snapshotId()); assertThat(base.snapshots()).hasSize(2); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(2); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") .hasSize(2); @@ -311,11 +320,14 @@ public void testWithTwoPhaseCherryPicking() { table.manageSnapshots().cherrypick(wap1Snapshot.snapshotId()).commit(); base = readMetadata(); + SnapshotChanges changes = + 
SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); + // check if the effective current snapshot is set to the new snapshot created // as a result of the cherry-pick operation assertThat(base.currentSnapshot().snapshotId()).isEqualTo(parentSnapshot.snapshotId() + 1); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(2); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.currentSnapshot().parentId()) .as("Parent snapshot id should change to latest snapshot before commit") .isEqualTo(parentSnapshot.snapshotId()); @@ -329,12 +341,14 @@ public void testWithTwoPhaseCherryPicking() { table.manageSnapshots().cherrypick(wap2Snapshot.snapshotId()).commit(); base = readMetadata(); + changes = SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); + // check if the effective current snapshot is set to the new snapshot created // as a result of the cherry-pick operation assertThat(base.currentSnapshot().snapshotId()) .isEqualTo(parentSnapshot.snapshotId() + 1 /* one fast-forwarded snapshot */ + 1); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(3); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.currentSnapshot().parentId()) .as("Parent snapshot id should change to latest snapshot before commit") .isEqualTo(parentSnapshot.snapshotId()); @@ -392,12 +406,15 @@ public void testWithCommitsBetweenCherryPicking() { table.manageSnapshots().cherrypick(wap1Snapshot.snapshotId()).commit(); base = readMetadata(); + SnapshotChanges changes = + SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); + // check if the effective current snapshot is set to the new snapshot created // as a result of the cherry-pick operation assertThat(base.snapshots()).hasSize(5); assertThat(base.currentSnapshot().snapshotId()).isEqualTo(parentSnapshot.snapshotId() + 1); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(3); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.currentSnapshot().parentId()).isEqualTo(parentSnapshot.snapshotId()); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") @@ -409,12 +426,14 @@ public void testWithCommitsBetweenCherryPicking() { table.manageSnapshots().cherrypick(wap2Snapshot.snapshotId()).commit(); base = readMetadata(); + changes = SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); + // check if the effective current snapshot is set to the new snapshot created // as a result of the cherry-pick operation assertThat(base.snapshots()).hasSize(6); assertThat(base.currentSnapshot().snapshotId()).isEqualTo(parentSnapshot.snapshotId() + 1); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(4); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.currentSnapshot().parentId()).isEqualTo(parentSnapshot.snapshotId()); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") @@ -454,11 +473,14 @@ public void testWithCherryPickingWithCommitRetry() { table.manageSnapshots().cherrypick(wap1Snapshot.snapshotId()).commit(); base = readMetadata(); + SnapshotChanges changes = + 
SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); + // check if the effective current snapshot is set to the new snapshot created // as a result of the cherry-pick operation assertThat(base.currentSnapshot().snapshotId()).isEqualTo(parentSnapshot.snapshotId() + 1); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(2); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.currentSnapshot().parentId()).isEqualTo(parentSnapshot.snapshotId()); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") @@ -491,12 +513,15 @@ public void testCherrypickingAncestor() { base = readMetadata(); long wapPublishedId = table.currentSnapshot().snapshotId(); + SnapshotChanges changes = + SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); + // check if the effective current snapshot is set to the new snapshot created // as a result of the cherry-pick operation assertThat(base.currentSnapshot().snapshotId()).isEqualTo(wapPublishedId); assertThat(base.snapshots()).hasSize(2); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(2); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") .hasSize(2); @@ -537,8 +562,10 @@ public void testDuplicateCherrypick() { base = readMetadata(); assertThat(base.snapshots()).hasSize(3); + SnapshotChanges changes = + SnapshotChanges.changesFrom(base.currentSnapshot(), table.io(), table.specs()); assertThat(base.currentSnapshot().allManifests(table.io())).hasSize(2); - assertThat(base.currentSnapshot().addedDataFiles(table.io())).hasSize(1); + assertThat(changes.addedDataFiles()).hasSize(1); assertThat(base.snapshotLog()) .as("Snapshot log should indicate number of snapshots committed") .hasSize(2); diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java index 6d4be7671157..901f8080ff1e 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java @@ -34,7 +34,6 @@ public static EncryptionManager createEncryptionManager() { CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); Map tableProperties = Maps.newHashMap(); tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1); - tableProperties.put(TableProperties.FORMAT_VERSION, "2"); return EncryptionUtil.createEncryptionManager( List.of(), tableProperties, EncryptionUtil.createKmsClient(catalogProperties)); diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java index e4901f4e8cca..df03d28d5a4b 100644 --- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java +++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java @@ -29,6 +29,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; +import org.apache.iceberg.InternalData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionStatisticsFile; import 
org.apache.iceberg.PartitionStats; @@ -39,7 +40,6 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.avro.InternalReader; import org.apache.iceberg.data.parquet.InternalWriter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DataWriter; @@ -230,24 +230,7 @@ private static CloseableIterable<StructLike> dataReader(Schema schema, InputFile Preconditions.checkArgument( fileFormat != null, "Unable to determine format of file: %s", inputFile.location()); - switch (fileFormat) { - case PARQUET: - return Parquet.read(inputFile) - .project(schema) - .createReaderFunc( - fileSchema -> - org.apache.iceberg.data.parquet.InternalReader.create(schema, fileSchema)) - .build(); - case AVRO: - return Avro.read(inputFile) - .project(schema) - .createReaderFunc(fileSchema -> InternalReader.create(schema)) - .build(); - case ORC: - // Internal readers are not supported for ORC yet. - default: - throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); - } + return InternalData.read(fileFormat, inputFile).project(schema).build(); } private static PartitionStats recordToPartitionStats(StructLike record) { diff --git a/gradle.properties b/gradle.properties index 5da56c59de41..28ebd60d7c38 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,7 +18,7 @@ jmhJsonOutputPath=build/reports/jmh/results.json jmhIncludeRegex=.* systemProp.defaultFlinkVersions=2.0 systemProp.knownFlinkVersions=1.19,1.20,2.0 -systemProp.defaultSparkVersions=4.0 +systemProp.defaultSparkVersions=3.4,3.5,4.0 systemProp.knownSparkVersions=3.4,3.5,4.0 systemProp.defaultKafkaVersions=3 systemProp.knownKafkaVersions=3 diff --git a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java index 54fac0c55179..4b5910e0a71c 100644 --- a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java +++ b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java @@ -37,6 +37,6 @@ private static Parquet.WriteBuilder writeInternal(OutputFile outputFile) { } private static Parquet.ReadBuilder readInternal(InputFile inputFile) { - return Parquet.read(inputFile).createReaderFunc(InternalReader::create); + return Parquet.read(inputFile).createInternalReader(new InternalReader<>()); } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 8f2957e1c60d..a71852e1a7f9 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import javax.annotation.Nullable; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; @@ -53,6 +54,8 @@ import org.apache.parquet.schema.Type; abstract class BaseParquetReaders<T> { + public static final int ROOT_ID = -1; + protected BaseParquetReaders() {} protected ParquetValueReader<T> createReader(Schema expectedSchema, MessageType fileSchema) { @@ -78,6 +81,17 @@ protected ParquetValueReader<T> createReader( protected abstract ParquetValueReader<T> createStructReader( List<ParquetValueReader<?>> fieldReaders, Types.StructType structType); + // This method can be overridden to provide a custom implementation that also uses the struct's + // fieldId, as InternalReader does in order to read into a custom StructLike type (a usage sketch + // follows the diff). + protected ParquetValueReader<T> createStructReader( + List<ParquetValueReader<?>> fieldReaders, + Types.StructType structType, + @Nullable Integer fieldId) { + // Fall back to the signature without fieldId if not overridden + return createStructReader(fieldReaders, structType); + } + protected abstract ParquetValueReader<?> fixedReader(ColumnDescriptor desc); protected abstract ParquetValueReader<?> dateReader(ColumnDescriptor desc); @@ -118,8 +132,8 @@ public ParquetValueReader<?> struct( newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); } } - - return createStructReader(newFields, expected); + Integer id = struct.getId() != null ? struct.getId().intValue() : null; + return createStructReader(newFields, expected, id); } } @@ -216,14 +230,17 @@ private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) { @Override public ParquetValueReader<?> message( Types.StructType expected, MessageType message, List<ParquetValueReader<?>> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); + // We need to mark the root struct, otherwise our visitor can't differentiate between the root + // and nested structs. + return struct(expected, message.asGroupType().withId(ROOT_ID), fieldReaders); } @Override public ParquetValueReader<?> struct( Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) { if (null == expected) { - return createStructReader(ImmutableList.of(), null); + Integer id = struct.getId() != null ? struct.getId().intValue() : null; + return createStructReader(ImmutableList.of(), null, id); } // match the expected struct's order @@ -252,7 +269,8 @@ public ParquetValueReader<?> struct( reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); } - return createStructReader(reorderedFields, expected); + Integer id = struct.getId() != null ?
struct.getId().intValue() : null; + return createStructReader(reorderedFields, expected, id); } private ParquetValueReader<?> defaultReader( diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java index 03585c55c9b6..f5e5eec25788 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.types.Types.StructType; @@ -30,9 +31,26 @@ public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> { + private Class<? extends StructLike> rootType = Record.class; + private final Map<Integer, Class<? extends StructLike>> typesById = new java.util.HashMap<>(); + private static final InternalReader<?> INSTANCE = new InternalReader<>(); - private InternalReader() {} + public InternalReader() {} + + @Override + protected ParquetValueReader<T> createStructReader( + List<ParquetValueReader<?>> fieldReaders, StructType structType) { + throw new UnsupportedOperationException( + "createStructReader(List<ParquetValueReader<?>>, StructType) is not supported because " + + "InternalReader needs the fieldId to determine the type of struct to return"); + } + + @SuppressWarnings("unchecked") + public static <T extends StructLike> ParquetValueReader<T> create( + Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) { + return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); + } @SuppressWarnings("unchecked") public static <T extends StructLike> ParquetValueReader<T> create( @@ -41,16 +59,18 @@ public static <T extends StructLike> ParquetValueReader<T> create( } @SuppressWarnings("unchecked") - public static <T extends StructLike> ParquetValueReader<T> create( - Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) { - return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); + public ParquetValueReader<T> reader( + Schema expectedSchema, MessageType fileSchema) { + return (ParquetValueReader<T>) createReader(expectedSchema, fileSchema); } @Override @SuppressWarnings("unchecked") protected ParquetValueReader<T> createStructReader( - List<ParquetValueReader<?>> fieldReaders, StructType structType) { - return (ParquetValueReader<T>) ParquetValueReaders.recordReader(fieldReaders, structType); + List<ParquetValueReader<?>> fieldReaders, StructType structType, Integer fieldId) { + return (ParquetValueReader<T>) + ParquetValueReaders.structLikeReader( + fieldReaders, structType, typesById.getOrDefault(fieldId, rootType)); } @Override @@ -72,4 +92,8 @@ protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) { protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) { return ParquetValueReaders.timestamps(desc); } + + public void setCustomType(int fieldId, Class<? extends StructLike> structClass) { + this.typesById.put(fieldId, structClass); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 6f68fbe150ff..c87e5ca82849 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -75,6 +75,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.data.parquet.InternalReader; import
org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -1162,6 +1163,8 @@ public static class ReadBuilder implements InternalData.ReadBuilder { private ByteBuffer fileEncryptionKey = null; private ByteBuffer fileAADPrefix = null; + private InternalReader<?> internalReader; + private ReadBuilder(InputFile file) { this.file = file; } @@ -1238,6 +1241,11 @@ public ReadBuilder createReaderFunc( return this; } + public ReadBuilder createInternalReader(InternalReader<?> newInternalReader) { + this.internalReader = newInternalReader; + return this; + } + public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) { Preconditions.checkArgument( this.readerFunc == null, @@ -1281,12 +1289,18 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) { @Override public ReadBuilder setRootType(Class<? extends StructLike> rootClass) { - throw new UnsupportedOperationException("Custom types are not yet supported"); + Preconditions.checkArgument( + this.internalReader != null, "Cannot set custom root type: InternalReader not set"); + internalReader.setCustomType(-1, rootClass); + return this; } @Override public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> structClass) { - throw new UnsupportedOperationException("Custom types are not yet supported"); + Preconditions.checkArgument( + this.internalReader != null, "Cannot set custom type: InternalReader not set"); + internalReader.setCustomType(fieldId, structClass); + return this; } public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { @@ -1315,7 +1329,10 @@ public <D> CloseableIterable<D> build() { Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); } - if (readerFunc != null || readerFuncWithSchema != null || batchedReaderFunc != null) { + if (readerFunc != null + || readerFuncWithSchema != null + || batchedReaderFunc != null + || internalReader != null) { ParquetReadOptions.Builder optionsBuilder; if (file instanceof HadoopInputFile) { // remove read properties already set that may conflict with this read @@ -1363,10 +1380,15 @@ public <D> CloseableIterable<D> build() { caseSensitive, maxRecordsPerBatch); } else { - Function<MessageType, ParquetValueReader<?>> readBuilder = - readerFuncWithSchema != null - ? fileType -> readerFuncWithSchema.apply(schema, fileType) - : readerFunc; + Function<MessageType, ParquetValueReader<?>> readBuilder; + if (internalReader != null) { + readBuilder = fileType -> internalReader.reader(schema, fileType); + } else { + readBuilder = + readerFuncWithSchema != null + ?
fileType -> readerFuncWithSchema.apply(schema, fileType) + : readerFunc; + } return new org.apache.iceberg.parquet.ParquetReader<>( file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index a5689bf43902..8aa9aa4779d9 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.UUID; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -189,6 +191,16 @@ public static ParquetValueReader<Long> int96Timestamps(ColumnDescriptor desc) { return new TimestampInt96Reader(desc); } + @SuppressWarnings("unchecked") + public static <T extends StructLike> ParquetValueReader<T> structLikeReader( + List<ParquetValueReader<?>> readers, Types.StructType struct, Class<T> structClass) { + if (structClass.equals(Record.class)) { + return ((ParquetValueReader<T>) recordReader(readers, struct)); + } else { + return new StructLikeReader<>(readers, struct, structClass); + } + } + public static ParquetValueReader<Record> recordReader( List<ParquetValueReader<?>> readers, Types.StructType struct) { return new RecordReader(readers, struct); } @@ -1132,6 +1144,46 @@ private TripleIterator<?> firstNonNullColumn(List<TripleIterator<?>> columns) { } } + private static class StructLikeReader<T extends StructLike> extends StructReader<T, T> { + private final Types.StructType struct; + private final DynConstructors.Ctor<T> ctor; + + StructLikeReader( + List<ParquetValueReader<?>> readers, Types.StructType struct, Class<T> structLikeClass) { + super(readers); + this.struct = struct; + this.ctor = + DynConstructors.builder(StructLike.class) + .hiddenImpl(structLikeClass, Types.StructType.class) + .hiddenImpl(structLikeClass) + .build(); + } + + @Override + protected T newStructData(T reuse) { + if (reuse != null) { + return reuse; + } else { + return ctor.newInstance(struct); + } + } + + @Override + protected Object getField(T intermediate, int pos) { + return intermediate.get(pos, Object.class); + } + + @Override + protected T buildStruct(T s) { + return s; + } + + @Override + protected void set(T s, int pos, Object value) { + s.set(pos, value); + } + } + private static class RecordReader extends StructReader<Record, Record> { + private final GenericRecord template; diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java index 754f0c9ee384..b9bba8d41e09 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantWriters.java @@ -26,11 +26,12 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.InternalData; import org.apache.iceberg.InternalTestHelpers; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.parquet.InternalReader; import org.apache.iceberg.data.parquet.InternalWriter; import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; @@ -249,10 +250,7 @@ private static
List writeAndRead( } try (CloseableIterable reader = - Parquet.read(outputFile.toInputFile()) - .project(SCHEMA) - .createReaderFunc(fileSchema -> InternalReader.create(SCHEMA, fileSchema)) - .build()) { + InternalData.read(FileFormat.PARQUET, outputFile.toInputFile()).project(SCHEMA).build()) { return Lists.newArrayList(reader); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java index bd4a41593c34..1b6527d3b043 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java @@ -162,4 +162,46 @@ public void testTruncate() { ImmutableList.of(), sql("SELECT * FROM %s ORDER BY id", tableName)); } + + @TestTemplate + public void testDeleteFromDatePartitionedTable() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id bigint, data string, date string) " + + "USING iceberg " + + "PARTITIONED BY (date)", + tableName); + + List records = + Lists.newArrayList( + row(1L, "a", "2024-08-01"), + row(2L, "b", "2024-08-02"), + row(3L, "c", "2024-08-03"), + row(4L, "d", "2024-08-04"), + row(5L, "e", "2024-08-05")); + + for (Object[] record : records) { + sql( + "INSERT INTO %s (id, data, date) VALUES (%s, '%s', '%s')", + tableName, record[0], record[1], record[2]); + } + + assertEquals( + "Should have all rows before delete", + records, + sql("SELECT * FROM %s ORDER BY id", tableName)); + + sql("DELETE FROM %s WHERE to_date(date, 'yyyy-MM-dd') = DATE('2024-08-04')", tableName); + + List expectedAfterDelete = + Lists.newArrayList( + row(1L, "a", "2024-08-01"), + row(2L, "b", "2024-08-02"), + row(3L, "c", "2024-08-03"), + row(5L, "e", "2024-08-05")); + + assertEquals( + "Should have all rows except 2024-08-04 partition", + expectedAfterDelete, + sql("SELECT * FROM %s ORDER BY id", tableName)); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java index bd4a41593c34..1b6527d3b043 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java @@ -162,4 +162,46 @@ public void testTruncate() { ImmutableList.of(), sql("SELECT * FROM %s ORDER BY id", tableName)); } + + @TestTemplate + public void testDeleteFromDatePartitionedTable() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id bigint, data string, date string) " + + "USING iceberg " + + "PARTITIONED BY (date)", + tableName); + + List records = + Lists.newArrayList( + row(1L, "a", "2024-08-01"), + row(2L, "b", "2024-08-02"), + row(3L, "c", "2024-08-03"), + row(4L, "d", "2024-08-04"), + row(5L, "e", "2024-08-05")); + + for (Object[] record : records) { + sql( + "INSERT INTO %s (id, data, date) VALUES (%s, '%s', '%s')", + tableName, record[0], record[1], record[2]); + } + + assertEquals( + "Should have all rows before delete", + records, + sql("SELECT * FROM %s ORDER BY id", tableName)); + + sql("DELETE FROM %s WHERE to_date(date, 'yyyy-MM-dd') = DATE('2024-08-04')", tableName); + + List expectedAfterDelete = + Lists.newArrayList( + row(1L, "a", "2024-08-01"), + row(2L, "b", "2024-08-02"), + row(3L, "c", "2024-08-03"), + row(5L, "e", "2024-08-05")); + + assertEquals( + "Should have all rows except 2024-08-04 partition", + expectedAfterDelete, + sql("SELECT * FROM %s 
ORDER BY id", tableName)); + } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java index bd4a41593c34..1b6527d3b043 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java @@ -162,4 +162,46 @@ public void testTruncate() { ImmutableList.of(), sql("SELECT * FROM %s ORDER BY id", tableName)); } + + @TestTemplate + public void testDeleteFromDatePartitionedTable() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id bigint, data string, date string) " + + "USING iceberg " + + "PARTITIONED BY (date)", + tableName); + + List records = + Lists.newArrayList( + row(1L, "a", "2024-08-01"), + row(2L, "b", "2024-08-02"), + row(3L, "c", "2024-08-03"), + row(4L, "d", "2024-08-04"), + row(5L, "e", "2024-08-05")); + + for (Object[] record : records) { + sql( + "INSERT INTO %s (id, data, date) VALUES (%s, '%s', '%s')", + tableName, record[0], record[1], record[2]); + } + + assertEquals( + "Should have all rows before delete", + records, + sql("SELECT * FROM %s ORDER BY id", tableName)); + + sql("DELETE FROM %s WHERE to_date(date, 'yyyy-MM-dd') = DATE('2024-08-04')", tableName); + + List expectedAfterDelete = + Lists.newArrayList( + row(1L, "a", "2024-08-01"), + row(2L, "b", "2024-08-02"), + row(3L, "c", "2024-08-03"), + row(5L, "e", "2024-08-05")); + + assertEquals( + "Should have all rows except 2024-08-04 partition", + expectedAfterDelete, + sql("SELECT * FROM %s ORDER BY id", tableName)); + } }