Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -74,7 +73,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
private final Set<ManifestFile> rewrittenManifests = Sets.newConcurrentHashSet();
private final Map<Object, WriterWrapper> writers = Maps.newConcurrentMap();

private final AtomicInteger manifestSuffix = new AtomicInteger(0);
private final AtomicLong entryCount = new AtomicLong(0);

private Function<DataFile, Object> clusterByFunc;
Expand Down Expand Up @@ -157,7 +155,7 @@ public RewriteManifests addManifest(ManifestFile manifest) {

private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) {
OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement());
OutputFile newFile = newManifestOutput();
return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
Expand Down Expand Up @@ -336,18 +334,14 @@ class WriterWrapper {

synchronized void addEntry(ManifestEntry entry) {
if (writer == null) {
writer = newWriter();
writer = newManifestWriter(spec);
} else if (writer.length() >= getManifestTargetSizeBytes()) {
close();
writer = newWriter();
writer = newManifestWriter(spec);
}
writer.existing(entry);
}

private ManifestWriter newWriter() {
return new ManifestWriter(spec, manifestPath(manifestSuffix.getAndIncrement()), snapshotId());
}

synchronized void close() {
if (writer != null) {
try {
Expand Down
8 changes: 2 additions & 6 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
Expand All @@ -48,7 +47,6 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private ManifestFile newManifest = null;
private final AtomicInteger manifestCount = new AtomicInteger(0);
private boolean hasNewFiles = false;

FastAppend(TableOperations ops) {
Expand Down Expand Up @@ -110,7 +108,7 @@ public FastAppend appendManifest(ManifestFile manifest) {

private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
OutputFile newManifestPath = newManifestOutput();
return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
Expand Down Expand Up @@ -165,9 +163,7 @@ private ManifestFile writeManifest() throws IOException {
}

if (newManifest == null && newFiles.size() > 0) {
OutputFile out = manifestPath(manifestCount.getAndIncrement());

ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
ManifestWriter writer = newManifestWriter(spec);
try {
writer.addAll(newFiles);
} finally {
Expand Down
18 changes: 16 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

class ManifestListWriter implements FileAppender<ManifestFile> {
abstract class ManifestListWriter implements FileAppender<ManifestFile> {
static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
long snapshotId, Long parentSnapshotId) {
if (formatVersion == 1) {
return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
}
throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion);
}

private final FileAppender<ManifestFile> writer;

ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
private ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
this.writer = newAppender(snapshotFile, ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId)));
Expand Down Expand Up @@ -80,4 +88,10 @@ private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<Strin
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file);
}
}

static class V1Writer extends ManifestListWriter {
private V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
super(snapshotFile, snapshotId, parentSnapshotId);
}
}
}
22 changes: 18 additions & 4 deletions core/src/main/java/org/apache/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/**
* Writer for manifest files.
*/
public class ManifestWriter implements FileAppender<DataFile> {
public abstract class ManifestWriter implements FileAppender<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);

static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
Expand All @@ -44,7 +44,7 @@ static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputF
static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
SnapshotSummary.Builder summaryBuilder,
Set<ManifestEntry.Status> allowedEntryStatuses) {
ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
ManifestWriter writer = new V1Writer(reader.spec(), outputFile, snapshotId);
boolean threw = true;
try {
for (ManifestEntry entry : reader.entries()) {
Expand Down Expand Up @@ -93,7 +93,15 @@ static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, l
* @return a manifest writer
*/
public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
return new ManifestWriter(spec, outputFile, null);
// always use a v1 writer for appended manifests because sequence number must be inherited
return write(1, spec, outputFile, null);
}

static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
if (formatVersion == 1) {
return new V1Writer(spec, outputFile, snapshotId);
}
throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
}

private final OutputFile file;
Expand All @@ -111,7 +119,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
private int deletedFiles = 0;
private long deletedRows = 0L;

ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
this.file = file;
this.specId = spec.specId();
this.writer = newAppender(FileFormat.AVRO, spec, file);
Expand Down Expand Up @@ -231,4 +239,10 @@ private static <D> FileAppender<D> newAppender(FileFormat format, PartitionSpec
throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
}
}

private static class V1Writer extends ManifestWriter {
V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
}
}
}
15 changes: 4 additions & 11 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -87,7 +86,6 @@ public String partition() {
private final boolean snapshotIdInheritanceEnabled;

// update data
private final AtomicInteger manifestCount = new AtomicInteger(0);
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -230,7 +228,7 @@ protected void add(ManifestFile manifest) {

private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
OutputFile newManifestPath = newManifestOutput();
return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), appendedManifestsSummary);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
Expand Down Expand Up @@ -542,8 +540,7 @@ private ManifestFile filterManifestWithDeletedFiles(
// manifest. produce a copy of the manifest with all deleted files removed.
List<DataFile> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
ManifestWriter writer = newManifestWriter(reader.spec());
try {
reader.entries().forEach(entry -> {
DataFile file = entry.file();
Expand Down Expand Up @@ -655,9 +652,7 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws I
return mergeManifests.get(bin);
}

OutputFile out = manifestPath(manifestCount.getAndIncrement());

ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
ManifestWriter writer = newManifestWriter(ops.current().spec());
try {
for (ManifestFile manifest : bin) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
Expand Down Expand Up @@ -697,9 +692,7 @@ private ManifestFile newFilesAsManifest() throws IOException {
}

if (cachedNewManifest == null) {
OutputFile out = manifestPath(manifestCount.getAndIncrement());

ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
ManifestWriter writer = newManifestWriter(spec);
try {
writer.addAll(newFiles);
} finally {
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void accept(String file) {

private final TableOperations ops;
private final String commitUUID = UUID.randomUUID().toString();
private final AtomicInteger manifestCount = new AtomicInteger(0);
private final AtomicInteger attempt = new AtomicInteger(0);
private final List<String> manifestLists = Lists.newArrayList();
private volatile Long snapshotId = null;
Expand Down Expand Up @@ -148,8 +149,8 @@ public Snapshot apply() {
if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
OutputFile manifestList = manifestListPath();

try (ManifestListWriter writer = new ManifestListWriter(
manifestList, snapshotId(), parentSnapshotId)) {
try (ManifestListWriter writer = ManifestListWriter.write(
ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId)) {

// keep track of the manifest lists created
manifestLists.add(manifestList.location());
Expand Down Expand Up @@ -310,9 +311,13 @@ protected OutputFile manifestListPath() {
String.format("snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID))));
}

protected OutputFile manifestPath(int manifestNumber) {
protected OutputFile newManifestOutput() {
return ops.io().newOutputFile(
ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestNumber)));
ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
}

protected ManifestWriter newManifestWriter(PartitionSpec spec) {
return ManifestWriter.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
}

protected long snapshotId() {
Expand Down
Loading