Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion api/src/main/java/org/apache/iceberg/AppendFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,21 @@ public interface AppendFiles extends SnapshotUpdate<AppendFiles> {
AppendFiles appendFile(DataFile file);

/**
* Append the contents of a manifest to the table.
* Append a {@link ManifestFile} to the table.
* <p>
* The manifest must contain only appended files. All files in the manifest will be appended to
* the table in the snapshot created by this update.
* <p>
* By default, the manifest will be rewritten to assign all entries this update's snapshot ID.
* In that case, it is always the responsibility of the caller to manage the lifecycle of
* the original manifest.
* <p>
* If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest
* should never be deleted manually if the commit succeeds as it will become part of the table
* metadata and will be cleaned up on expiry. If the manifest gets merged with others while
* preparing a new snapshot, it will be deleted automatically if this operation is successful.
* If the commit fails, the manifest will never be deleted and it is up to the caller whether
* to delete or reuse it.
*
* @param file a manifest file
* @return this for method chaining
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
/**
* Adds a {@link ManifestFile manifest file} to the table. The added manifest cannot contain new
* or deleted files.
* <p>
* By default, the manifest will be rewritten to ensure all entries have explicit snapshot IDs.
* In that case, it is always the responsibility of the caller to manage the lifecycle of
* the original manifest.
* <p>
* If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest
* should never be deleted manually if the commit succeeds as it will become part of the table
* metadata and will be cleaned up on expiry. If the manifest gets merged with others while
* preparing a new snapshot, it will be deleted automatically if this operation is successful.
* If the commit fails, the manifest will never be deleted and it is up to the caller whether
* to delete or reuse it.
*
* @param manifest a manifest to add
* @return this for method chaining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
.executeWith(ThreadPools.getWorkerPool())
.onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
.run(manifest -> {
try (ManifestReader reader = ManifestReader.read(io.newInputFile(manifest.path()))) {
try (ManifestReader reader = ManifestReader.read(manifest, io)) {
for (ManifestEntry entry : reader.entries()) {
// intern the file path because the weak key map uses identity (==) instead of equals
String path = entry.file().path().toString().intern();
Expand Down
52 changes: 38 additions & 14 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand All @@ -45,6 +46,8 @@

import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;


public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> implements RewriteManifests {
Expand All @@ -59,9 +62,11 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
private final TableOperations ops;
private final Map<Integer, PartitionSpec> specsById;
private final long manifestTargetSizeBytes;
private final boolean snapshotIdInheritanceEnabled;

private final Set<ManifestFile> deletedManifests = Sets.newHashSet();
private final List<ManifestFile> addedManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAddedManifests = Lists.newArrayList();

private final List<ManifestFile> keptManifests = Collections.synchronizedList(new ArrayList<>());
private final List<ManifestFile> newManifests = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -82,6 +87,8 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
this.specsById = ops.current().specsById();
this.manifestTargetSizeBytes =
ops.current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
this.snapshotIdInheritanceEnabled = ops.current()
.propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}

@Override
Expand All @@ -102,8 +109,9 @@ public RewriteManifests set(String property, String value) {

@Override
protected Map<String, String> summary() {
int createdManifestsCount = newManifests.size() + addedManifests.size() + rewrittenAddedManifests.size();
summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(createdManifestsCount));
summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size()));
summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(newManifests.size() + addedManifests.size()));
summaryBuilder.set(REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size()));
summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get()));
return summaryBuilder.build();
Expand All @@ -129,17 +137,25 @@ public RewriteManifests deleteManifest(ManifestFile manifest) {

@Override
public RewriteManifests addManifest(ManifestFile manifest) {
try {
// the appended manifest must be rewritten with this update's snapshot ID
addedManifests.add(copyManifest(manifest));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Cannot append manifest: " + e.getMessage());
Preconditions.checkArgument(!manifest.hasAddedFiles(), "Cannot add manifest with added files");
Preconditions.checkArgument(!manifest.hasDeletedFiles(), "Cannot add manifest with deleted files");
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");

if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
addedManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
ManifestFile copiedManifest = copyManifest(manifest);
rewrittenAddedManifests.add(copiedManifest);
}

return this;
}

private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()), specsById)) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) {
OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement());
return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
} catch (IOException e) {
Expand All @@ -162,10 +178,14 @@ public List<ManifestFile> apply(TableMetadata base) {

validateFilesCounts();

// TODO: add sequence numbers here
Iterable<ManifestFile> newManifestsWithMetadata = Iterables.transform(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This place requires attention. When we introduce sequence numbers, we will have to iterate through all new manifests. For now, iterating through newManifests and rewrittenAddedManifests is redundant. However, I expect we will add sequence number quickly and we will simply need to change the closure.

Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests),
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());

// put new manifests at the beginning
List<ManifestFile> apply = new ArrayList<>();
apply.addAll(newManifests);
apply.addAll(addedManifests);
Iterables.addAll(apply, newManifestsWithMetadata);
apply.addAll(keptManifests);

return apply;
Expand Down Expand Up @@ -218,8 +238,7 @@ private void performRewrite(List<ManifestFile> currentManifests) {
keptManifests.add(manifest);
} else {
rewrittenManifests.add(manifest);
try (ManifestReader reader =
ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current().specsById())) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
FilteredManifest filteredManifest = reader.select(Arrays.asList("*"));
filteredManifest.liveEntries().forEach(
entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId())
Expand All @@ -246,8 +265,11 @@ private void validateDeletedManifests(Set<ManifestFile> currentManifests) {
}

private void validateFilesCounts() {
int createdManifestsFilesCount = activeFilesCount(newManifests) + activeFilesCount(addedManifests);
int replacedManifestsFilesCount = activeFilesCount(rewrittenManifests) + activeFilesCount(deletedManifests);
Iterable<ManifestFile> createdManifests = Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests);
int createdManifestsFilesCount = activeFilesCount(createdManifests);

Iterable<ManifestFile> replacedManifests = Iterables.concat(rewrittenManifests, deletedManifests);
int replacedManifestsFilesCount = activeFilesCount(replacedManifests);

if (createdManifestsFilesCount != replacedManifestsFilesCount) {
throw new ValidationException(
Expand Down Expand Up @@ -295,7 +317,9 @@ private WriterWrapper getWriter(Object key) {
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
cleanUncommitted(newManifests, committed);
cleanUncommitted(addedManifests, committed);
// clean up only rewrittenAddedManifests as they are always owned by the table
// don't clean up addedManifests as they are added to the manifest list and are not compacted
cleanUncommitted(rewrittenAddedManifests, committed);
}

private void cleanUncommitted(Iterable<ManifestFile> manifests, Set<ManifestFile> committedManifests) {
Expand Down
42 changes: 9 additions & 33 deletions core/src/main/java/org/apache/iceberg/DataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,61 +119,37 @@ protected CloseableIterable<FileScanTask> planFiles(

String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);

// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
// empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
// all cases.
return CloseableIterable.transform(manifests, manifest ->
new ManifestReadTask(ops.io(), new BaseFileScanTask(
DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)),
fileSchema));
new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
}
}

private static class ManifestReadTask implements DataTask {
private static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final FileIO io;
private final FileScanTask manifestTask;
private final ManifestFile manifest;
private final Schema schema;

private ManifestReadTask(FileIO io, FileScanTask manifestTask, Schema schema) {
private ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
String specString, ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), schemaString, specString, residuals);
this.io = io;
this.manifestTask = manifestTask;
this.manifest = manifest;
this.schema = schema;
}

@Override
public CloseableIterable<StructLike> rows() {
return CloseableIterable.transform(
ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())).project(schema),
ManifestReader.read(manifest, io).project(schema),
file -> (GenericDataFile) file);
}

@Override
public DataFile file() {
return manifestTask.file();
}

@Override
public PartitionSpec spec() {
return manifestTask.spec();
}

@Override
public long start() {
return 0;
}

@Override
public long length() {
return manifestTask.length();
}

@Override
public Expression residual() {
return manifestTask.residual();
}

@Override
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
Expand Down
46 changes: 38 additions & 8 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
Expand All @@ -29,6 +31,9 @@
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;

import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
* <p>
Expand All @@ -37,9 +42,11 @@
class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final TableOperations ops;
private final PartitionSpec spec;
private final boolean snapshotIdInheritanceEnabled;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private ManifestFile newManifest = null;
private final AtomicInteger manifestCount = new AtomicInteger(0);
private boolean hasNewFiles = false;
Expand All @@ -48,6 +55,8 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
super(ops);
this.ops = ops;
this.spec = ops.current().spec();
this.snapshotIdInheritanceEnabled = ops.current()
.propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
}

@Override
Expand Down Expand Up @@ -81,16 +90,31 @@ public FastAppend appendFile(DataFile file) {

@Override
public FastAppend appendManifest(ManifestFile manifest) {
// the manifest must be rewritten with this update's snapshot ID
try (ManifestReader reader = ManifestReader.read(
ops.io().newInputFile(manifest.path()), ops.current().specsById())) {
Preconditions.checkArgument(!manifest.hasExistingFiles(), "Cannot append manifest with existing files");
Preconditions.checkArgument(!manifest.hasDeletedFiles(), "Cannot append manifest with deleted files");
Preconditions.checkArgument(
manifest.snapshotId() == null || manifest.snapshotId() == -1,
"Snapshot id must be assigned during commit");

if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) {
summaryBuilder.addedManifest(manifest);
appendManifests.add(manifest);
} else {
// the manifest must be rewritten with this update's snapshot ID
ManifestFile copiedManifest = copyManifest(manifest);
rewrittenAppendManifests.add(copiedManifest);
}

return this;
}

private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to collect more metadata while writing manifests so that we don't have to read manifests to simply get stats. Clearly, this kills all the benefits of inhering the snapshot id.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to collect more metadata to build the summary without reading the passing manifest from file system?

Copy link
Contributor

@rdblue rdblue Dec 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can keep the summary in ManifestFile instead. For now, this is fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we can use the manifest's summary stats for top-level properties. Partition-level properties are optional so we should just not include them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenjunjiedada, yes, I would like to avoid reading passed manifests for better performance.

@rdblue, could you elaborate on what you mean by top-level and partition-level properties? Do you mean changed-partition-count is optional while added-records, added-data-files and others aren't?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we would produce all of the summary stats, but the most important ones are total-data-files, total-records, and the added- or deleted- properties that are used to produce totals. I think it's okay to not write the changed-partition-count metrics if they require scanning the appended manifest.

I think my response was confusing because we keep additional summary information about each partition in our version. I can move that upstream if everyone wants it, but it can make the metadata files quite large. Without a use case for doing this upstream, I didn't think it was a good idea to make everyone's metadata significantly larger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a few examples of what stats you collect per partition? One of our customers was asking about some stats about what partitions were modified, for example.

OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
appendManifests.add(ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder));
return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}

return this;
}

@Override
Expand All @@ -106,7 +130,11 @@ public List<ManifestFile> apply(TableMetadata base) {
throw new RuntimeIOException(e, "Failed to write manifest");
}

newManifests.addAll(appendManifests);
// TODO: add sequence numbers here
Iterable<ManifestFile> appendManifestsWithMetadata = Iterables.transform(
Iterables.concat(appendManifests, rewrittenAppendManifests),
manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build());
Iterables.addAll(newManifests, appendManifestsWithMetadata);

if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().manifests());
Expand All @@ -121,7 +149,9 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {
deleteFile(newManifest.path());
}

for (ManifestFile manifest : appendManifests) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to clean appendManifestsWithMetadata in case of uncommitted? Or leave to the caller as you comment below?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The appended manifests become part of the table, so there is no need to delete them.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean even manifests that are not committed are part of the table? Or all appended manifests must be committed successfully so we don't have to care for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenjunjiedada, yes, all appended manifests must be successfully committed. They are not generated by Iceberg anymore. Instead, they are produced by users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One case we need to consider is when we add a manifest using merge append and it gets combined with other manifests/files in apply. In that case, the added manifest will never be part of the table and can become orphan.

Copy link
Contributor Author

@aokolnychyi aokolnychyi Jan 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems reasonable to clean up the external manifest if the commit is successful but that manifest is not part of the table metadata in MergingSnapshotProducer. Otherwise, the caller will have to detect which of the appended was merged and delete only those to ensure we don't have orphan manifests. It is different from what we have right now because all manifests are copied before they are appended to the metadata. What do you think, guys?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

// clean up only rewrittenAppendManifests as they are always owned by the table
// don't clean up appendManifests as they are added to the manifest list and are not compacted
for (ManifestFile manifest : rewrittenAppendManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
}
Expand Down
Loading