Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 57 additions & 45 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
Expand All @@ -42,6 +43,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
Expand Down Expand Up @@ -79,7 +81,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final ManifestFilterManager<DeleteFile> deleteFilterManager;

// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private Long newDataFilesDataSequenceNumber;
Expand All @@ -89,10 +91,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
private Expression deleteExpression = Expressions.alwaysFalse();
private PartitionSpec dataSpec;

// cache new data manifests after writing
private List<ManifestFile> cachedNewDataManifests = null;
private final List<ManifestFile> cachedNewDataManifests = Lists.newLinkedList();
private boolean hasNewDataFiles = false;

// cache new manifests for delete files
Expand All @@ -105,7 +106,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
super(ops);
this.tableName = tableName;
this.ops = ops;
this.dataSpec = null;
long targetSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
Expand Down Expand Up @@ -141,18 +141,31 @@ protected boolean isCaseSensitive() {
}

protected PartitionSpec dataSpec() {
Set<PartitionSpec> specs = dataSpecs();
Preconditions.checkState(
dataSpec != null, "Cannot determine partition spec: no data files have been added");
// the spec is set when the write is started
return dataSpec;
specs.size() == 1,
"Cannot return a single partition spec: data files with different partition specs have been added");
return specs.iterator().next();
}

/**
 * Returns the partition specs of all data files added so far.
 *
 * @return an immutable copy of the specs that currently have added data files
 * @throws IllegalStateException if no data files have been added
 */
protected Set<PartitionSpec> dataSpecs() {
  // the map only gains a key when a data file is added, so an empty map means nothing was added
  boolean hasAddedFiles = !newDataFilesBySpec.isEmpty();
  Preconditions.checkState(
      hasAddedFiles, "Cannot determine partition specs: no data files have been added");
  return ImmutableSet.copyOf(newDataFilesBySpec.keySet());
}

/**
 * Returns the delete expression currently held by this producer.
 *
 * @return the value of the {@code deleteExpression} field
 */
protected Expression rowFilter() {
  return this.deleteExpression;
}

protected List<DataFile> addedDataFiles() {
return ImmutableList.copyOf(newDataFiles);
return ImmutableList.copyOf(
newDataFilesBySpec.values().stream().flatMap(List::stream).collect(Collectors.toList()));
}

/**
 * Returns the added data files grouped by their partition spec.
 *
 * <p>Both the map and the per-spec lists are immutable snapshots. Copying only the map would
 * hand callers the internal mutable lists, letting them change the set of added files without
 * going through {@code add(DataFile)}.
 *
 * @return an immutable map from partition spec to an immutable list of the files added for it
 */
protected Map<PartitionSpec, List<DataFile>> addedDataFilesBySpec() {
  ImmutableMap.Builder<PartitionSpec, List<DataFile>> filesBySpec = ImmutableMap.builder();
  newDataFilesBySpec.forEach((spec, files) -> filesBySpec.put(spec, ImmutableList.copyOf(files)));
  return filesBySpec.build();
}

protected void failAnyDelete() {
Expand Down Expand Up @@ -212,7 +225,7 @@ protected boolean deletesDeleteFiles() {
}

protected boolean addsDataFiles() {
return !newDataFiles.isEmpty();
return !newDataFilesBySpec.isEmpty();
}

protected boolean addsDeleteFiles() {
Expand All @@ -223,9 +236,17 @@ protected boolean addsDeleteFiles() {
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
setDataSpec(file);
addedFilesSummary.addedFile(dataSpec(), file);
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
"Cannot find partition spec %s for data file: %s",
file.specId(),
file.path());

addedFilesSummary.addedFile(fileSpec, file);
hasNewDataFiles = true;
List<DataFile> newDataFiles =
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
newDataFiles.add(file);
}
}
Expand Down Expand Up @@ -255,17 +276,6 @@ private void add(DeleteFileHolder fileHolder) {
}
}

/**
 * Resolves the partition spec of the given data file and enforces that all added data files
 * share a single spec.
 *
 * <p>The first file seen fixes {@code dataSpec}; every later file must carry the same spec id.
 *
 * @param file the data file being added
 * @throws ValidationException if the file's spec id differs from the spec already recorded
 */
private void setDataSpec(DataFile file) {
// look up the spec in the current table metadata by the id stored on the file
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkNotNull(
fileSpec, "Cannot find partition spec for data file: %s", file.path());
if (dataSpec == null) {
// first data file: remember its spec
dataSpec = fileSpec;
} else if (dataSpec.specId() != file.specId()) {
// a file with a different spec id is rejected
throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
}
}

