diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 40ea17aaa592..15beded07bdc 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -146,24 +146,37 @@ private > PartitionSpec readPartitionSpec(InputFile inp } private static > Map readMetadata(InputFile inputFile) { - Map metadata; - try { - try (CloseableIterable> headerReader = - InternalData.read(FileFormat.AVRO, inputFile) - .project(ManifestEntry.getSchema(Types.StructType.of()).select("status")) - .build()) { - - if (headerReader instanceof AvroIterable) { - metadata = ((AvroIterable>) headerReader).getMetadata(); - } else { - throw new RuntimeException( - "Reader does not support metadata reading: " + headerReader.getClass().getName()); - } + FileFormat format = FileFormat.fromFileName(inputFile.location()); + if (format == null) { + // Default to Avro for backward compatibility + format = FileFormat.AVRO; + } + + try (CloseableIterable> headerReader = + InternalData.read(format, inputFile) + .project(ManifestEntry.getSchema(Types.StructType.of()).select("status")) + .build()) { + + if (headerReader instanceof AvroIterable) { + return ((AvroIterable>) headerReader).getMetadata(); + } else { + return extractMetadataViaReflection(headerReader); } } catch (IOException e) { throw new RuntimeIOException(e); } - return metadata; + } + + private static > Map extractMetadataViaReflection( + CloseableIterable> reader) { + // For Parquet and other formats, use reflection to call getMetadata if available + try { + java.lang.reflect.Method getMetadataMethod = reader.getClass().getMethod("getMetadata"); + return (Map) getMetadataMethod.invoke(reader); + } catch (Exception e) { + throw new RuntimeException( + "Reader does not support metadata reading: " + reader.getClass().getName(), e); + } } public boolean isDeleteManifestReader() { diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 43b8e3ed7095..39fae95a8bf7 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -253,7 +253,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType()); try { - return InternalData.write(FileFormat.AVRO, file) + return InternalData.write(FileFormat.PARQUET, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema())) @@ -288,7 +288,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType()); try { - return InternalData.write(FileFormat.AVRO, file) + return InternalData.write(FileFormat.PARQUET, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema())) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index d11f466434ec..12c2d6c85c53 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -560,9 +560,12 @@ protected OutputFile manifestListPath() { } protected EncryptedOutputFile newManifestOutputFile() { + // Use Parquet for v4 manifests, Avro for earlier versions + FileFormat manifestFormat = + ops.current().formatVersion() >= 4 ? FileFormat.PARQUET : FileFormat.AVRO; String manifestFileLocation = ops.metadataFileLocation( - FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())); + manifestFormat.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())); return EncryptingFileIO.combine(ops.io(), ops.encryption()) .newEncryptingOutputFile(manifestFileLocation); } diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index 2b91e212627d..d86ebd867056 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -277,7 +277,8 @@ ManifestFile writeManifest(DataFile... files) throws IOException { } ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException { - File manifestFile = temp.resolve("input.m0.avro").toFile(); + String extension = formatVersion >= 4 ? ".parquet" : ".avro"; + File manifestFile = temp.resolve("input.m0" + extension).toFile(); assertThat(manifestFile).doesNotExist(); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); @@ -299,7 +300,8 @@ ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws } ManifestFile writeManifest(Long snapshotId, ManifestEntry... entries) throws IOException { - return writeManifest(snapshotId, "input.m0.avro", entries); + String extension = formatVersion >= 4 ? ".parquet" : ".avro"; + return writeManifest(snapshotId, "input.m0" + extension, entries); } @SuppressWarnings("unchecked") @@ -333,9 +335,10 @@ > ManifestFile writeManifest( ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFile... deleteFiles) throws IOException { + FileFormat manifestFormat = newFormatVersion >= 4 ? FileFormat.PARQUET : FileFormat.AVRO; OutputFile manifestFile = org.apache.iceberg.Files.localOutput( - FileFormat.AVRO.addExtension( + manifestFormat.addExtension( temp.resolve("junit" + System.nanoTime()).toFile().toString())); ManifestWriter writer = ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, manifestFile, snapshotId); @@ -350,7 +353,8 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi } ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException { - File manifestFile = temp.resolve(name + ".avro").toFile(); + String extension = formatVersion >= 4 ? ".parquet" : ".avro"; + File manifestFile = temp.resolve(name + extension).toFile(); assertThat(manifestFile).doesNotExist(); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 7715c045bd9e..5eb5317eb619 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -652,9 +652,10 @@ public void testTransactionRewriteManifestsAppendedDirectly() throws IOException List manifests = table.currentSnapshot().allManifests(table.io()); assertThat(manifests).hasSize(2); + String extension = formatVersion >= 4 ? ".parquet" : ".avro"; ManifestFile newManifest = writeManifest( - "manifest-file-1.avro", + "manifest-file-1" + extension, manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshotId, FILE_A), manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshotId, FILE_B)); @@ -863,12 +864,13 @@ public void testOverwriteWithConcurrentManifestRewrite() throws IOException { overwriteFiles.commit(); // cause the overwrite transaction commit to fail and retry + String extension = formatVersion >= 4 ? ".parquet" : ".avro"; RewriteManifests rewriteManifests = table .rewriteManifests() .addManifest( writeManifest( - "new_manifest.avro", + "new_manifest" + extension, manifestEntry(Status.EXISTING, first.snapshotId(), FILE_A), manifestEntry(Status.EXISTING, first.snapshotId(), FILE_A2), manifestEntry(Status.EXISTING, second.snapshotId(), FILE_B))) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 2b2e460ee994..551f9af3a283 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1533,7 +1533,7 @@ public CloseableIterable build() { builder.withDecryption(fileDecryptionProperties); } - return new ParquetIterable<>(builder); + return new ParquetIterable<>(builder, file); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java index ac4e5c1f97ed..450ead86d897 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java @@ -28,9 +28,27 @@ public class ParquetIterable extends CloseableGroup implements CloseableIterable { private final ParquetReader.Builder builder; + private final org.apache.iceberg.io.InputFile inputFile; + private java.util.Map metadata = null; - ParquetIterable(ParquetReader.Builder builder) { + ParquetIterable(ParquetReader.Builder builder, org.apache.iceberg.io.InputFile inputFile) { this.builder = builder; + this.inputFile = inputFile; + } + + public java.util.Map getMetadata() { + if (metadata == null) { + try { + org.apache.parquet.hadoop.ParquetFileReader reader = + org.apache.parquet.hadoop.ParquetFileReader.open( + org.apache.iceberg.parquet.ParquetIO.file(inputFile)); + metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData(); + reader.close(); + } catch (java.io.IOException e) { + throw new org.apache.iceberg.exceptions.RuntimeIOException(e); + } + } + return metadata; } @Override