39 changes: 39 additions & 0 deletions .palantir/revapi.yml
@@ -1333,6 +1333,45 @@ acceptedBreaks:
\ java.util.List<org.apache.iceberg.PositionDeletesScanTask>)"
justification: "Removing deprecations for 1.10.0"
org.apache.iceberg:iceberg-parquet:
- code: "java.annotation.added"
old: "parameter org.apache.iceberg.parquet.ParquetValueReader<org.apache.iceberg.data.Record>\
\ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List<org.apache.parquet.schema.Type>,\
\ java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>, ===org.apache.iceberg.types.Types.StructType===)"
new: "parameter org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.BaseParquetReaders<T>::createStructReader(java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>,\
\ org.apache.iceberg.types.Types.StructType, ===java.lang.Integer===) @ org.apache.iceberg.data.parquet.GenericParquetReaders"
justification: "Changes to Internal Reader Base Implementation"
- code: "java.method.parameterTypeChanged"
old: "parameter org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.InternalReader<T\
\ extends org.apache.iceberg.StructLike>::createStructReader(java.util.List<org.apache.parquet.schema.Type>,\
\ ===java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>===,\
\ org.apache.iceberg.types.Types.StructType)"
new: "parameter org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.InternalReader<T\
\ extends org.apache.iceberg.StructLike>::createStructReader(java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>,\
\ ===org.apache.iceberg.types.Types.StructType===, java.lang.Integer)"
justification: "Changes to Internal Reader Base Implementation"
- code: "java.method.parameterTypeChanged"
old: "parameter org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.InternalReader<T\
\ extends org.apache.iceberg.StructLike>::createStructReader(java.util.List<org.apache.parquet.schema.Type>,\
\ java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>, ===org.apache.iceberg.types.Types.StructType===)"
new: "parameter org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.InternalReader<T\
\ extends org.apache.iceberg.StructLike>::createStructReader(java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>,\
\ org.apache.iceberg.types.Types.StructType, ===java.lang.Integer===)"
justification: "Changes to Internal Reader Base Implementation"
- code: "java.method.parameterTypeChanged"
old: "parameter org.apache.iceberg.parquet.ParquetValueReader<org.apache.iceberg.data.Record>\
\ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List<org.apache.parquet.schema.Type>,\
\ ===java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>===,\
\ org.apache.iceberg.types.Types.StructType)"
new: "parameter org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.BaseParquetReaders<T>::createStructReader(java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>,\
\ ===org.apache.iceberg.types.Types.StructType===, java.lang.Integer) @ org.apache.iceberg.data.parquet.GenericParquetReaders"
justification: "Changes to Internal Reader Base Implementation"
- code: "java.method.parameterTypeChanged"
old: "parameter org.apache.iceberg.parquet.ParquetValueReader<org.apache.iceberg.data.Record>\
\ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List<org.apache.parquet.schema.Type>,\
\ java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>, ===org.apache.iceberg.types.Types.StructType===)"
new: "parameter org.apache.iceberg.parquet.ParquetValueReader<T> org.apache.iceberg.data.parquet.BaseParquetReaders<T>::createStructReader(java.util.List<org.apache.iceberg.parquet.ParquetValueReader<?>>,\
\ org.apache.iceberg.types.Types.StructType, ===java.lang.Integer===) @ org.apache.iceberg.data.parquet.GenericParquetReaders"
justification: "Changes to Internal Reader Base Implementation"
- code: "java.method.removed"
old: "method <T extends org.apache.iceberg.StructLike> org.apache.iceberg.parquet.ParquetValueWriter<T>\
\ org.apache.iceberg.data.parquet.InternalWriter<T extends org.apache.iceberg.StructLike>::create(org.apache.parquet.schema.MessageType)"
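For readability, the accepted breaks above amount to a single signature change, reconstructed here from the revapi old/new entries. Parameter names are illustrative, and the meaning of the trailing java.lang.Integer is an assumption (it is not explained in the diff):

// Old: declared on GenericParquetReaders / InternalReader, per the "old" entries:
// protected ParquetValueReader<T> createStructReader(
//     List<org.apache.parquet.schema.Type> types,
//     List<ParquetValueReader<?>> fieldReaders,
//     Types.StructType structType)
//
// New: hoisted to BaseParquetReaders<T>; the Parquet schema types are dropped
// and a trailing Integer is appended, per the "new" entries:
// protected ParquetValueReader<T> createStructReader(
//     List<ParquetValueReader<?>> fieldReaders,
//     Types.StructType structType,
//     Integer fieldId)  // the name "fieldId" is a guess, not from the diff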
1 change: 1 addition & 0 deletions build.gradle
@@ -348,6 +348,7 @@ project(':iceberg-core') {
api project(':iceberg-api')
implementation project(':iceberg-common')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
testRuntimeOnly project(':iceberg-parquet')
annotationProcessor libs.immutables.value
compileOnly libs.immutables.value

4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -531,7 +531,9 @@ public ByteBuffer keyMetadata() {
@Override
public List<Long> splitOffsets() {
if (hasWellDefinedOffsets()) {
return ArrayUtil.toUnmodifiableLongList(splitOffsets);
// We want to reuse this as a container, so we can't return an immutable list here
// (fixes TestFindFiles)
return ArrayUtil.toLongList(splitOffsets);
}

return null;
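A minimal sketch of the mutability difference, assuming toLongList returns a modifiable list (which is the point of the change); this snippet is illustrative and not part of the diff:

long[] splitOffsets = {4L, 1024L};
List<Long> reusable = ArrayUtil.toLongList(splitOffsets);       // modifiable copy
reusable.set(0, 8L);                                            // allowed
List<Long> frozen = ArrayUtil.toUnmodifiableLongList(splitOffsets);
// frozen.set(0, 8L);  // would throw UnsupportedOperationException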
10 changes: 7 additions & 3 deletions core/src/main/java/org/apache/iceberg/CherryPickOperation.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.SnapshotChanges.changesFrom;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
@@ -71,6 +73,8 @@ public CherryPickOperation cherrypick(long snapshotId) {
ValidationException.check(
cherrypickSnapshot != null, "Cannot cherry-pick unknown snapshot ID: %s", snapshotId);

SnapshotChanges changes = changesFrom(cherrypickSnapshot, io, specsById);

if (cherrypickSnapshot.operation().equals(DataOperations.APPEND)) {
// this property is set on target snapshot that will get published
String wapId = WapUtil.validateWapPublish(current, snapshotId);
@@ -82,7 +86,7 @@ public CherryPickOperation cherrypick(long snapshotId) {
set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(snapshotId));

// Pick modifications from the snapshot
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
for (DataFile addedFile : changes.addedDataFiles()) {
add(addedFile);
}

@@ -114,13 +118,13 @@ public CherryPickOperation cherrypick(long snapshotId) {

// copy adds from the picked snapshot
this.replacedPartitions = PartitionSet.create(specsById);
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
for (DataFile addedFile : changes.addedDataFiles()) {
add(addedFile);
replacedPartitions.add(addedFile.specId(), addedFile.partition());
}

// copy deletes from the picked snapshot
for (DataFile deletedFile : cherrypickSnapshot.removedDataFiles(io)) {
for (DataFile deletedFile : changes.removedDataFiles()) {
delete(deletedFile);
}

22 changes: 14 additions & 8 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -126,7 +127,13 @@ protected ManifestReader(
if (specsById != null) {
this.spec = specsById.get(specId);
} else {
this.spec = readPartitionSpec(file);
if (FileFormat.fromFileName(file.location()) == FileFormat.PARQUET) {
// We don't want to rely on reading the footer to determine the partition spec
throw new UnsupportedOperationException(
"Reading partition spec from Parquet manifest files is not supported");
} else {
this.spec = readPartitionSpec(file);
}
}

this.fileSchema = new Schema(DataFile.getType(spec.rawPartitionType()).fields());
@@ -146,18 +153,17 @@ private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile inp
}

private static <T extends ContentFile<T>> Map<String, String> readMetadata(InputFile inputFile) {
Map<String, String> metadata;
Map<String, String> metadata = Collections.emptyMap();
FileFormat manifestFormat = FileFormat.fromFileName(inputFile.location());

try {
try (CloseableIterable<ManifestEntry<T>> headerReader =
InternalData.read(FileFormat.AVRO, inputFile)
[Review comment from Member Author] This breaks backwards compatibility: previously, an input file didn't actually have to have an .avro suffix.
InternalData.read(manifestFormat, inputFile)
.project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
.build()) {

if (headerReader instanceof AvroIterable) {
metadata = ((AvroIterable<ManifestEntry<T>>) headerReader).getMetadata();
} else {
throw new RuntimeException(
"Reader does not support metadata reading: " + headerReader.getClass().getName());
if (manifestFormat == FileFormat.AVRO) {
metadata = ((AvroIterable) headerReader).getMetadata();
}
}
} catch (IOException e) {
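Because old and new lines are interleaved above, here is the post-change readMetadata flow condensed into one sketch. The catch body is truncated in the diff, so the rethrow below is an assumption:

private static <T extends ContentFile<T>> Map<String, String> readMetadata(InputFile inputFile) {
  Map<String, String> metadata = Collections.emptyMap();
  // Dispatch on the file-name suffix instead of assuming Avro; the review
  // comment above notes this breaks inputs without an .avro suffix.
  FileFormat manifestFormat = FileFormat.fromFileName(inputFile.location());
  try (CloseableIterable<ManifestEntry<T>> headerReader =
      InternalData.read(manifestFormat, inputFile)
          .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
          .build()) {
    // Only Avro manifests carry header key/value metadata; Parquet manifests
    // fall through with an empty map.
    if (manifestFormat == FileFormat.AVRO) {
      metadata = ((AvroIterable<ManifestEntry<T>>) headerReader).getMetadata();
    }
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to read manifest header", e); // assumed handling
  }
  return metadata;
}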
34 changes: 31 additions & 3 deletions core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
@@ -57,7 +58,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp

private ManifestWriter(
PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, Long firstRowId) {
this.file = file.encryptingOutputFile();
this.file = outputFile(file);
this.specId = spec.specId();
this.writer = newAppender(spec, this.file);
this.snapshotId = snapshotId;
@@ -73,6 +74,15 @@ private ManifestWriter(
protected abstract FileAppender<ManifestEntry<F>> newAppender(
PartitionSpec spec, OutputFile outputFile);

/**
* Returns the actual OutputFile used to write the manifest, taking encryption into account. V3
* and earlier use Avro, so whole-file encryption is invoked. V4+ uses Parquet, so the native
* encryption output file is passed through when available.
*/
protected OutputFile outputFile(EncryptedOutputFile encryptedFile) {
return encryptedFile.encryptingOutputFile();
}

protected ManifestContent content() {
return ManifestContent.DATA;
}
@@ -229,6 +239,15 @@ static class V4Writer extends ManifestWriter<DataFile> {
this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
}

@Override
protected OutputFile outputFile(EncryptedOutputFile encryptedFile) {
if (encryptedFile instanceof NativeEncryptionOutputFile) {
return (NativeEncryptionOutputFile) encryptedFile;
} else {
return encryptedFile.encryptingOutputFile();
}
}

@Override
protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
return entryWrapper.wrap(entry);
@@ -239,7 +258,7 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
return InternalData.write(FileFormat.AVRO, file)
return InternalData.write(FileFormat.PARQUET, file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
@@ -264,6 +283,15 @@ static class V4DeleteWriter extends ManifestWriter<DeleteFile> {
this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
}

@Override
protected OutputFile outputFile(EncryptedOutputFile encryptedFile) {
if (encryptedFile instanceof NativeEncryptionOutputFile) {
return (NativeEncryptionOutputFile) encryptedFile;
} else {
return encryptedFile.encryptingOutputFile();
}
}

@Override
protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
return entryWrapper.wrap(entry);
@@ -274,7 +302,7 @@ protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
return InternalData.write(FileFormat.AVRO, file)
return InternalData.write(FileFormat.PARQUET, file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
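V4Writer and V4DeleteWriter share the same outputFile override; here it is side by side with the base-class default (both copied from the hunks above, comments added):

// Base class (V3 and earlier, Avro manifests): always wrap the stream for
// whole-file encryption.
protected OutputFile outputFile(EncryptedOutputFile encryptedFile) {
  return encryptedFile.encryptingOutputFile();
}

// V4 writers (Parquet manifests): pass the native encryption file through so
// Parquet can encrypt natively; otherwise fall back to stream encryption.
@Override
protected OutputFile outputFile(EncryptedOutputFile encryptedFile) {
  if (encryptedFile instanceof NativeEncryptionOutputFile) {
    return (NativeEncryptionOutputFile) encryptedFile;
  } else {
    return encryptedFile.encryptingOutputFile();
  }
}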
154 changes: 154 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotChanges.java
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

/**
* A utility class to work around the current Snapshot interface and load changes from V4
* manifests. Needs to be discussed and changed.
*/
public class SnapshotChanges {

private final Snapshot snapshot;
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;

// Lazy Cache
private List<DataFile> addedDataFiles = null;
private List<DataFile> removedDataFiles = null;
private List<DeleteFile> addedDeleteFiles = null;
private List<DeleteFile> removedDeleteFiles = null;

private SnapshotChanges(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) {
this.snapshot = snapshot;
this.io = io;
this.specsById = specsById;
}

public List<DataFile> addedDataFiles() {
if (addedDataFiles == null) {
cacheDataFileChanges();
}
return addedDataFiles;
}

public List<DataFile> removedDataFiles() {
if (removedDataFiles == null) {
cacheDataFileChanges();
}
return removedDataFiles;
}

public List<DeleteFile> addedDeleteFiles() {
if (addedDeleteFiles == null) {
cacheDeleteFileChanges();
}
return addedDeleteFiles;
}

public List<DeleteFile> removedDeleteFiles() {
if (removedDeleteFiles == null) {
cacheDeleteFileChanges();
}
return removedDeleteFiles;
}

private void cacheDataFileChanges() {
ImmutableList.Builder<DataFile> addedDataFileBuilder = ImmutableList.builder();
ImmutableList.Builder<DataFile> removedDataFileBuilder = ImmutableList.builder();

// read only manifests that were created by this snapshot
Iterable<ManifestFile> changedDataManifests =
Iterables.filter(
snapshot.dataManifests(io),
manifest -> Objects.equal(manifest.snapshotId(), snapshot.snapshotId()));
try (CloseableIterable<ManifestEntry<DataFile>> entries =
new ManifestGroup(io, changedDataManifests)
.specsById(specsById)
.ignoreExisting()
.entries()) {
for (ManifestEntry<DataFile> entry : entries) {
switch (entry.status()) {
case ADDED:
addedDataFileBuilder.add(entry.file().copy());
break;
case DELETED:
removedDataFileBuilder.add(entry.file().copyWithoutStats());
break;
default:
throw new IllegalStateException(
"Unexpected entry status, not added or deleted: " + entry);
}
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close entries while caching changes");
}

this.addedDataFiles = addedDataFileBuilder.build();
this.removedDataFiles = removedDataFileBuilder.build();
}

private void cacheDeleteFileChanges() {
ImmutableList.Builder<DeleteFile> addedDeleteFilesBuilder = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> removedDeleteFilesBuilder = ImmutableList.builder();

Iterable<ManifestFile> changedDeleteManifests =
Iterables.filter(
snapshot.deleteManifests(io),
manifest -> Objects.equal(manifest.snapshotId(), snapshot.snapshotId()));

for (ManifestFile manifest : changedDeleteManifests) {
try (ManifestReader<DeleteFile> reader =
ManifestFiles.readDeleteManifest(manifest, io, specsById)) {
for (ManifestEntry<DeleteFile> entry : reader.entries()) {
switch (entry.status()) {
case ADDED:
addedDeleteFilesBuilder.add(entry.file().copy());
break;
case DELETED:
removedDeleteFilesBuilder.add(entry.file().copyWithoutStats());
break;
default:
// ignore existing
}
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close manifest reader", e);
}
}

this.addedDeleteFiles = addedDeleteFilesBuilder.build();
this.removedDeleteFiles = removedDeleteFilesBuilder.build();
}

public static SnapshotChanges changesFrom(
Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) {
return new SnapshotChanges(snapshot, io, specsById);
}
}
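A hedged usage sketch: only changesFrom and the four accessors come from the class above; the reporting helper and its wiring are illustrative:

// Hypothetical caller that reports what a snapshot changed.
static void reportChanges(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) {
  SnapshotChanges changes = SnapshotChanges.changesFrom(snapshot, io, specsById);
  // The first data-file accessor call scans the snapshot's own manifests once;
  // results are cached, so the second call below does no further I/O.
  changes.addedDataFiles().forEach(f -> System.out.println("added data: " + f.path()));
  changes.removedDataFiles().forEach(f -> System.out.println("removed data: " + f.path()));
  // Delete-file changes are cached independently.
  changes.addedDeleteFiles().forEach(f -> System.out.println("added deletes: " + f.path()));
  changes.removedDeleteFiles().forEach(f -> System.out.println("removed deletes: " + f.path()));
}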