/** Add all files in a manifest to the new snapshot. */
protected void add(ManifestFile manifest) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -885,7 +895,7 @@ public Object updateEvent() {

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (cachedNewDataManifests != null) {
if (!cachedNewDataManifests.isEmpty()) {
boolean hasDeletes = false;
for (ManifestFile manifest : cachedNewDataManifests) {
if (!committed.contains(manifest)) {
Expand All @@ -895,7 +905,7 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
}

if (hasDeletes) {
this.cachedNewDataManifests = null;
this.cachedNewDataManifests.clear();
}
}

Expand Down Expand Up @@ -941,7 +951,7 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {

private Iterable<ManifestFile> prepareNewDataManifests() {
Iterable<ManifestFile> newManifests;
if (!newDataFiles.isEmpty()) {
if (!newDataFilesBySpec.isEmpty()) {
List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
} else {
Expand All @@ -954,29 +964,31 @@ private Iterable<ManifestFile> prepareNewDataManifests() {
}

private List<ManifestFile> newDataFilesAsManifests() {
if (hasNewDataFiles && cachedNewDataManifests != null) {
if (hasNewDataFiles && !cachedNewDataManifests.isEmpty()) {
cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
cachedNewDataManifests = null;
cachedNewDataManifests.clear();
}

if (cachedNewDataManifests == null) {
try {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
try {
if (newDataFilesDataSequenceNumber == null) {
newDataFiles.forEach(writer::add);
} else {
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
}
} finally {
writer.close();
}

this.cachedNewDataManifests = writer.toManifestFiles();
this.hasNewDataFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(dataSpec, newDataFiles) -> {
try {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec);
try {
if (newDataFilesDataSequenceNumber == null) {
newDataFiles.forEach(writer::add);
} else {
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
}
} finally {
writer.close();
}
this.cachedNewDataManifests.addAll(writer.toManifestFiles());
this.hasNewDataFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
});
}

return cachedNewDataManifests;
Expand Down
88 changes: 88 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -90,6 +92,92 @@ public void testEmptyTableAppend() {
statuses(Status.ADDED, Status.ADDED));
}

/**
 * Appends two data files written with different partition specs in a single commit and checks
 * that one manifest is produced per spec, each containing the file for that spec.
 */
@TestTemplate
public void testEmptyTableAppendFilesWithDifferentSpecs() {
  assertThat(listManifestFiles()).as("Table should start empty").isEmpty();

  TableMetadata base = readMetadata();
  assertThat(base.currentSnapshot()).as("Should not have a current snapshot").isNull();
  assertThat(base.lastSequenceNumber()).as("Last sequence number should be 0").isEqualTo(0);

  // add a partition field so the table has a second spec
  table.updateSpec().addField("id").commit();
  PartitionSpec newSpec = table.spec();

  assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);

  DataFile fileNewSpec =
      DataFiles.builder(newSpec)
          .withPath("/path/to/data-b.parquet")
          .withPartitionPath("data_bucket=0/id=0")
          .withFileSizeInBytes(10)
          .withRecordCount(1)
          .build();

  // one append carrying a file for each spec
  Snapshot committedSnapshot =
      commit(table, table.newAppend().appendFile(FILE_A).appendFile(fileNewSpec), branch);

  assertThat(committedSnapshot).as("Should create a snapshot").isNotNull();
  V1Assert.assertEquals(
      "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber());
  V2Assert.assertEquals(
      "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber());

  // read the manifest list once and reuse it for every per-spec lookup below
  List<ManifestFile> manifests = committedSnapshot.allManifests(table.io());
  assertThat(manifests)
      .as("Should create 2 manifests for initial write, 1 manifest per spec")
      .hasSize(2);

  long snapshotId = committedSnapshot.snapshotId();

  ImmutableMap<Integer, DataFile> expectedFileBySpec =
      ImmutableMap.of(SPEC.specId(), FILE_A, newSpec.specId(), fileNewSpec);

  expectedFileBySpec.forEach(
      (specId, expectedDataFile) -> {
        // fail with the missing spec id rather than a bare NoSuchElementException
        ManifestFile manifestFileForSpecId =
            manifests.stream()
                .filter(m -> Objects.equals(m.partitionSpecId(), specId))
                .findAny()
                .orElseThrow(
                    () ->
                        new AssertionError(
                            "No manifest found for partition spec id " + specId));

        validateManifest(
            manifestFileForSpecId,
            dataSeqs(1L),
            fileSeqs(1L),
            ids(snapshotId),
            files(expectedDataFile),
            statuses(Status.ADDED));
      });
}

/**
 * Stages data files with two different partition specs on one append and checks that asking
 * for a single data spec is rejected.
 */
@TestTemplate
public void testDataSpecThrowsExceptionIfDataFilesWithDifferentSpecsAreAdded() {
  assertThat(listManifestFiles()).as("Table should start empty").isEmpty();

  TableMetadata initialMetadata = readMetadata();
  assertThat(initialMetadata.currentSnapshot())
      .as("Should not have a current snapshot")
      .isNull();
  assertThat(initialMetadata.lastSequenceNumber())
      .as("Last sequence number should be 0")
      .isEqualTo(0);

  // add a partition field so the table has a second spec
  table.updateSpec().addField("id").commit();
  PartitionSpec evolvedSpec = table.spec();

  assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);

  DataFile evolvedSpecFile =
      DataFiles.builder(evolvedSpec)
          .withPath("/path/to/data-b.parquet")
          .withPartitionPath("data_bucket=0/id=0")
          .withFileSizeInBytes(10)
          .withRecordCount(1)
          .build();

  // stage both files without committing; the producer now tracks two specs
  AppendFiles pendingAppend = table.newAppend().appendFile(FILE_A).appendFile(evolvedSpecFile);
  MergeAppend mergeAppend = (MergeAppend) pendingAppend;

  assertThatThrownBy(() -> mergeAppend.dataSpec())
      .isInstanceOf(IllegalStateException.class)
      .hasMessage(
          "Cannot return a single partition spec: data files with different partition specs have been added");
}

@TestTemplate
public void testEmptyTableAppendManifest() throws IOException {
assertThat(listManifestFiles()).isEmpty();
Expand Down