Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
.executeWith(ThreadPools.getWorkerPool())
.onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
.run(manifest -> {
try (ManifestReader reader = ManifestReader.read(manifest, io)) {
try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
for (ManifestEntry entry : reader.entries()) {
// intern the file path because the weak key map uses identity (==) instead of equals
String path = entry.file().path().toString().intern();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,10 @@ public RewriteManifests addManifest(ManifestFile manifest) {
}

private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) {
try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), specsById)) {
OutputFile newFile = newManifestOutput();
return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
return ManifestFiles.copyManifest(
ops.current().formatVersion(), reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}
Expand Down Expand Up @@ -237,7 +238,7 @@ private void performRewrite(List<ManifestFile> currentManifests) {
keptManifests.add(manifest);
} else {
rewrittenManifests.add(manifest);
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
FilteredManifest filteredManifest = reader.select(Arrays.asList("*"));
filteredManifest.liveEntries().forEach(
entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId())
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/DataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
@Override
public CloseableIterable<StructLike> rows() {
return CloseableIterable.transform(
ManifestReader.read(manifest, io).project(schema),
ManifestFiles.read(manifest, io).project(schema),
file -> (GenericDataFile) file);
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ public FastAppend appendManifest(ManifestFile manifest) {
}

private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
OutputFile newManifestPath = newManifestOutput();
return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder);
return ManifestFiles.copyAppendManifest(
ops.current().formatVersion(), reader, newManifestPath, snapshotId(), summaryBuilder);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}
Expand Down
143 changes: 143 additions & 0 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

public class ManifestFiles {
private ManifestFiles() {
}

/**
* Returns a new {@link ManifestReader} for a {@link ManifestFile}.
* <p>
* <em>Note:</em> Callers should use {@link ManifestFiles#read(ManifestFile, FileIO, Map)} to ensure
* the schema used by filters is the latest table schema. This should be used only when reading
* a manifest without filters.
*
* @param manifest a ManifestFile
* @param io a FileIO
* @return a manifest reader
*/
public static ManifestReader read(ManifestFile manifest, FileIO io) {
return read(manifest, io, null);
}

/**
* Returns a new {@link ManifestReader} for a {@link ManifestFile}.
*
* @param manifest a {@link ManifestFile}
* @param io a {@link FileIO}
* @param specsById a Map from spec ID to partition spec
* @return a {@link ManifestReader}
*/
public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
InputFile file = io.newInputFile(manifest.path());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader(file, specsById, inheritableMetadata);
}

/**
* Create a new {@link ManifestWriter}.
* <p>
* Manifests created by this writer have all entry snapshot IDs set to null.
* All entries will inherit the snapshot ID that will be assigned to the manifest on commit.
*
* @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
* @param outputFile the destination file location
* @return a manifest writer
*/
public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
// always use a v1 writer for appended manifests because sequence number must be inherited
return write(1, spec, outputFile, null);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @return a manifest writer
*/
static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
}
throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
}

static ManifestFile copyAppendManifest(int formatVersion, ManifestReader reader, OutputFile outputFile,
long snapshotId, SnapshotSummary.Builder summaryBuilder) {
return copyManifest(
formatVersion, reader, outputFile, snapshotId, summaryBuilder, Sets.newHashSet(ManifestEntry.Status.ADDED));
}

static ManifestFile copyManifest(int formatVersion, ManifestReader reader, OutputFile outputFile, long snapshotId,
SnapshotSummary.Builder summaryBuilder,
Set<ManifestEntry.Status> allowedEntryStatuses) {
ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
boolean threw = true;
try {
for (ManifestEntry entry : reader.entries()) {
Preconditions.checkArgument(
allowedEntryStatuses.contains(entry.status()),
"Invalid manifest entry status: %s (allowed statuses: %s)",
entry.status(), allowedEntryStatuses);
switch (entry.status()) {
case ADDED:
summaryBuilder.addedFile(reader.spec(), entry.file());
writer.add(entry);
break;
case EXISTING:
writer.existing(entry);
break;
case DELETED:
summaryBuilder.deletedFile(reader.spec(), entry.file());
writer.delete(entry);
break;
}
}

threw = false;

} finally {
try {
writer.close();
} catch (IOException e) {
if (!threw) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", outputFile);
}
}
}

