Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,37 @@ private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile inp
}

private static <T extends ContentFile<T>> Map<String, String> readMetadata(InputFile inputFile) {
Map<String, String> metadata;
try {
try (CloseableIterable<ManifestEntry<T>> headerReader =
InternalData.read(FileFormat.AVRO, inputFile)
.project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
.build()) {

if (headerReader instanceof AvroIterable) {
metadata = ((AvroIterable<ManifestEntry<T>>) headerReader).getMetadata();
} else {
throw new RuntimeException(
"Reader does not support metadata reading: " + headerReader.getClass().getName());
}
FileFormat format = FileFormat.fromFileName(inputFile.location());
if (format == null) {
// Default to Avro for backward compatibility
format = FileFormat.AVRO;
}

try (CloseableIterable<ManifestEntry<T>> headerReader =
InternalData.read(format, inputFile)
.project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
.build()) {

if (headerReader instanceof AvroIterable) {
return ((AvroIterable<ManifestEntry<T>>) headerReader).getMetadata();
} else {
return extractMetadataViaReflection(headerReader);
}
} catch (IOException e) {
throw new RuntimeIOException(e);
}
return metadata;
}

private static <T extends ContentFile<T>> Map<String, String> extractMetadataViaReflection(
CloseableIterable<ManifestEntry<T>> reader) {
// For Parquet and other formats, use reflection to call getMetadata if available
try {
java.lang.reflect.Method getMetadataMethod = reader.getClass().getMethod("getMetadata");
return (Map<String, String>) getMetadataMethod.invoke(reader);
} catch (Exception e) {
throw new RuntimeException(
"Reader does not support metadata reading: " + reader.getClass().getName(), e);
}
}

public boolean isDeleteManifestReader() {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
return InternalData.write(FileFormat.AVRO, file)
return InternalData.write(FileFormat.PARQUET, file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
Expand Down Expand Up @@ -288,7 +288,7 @@ protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
return InternalData.write(FileFormat.AVRO, file)
return InternalData.write(FileFormat.PARQUET, file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,12 @@ protected OutputFile manifestListPath() {
}

protected EncryptedOutputFile newManifestOutputFile() {
// Use Parquet for v4 manifests, Avro for earlier versions
FileFormat manifestFormat =
ops.current().formatVersion() >= 4 ? FileFormat.PARQUET : FileFormat.AVRO;
String manifestFileLocation =
ops.metadataFileLocation(
FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()));
manifestFormat.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()));
return EncryptingFileIO.combine(ops.io(), ops.encryption())
.newEncryptingOutputFile(manifestFileLocation);
}
Expand Down
12 changes: 8 additions & 4 deletions core/src/test/java/org/apache/iceberg/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ ManifestFile writeManifest(DataFile... files) throws IOException {
}

ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException {
File manifestFile = temp.resolve("input.m0.avro").toFile();
String extension = formatVersion >= 4 ? ".parquet" : ".avro";
File manifestFile = temp.resolve("input.m0" + extension).toFile();
assertThat(manifestFile).doesNotExist();
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());

Expand All @@ -299,7 +300,8 @@ ManifestFile writeManifest(String fileName, ManifestEntry<?>... entries) throws
}

ManifestFile writeManifest(Long snapshotId, ManifestEntry<?>... entries) throws IOException {
return writeManifest(snapshotId, "input.m0.avro", entries);
String extension = formatVersion >= 4 ? ".parquet" : ".avro";
return writeManifest(snapshotId, "input.m0" + extension, entries);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -333,9 +335,10 @@ <F extends ContentFile<F>> ManifestFile writeManifest(

ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFile... deleteFiles)
throws IOException {
FileFormat manifestFormat = newFormatVersion >= 4 ? FileFormat.PARQUET : FileFormat.AVRO;
OutputFile manifestFile =
org.apache.iceberg.Files.localOutput(
FileFormat.AVRO.addExtension(
manifestFormat.addExtension(
temp.resolve("junit" + System.nanoTime()).toFile().toString()));
ManifestWriter<DeleteFile> writer =
ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, manifestFile, snapshotId);
Expand All @@ -350,7 +353,8 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi
}

ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException {
File manifestFile = temp.resolve(name + ".avro").toFile();
String extension = formatVersion >= 4 ? ".parquet" : ".avro";
File manifestFile = temp.resolve(name + extension).toFile();
assertThat(manifestFile).doesNotExist();
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());

Expand Down
6 changes: 4 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,10 @@ public void testTransactionRewriteManifestsAppendedDirectly() throws IOException
List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
assertThat(manifests).hasSize(2);

String extension = formatVersion >= 4 ? ".parquet" : ".avro";
ManifestFile newManifest =
writeManifest(
"manifest-file-1.avro",
"manifest-file-1" + extension,
manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshotId, FILE_A),
manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshotId, FILE_B));

Expand Down Expand Up @@ -863,12 +864,13 @@ public void testOverwriteWithConcurrentManifestRewrite() throws IOException {
overwriteFiles.commit();

// cause the overwrite transaction commit to fail and retry
String extension = formatVersion >= 4 ? ".parquet" : ".avro";
RewriteManifests rewriteManifests =
table
.rewriteManifests()
.addManifest(
writeManifest(
"new_manifest.avro",
"new_manifest" + extension,
manifestEntry(Status.EXISTING, first.snapshotId(), FILE_A),
manifestEntry(Status.EXISTING, first.snapshotId(), FILE_A2),
manifestEntry(Status.EXISTING, second.snapshotId(), FILE_B)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ public <D> CloseableIterable<D> build() {
builder.withDecryption(fileDecryptionProperties);
}

return new ParquetIterable<>(builder);
return new ParquetIterable<>(builder, file);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,27 @@

public class ParquetIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private final ParquetReader.Builder<T> builder;
private final org.apache.iceberg.io.InputFile inputFile;
private java.util.Map<String, String> metadata = null;

ParquetIterable(ParquetReader.Builder<T> builder) {
ParquetIterable(ParquetReader.Builder<T> builder, org.apache.iceberg.io.InputFile inputFile) {
this.builder = builder;
this.inputFile = inputFile;
}

public java.util.Map<String, String> getMetadata() {
if (metadata == null) {
try {
org.apache.parquet.hadoop.ParquetFileReader reader =
org.apache.parquet.hadoop.ParquetFileReader.open(
org.apache.iceberg.parquet.ParquetIO.file(inputFile));
metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData();
reader.close();
} catch (java.io.IOException e) {
throw new org.apache.iceberg.exceptions.RuntimeIOException(e);
}
}
return metadata;
}

@Override
Expand Down
Loading