return writer.toManifestFile();
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private <T> Iterable<CloseableIterable<T>> entries(
Iterable<CloseableIterable<T>> readers = Iterables.transform(
matchingManifests,
manifest -> {
ManifestReader reader = ManifestReader.read(manifest, io, specsById);
ManifestReader reader = ManifestFiles.read(manifest, io, specsById);

FilteredManifest filtered = reader
.filterRows(dataFilter)
Expand Down
65 changes: 19 additions & 46 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.exceptions.RuntimeIOException;
Expand All @@ -45,8 +46,7 @@
/**
* Reader for manifest files.
* <p>
* The preferable way to create readers is using {@link #read(ManifestFile, FileIO, Map)} as
* it allows entries to inherit manifest metadata such as snapshot id.
* Create readers using {@link ManifestFiles#read(ManifestFile, FileIO, Map)}.
*/
public class ManifestReader extends CloseableGroup implements Filterable<FilteredManifest> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
Expand All @@ -62,61 +62,29 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
/**
* Returns a new {@link ManifestReader} for an {@link InputFile}.
* <p>
* <em>Note:</em> Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure that
* <em>Note:</em> Callers should use {@link ManifestFiles#read(ManifestFile, FileIO, Map)} to ensure that
* manifest entries with partial metadata can inherit missing properties from the manifest metadata.
* <p>
* <em>Note:</em> Most callers should use {@link #read(InputFile, Map)} if all manifest entries
* contain full metadata and they want to ensure that the schema used by filters is the latest
* table schema. This should be used only when reading a manifest without filters.
*
* @param file an InputFile
* @return a manifest reader
* @deprecated will be removed in 0.9.0; use {@link ManifestFiles#read(ManifestFile, FileIO, Map)} instead.
*/
@Deprecated
public static ManifestReader read(InputFile file) {
return new ManifestReader(file, null, InheritableMetadataFactory.empty());
return read(file, null);
}

/**
* Returns a new {@link ManifestReader} for an {@link InputFile}.
* <p>
* <em>Note:</em> Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure that
* manifest entries with partial metadata can inherit missing properties from the manifest metadata.
*
* @param file an InputFile
* @param specsById a Map from spec ID to partition spec
* @return a manifest reader
*/
public static ManifestReader read(InputFile file, Map<Integer, PartitionSpec> specsById) {
return new ManifestReader(file, specsById, InheritableMetadataFactory.empty());
}

/**
* Returns a new {@link ManifestReader} for a {@link ManifestFile}.
* <p>
* <em>Note:</em> Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure
* the schema used by filters is the latest table schema. This should be used only when reading
* a manifest without filters.
*
* @param manifest a ManifestFile
* @param io a FileIO
* @param specLookup a function to look up the manifest's partition spec by ID
* @return a manifest reader
* @deprecated will be removed in 0.9.0; use {@link ManifestFiles#read(ManifestFile, FileIO, Map)} instead.
*/
public static ManifestReader read(ManifestFile manifest, FileIO io) {
return read(manifest, io, null);
}

/**
* Returns a new {@link ManifestReader} for a {@link ManifestFile}.
*
* @param manifest a ManifestFile
* @param io a FileIO
* @param specsById a Map from spec ID to partition spec
* @return a manifest reader
*/
public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
InputFile file = io.newInputFile(manifest.path());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader(file, specsById, inheritableMetadata);
@Deprecated
public static ManifestReader read(InputFile file, Function<Integer, PartitionSpec> specLookup) {
return new ManifestReader(file, specLookup, InheritableMetadataFactory.empty());
}

private final InputFile file;
Expand All @@ -129,7 +97,12 @@ public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer,
private List<ManifestEntry> cachedAdds = null;
private List<ManifestEntry> cachedDeletes = null;

private ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
InheritableMetadata inheritableMetadata) {
this(file, specsById != null ? specsById::get : null, inheritableMetadata);
}

private ManifestReader(InputFile file, Function<Integer, PartitionSpec> specLookup,
InheritableMetadata inheritableMetadata) {
this.file = file;
this.inheritableMetadata = inheritableMetadata;
Expand All @@ -150,8 +123,8 @@ private ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
specId = Integer.parseInt(specProperty);
}

if (specsById != null) {
this.spec = specsById.get(specId);
if (specLookup != null) {
this.spec = specLookup.apply(specId);
} else {
Schema schema = SchemaParser.fromJson(metadata.get("schema"));
this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
Expand Down
Loading