Core: Parallelize manifest writing for many new files (#11086)
aokolnychyi committed Sep 12, 2024
1 parent e3d3f88 commit d2087a0
Showing 8 changed files with 261 additions and 80 deletions.
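At a high level, the commit stops writing new manifests in a single-threaded RollingManifestWriter loop: FastAppend and MergingSnapshotProducer now delegate to shared helpers on SnapshotProducer that split the incoming files into groups of at least MIN_FILE_GROUP_SIZE (10,000) and write the groups in parallel on Iceberg's worker pool. Below is a minimal, self-contained sketch of that fan-out idea, using a plain ExecutorService and java.lang.Math in place of the Tasks, ThreadPools, and IntMath utilities the committed code relies on; the class name, parameters, and sizing here are illustrative only.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ParallelManifestSketch {
  static final int MIN_FILE_GROUP_SIZE = 10_000;

  /** Splits files into groups and writes each group on its own thread. */
  static <F, M> List<M> writeManifests(
      List<F> files, int poolSize, Function<List<F>, List<M>> writeGroup)
      throws InterruptedException, ExecutionException {
    // One writer per ~MIN_FILE_GROUP_SIZE files: at least 1, at most poolSize.
    int writers =
        Math.max(
            1, Math.min(poolSize, (int) Math.round(files.size() / (double) MIN_FILE_GROUP_SIZE)));
    int groupSize = (files.size() + writers - 1) / writers; // ceiling division

    ExecutorService pool = Executors.newFixedThreadPool(writers);
    try {
      List<Future<List<M>>> futures = new ArrayList<>();
      for (int start = 0; start < files.size(); start += groupSize) {
        List<F> group = files.subList(start, Math.min(start + groupSize, files.size()));
        futures.add(pool.submit(() -> writeGroup.apply(group)));
      }
      List<M> manifests = new ArrayList<>();
      for (Future<List<M>> future : futures) {
        manifests.addAll(future.get()); // propagates the first group failure
      }
      return manifests;
    } finally {
      pool.shutdown();
    }
  }
}

In the committed writeManifests (see SnapshotProducer below), the per-group results are collected in a ConcurrentLinkedQueue and copied into an ImmutableList, so the resulting manifest order may differ from the input file order.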
13 changes: 9 additions & 4 deletions core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
@@ -38,7 +38,6 @@
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Timeout;
 import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.infra.Blackhole;
 
 /**
  * A benchmark that evaluates the performance of appending files to the table.
@@ -66,14 +65,20 @@ public class AppendBenchmark {
           required(4, "date_col", Types.DateType.get()),
           required(5, "timestamp_col", Types.TimestampType.withoutZone()),
           required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
-          required(7, "str_col", Types.StringType.get()));
+          required(7, "str_col1", Types.StringType.get()),
+          required(8, "str_col2", Types.StringType.get()),
+          required(9, "str_col3", Types.StringType.get()),
+          required(10, "str_col4", Types.StringType.get()),
+          required(11, "str_col5", Types.StringType.get()),
+          required(12, "str_col6", Types.StringType.get()),
+          required(13, "str_col7", Types.StringType.get()));
   private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
   private static final HadoopTables TABLES = new HadoopTables();
 
   private Table table;
   private List<DataFile> dataFiles;
 
-  @Param({"500000", "1000000", "2500000"})
+  @Param({"50000", "100000", "500000", "1000000", "2500000"})
   private int numFiles;
 
   @Param({"true", "false"})
@@ -92,7 +97,7 @@ public void tearDownBenchmark() {
 
   @Benchmark
   @Threads(1)
-  public void appendFiles(Blackhole blackhole) {
+  public void appendFiles() {
     AppendFiles append = fast ? table.newFastAppend() : table.newAppend();
 
     for (DataFile dataFile : dataFiles) {
9 changes: 1 addition & 8 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -215,14 +215,7 @@ private List<ManifestFile> writeNewManifests() throws IOException {
     }
 
     if (newManifests == null && !newFiles.isEmpty()) {
-      RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
-      try {
-        newFiles.forEach(writer::add);
-      } finally {
-        writer.close();
-      }
-
-      this.newManifests = writer.toManifestFiles();
+      this.newManifests = writeDataManifests(newFiles, spec);
       hasNewFiles = false;
     }
 
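Note: the inlined writer loop removed above, together with the two similar loops removed from MergingSnapshotProducer next, now lives behind the shared writeDataManifests/writeDeleteManifests helpers added to SnapshotProducer, so every append path picks up the grouped, parallel writing.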
73 changes: 5 additions & 68 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -33,7 +33,6 @@
 import java.util.stream.Collectors;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -972,21 +971,9 @@ private List<ManifestFile> newDataFilesAsManifests() {
     if (cachedNewDataManifests.isEmpty()) {
       newDataFilesBySpec.forEach(
           (dataSpec, newDataFiles) -> {
-            try {
-              RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec);
-              try {
-                if (newDataFilesDataSequenceNumber == null) {
-                  newDataFiles.forEach(writer::add);
-                } else {
-                  newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
-                }
-              } finally {
-                writer.close();
-              }
-              this.cachedNewDataManifests.addAll(writer.toManifestFiles());
-            } catch (IOException e) {
-              throw new RuntimeIOException(e, "Failed to close manifest writer");
-            }
+            List<ManifestFile> newDataManifests =
+                writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
+            cachedNewDataManifests.addAll(newDataManifests);
           });
       this.hasNewDataFiles = false;
     }
@@ -1016,24 +1003,8 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops.current().spec(specId);
-            try {
-              RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
-              try {
-                deleteFiles.forEach(
-                    df -> {
-                      if (df.dataSequenceNumber() != null) {
-                        writer.add(df.deleteFile(), df.dataSequenceNumber());
-                      } else {
-                        writer.add(df.deleteFile());
-                      }
-                    });
-              } finally {
-                writer.close();
-              }
-              cachedNewDeleteManifests.addAll(writer.toManifestFiles());
-            } catch (IOException e) {
-              throw new RuntimeIOException(e, "Failed to close manifest writer");
-            }
+            List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
+            cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
 
       this.hasNewDeleteFiles = false;
@@ -1147,38 +1118,4 @@ protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
       return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
     }
   }
-
-  private static class DeleteFileHolder {
-    private final DeleteFile deleteFile;
-    private final Long dataSequenceNumber;
-
-    /**
-     * Wrap a delete file for commit with a given data sequence number
-     *
-     * @param deleteFile delete file
-     * @param dataSequenceNumber data sequence number to apply
-     */
-    DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
-      this.deleteFile = deleteFile;
-      this.dataSequenceNumber = dataSequenceNumber;
-    }
-
-    /**
-     * Wrap a delete file for commit with the latest sequence number
-     *
-     * @param deleteFile delete file
-     */
-    DeleteFileHolder(DeleteFile deleteFile) {
-      this.deleteFile = deleteFile;
-      this.dataSequenceNumber = null;
-    }
-
-    public DeleteFile deleteFile() {
-      return deleteFile;
-    }
-
-    public Long dataSequenceNumber() {
-      return dataSequenceNumber;
-    }
-  }
 }
124 changes: 124 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -34,15 +34,18 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.io.IOException;
+import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.events.CreateSnapshotEvent;
@@ -59,10 +62,14 @@
 import org.apache.iceberg.metrics.LoggingMetricsReporter;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.metrics.Timer.Timed;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
 import org.apache.iceberg.util.Exceptions;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
@@ -73,6 +80,7 @@
 @SuppressWarnings("UnnecessaryAnonymousClass")
 abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private static final Logger LOG = LoggerFactory.getLogger(SnapshotProducer.class);
+  static final int MIN_FILE_GROUP_SIZE = 10_000;
   static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
 
   /** Default callback used to delete files. */
@@ -554,6 +562,88 @@ protected boolean cleanupAfterCommit() {
     return true;
   }
 
+  protected List<ManifestFile> writeDataManifests(List<DataFile> files, PartitionSpec spec) {
+    return writeDataManifests(files, null /* inherit data seq */, spec);
+  }
+
+  protected List<ManifestFile> writeDataManifests(
+      List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+    return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, spec));
+  }
+
+  private List<ManifestFile> writeDataFileGroup(
+      List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+    RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
+
+    try (RollingManifestWriter<DataFile> closableWriter = writer) {
+      if (dataSeq != null) {
+        files.forEach(file -> closableWriter.add(file, dataSeq));
+      } else {
+        files.forEach(closableWriter::add);
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write data manifests");
+    }
+
+    return writer.toManifestFiles();
+  }
+
+  protected List<ManifestFile> writeDeleteManifests(
+      List<DeleteFileHolder> files, PartitionSpec spec) {
+    return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
+  }
+
+  private List<ManifestFile> writeDeleteFileGroup(
+      List<DeleteFileHolder> files, PartitionSpec spec) {
+    RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
+
+    try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
+      for (DeleteFileHolder file : files) {
+        if (file.dataSequenceNumber() != null) {
+          closableWriter.add(file.deleteFile(), file.dataSequenceNumber());
+        } else {
+          closableWriter.add(file.deleteFile());
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write delete manifests");
+    }
+
+    return writer.toManifestFiles();
+  }
+
+  private static <F> List<ManifestFile> writeManifests(
+      List<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
+    int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
+    List<List<F>> groups = divide(files, parallelism);
+    Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
+    Tasks.foreach(groups)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(group -> manifests.addAll(writeFunc.apply(group)));
+    return ImmutableList.copyOf(manifests);
+  }
+
+  private static <T> List<List<T>> divide(List<T> list, int groupCount) {
+    int groupSize = IntMath.divide(list.size(), groupCount, RoundingMode.CEILING);
+    return Lists.partition(list, groupSize);
+  }
+
+  /**
+   * Calculates how many manifest writers can be used concurrently to handle the given number of
+   * files without creating manifests that are too small.
+   *
+   * @param workerPoolSize the size of the available worker pool
+   * @param fileCount the total number of files to be processed
+   * @return the number of manifest writers that can be used concurrently
+   */
+  @VisibleForTesting
+  static int manifestWriterCount(int workerPoolSize, int fileCount) {
+    int limit = IntMath.divide(fileCount, MIN_FILE_GROUP_SIZE, RoundingMode.HALF_UP);
+    return Math.max(1, Math.min(workerPoolSize, limit));
+  }
+
   private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
     try (ManifestReader<DataFile> reader =
         ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
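To make the sizing logic concrete, the snippet below reproduces the arithmetic of manifestWriterCount and divide as a standalone program; plain Math.round stands in for the relocated Guava IntMath.divide(..., HALF_UP), and the 8-thread pool size is an assumption for illustration.

public class WriterCountExample {
  static final int MIN_FILE_GROUP_SIZE = 10_000;

  // Mirrors SnapshotProducer.manifestWriterCount: half-up division, then clamp
  // to the range [1, workerPoolSize].
  static int manifestWriterCount(int workerPoolSize, int fileCount) {
    int limit = (int) Math.round(fileCount / (double) MIN_FILE_GROUP_SIZE);
    return Math.max(1, Math.min(workerPoolSize, limit));
  }

  public static void main(String[] args) {
    System.out.println(manifestWriterCount(8, 4_999)); // 1 (0.4999 rounds to 0, floored to 1)
    System.out.println(manifestWriterCount(8, 25_000)); // 3 (2.5 rounds half-up to 3)
    System.out.println(manifestWriterCount(8, 1_000_000)); // 8 (capped by the pool size)
    // divide(...) then partitions the input with ceiling division: 25,000 files
    // across 3 writers -> Lists.partition groups of 8,334 + 8,334 + 8,332.
  }
}

Because of the half-up rounding, anything below 15,000 files resolves to a single writer, so small commits keep the previous single-threaded behavior.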
@@ -654,4 +744,38 @@ private static void updateTotal(
       }
     }
   }
+
+  protected static class DeleteFileHolder {
+    private final DeleteFile deleteFile;
+    private final Long dataSequenceNumber;
+
+    /**
+     * Wrap a delete file for commit with a given data sequence number.
+     *
+     * @param deleteFile delete file
+     * @param dataSequenceNumber data sequence number to apply
+     */
+    DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) {
+      this.deleteFile = deleteFile;
+      this.dataSequenceNumber = dataSequenceNumber;
+    }
+
+    /**
+     * Wrap a delete file for commit with the latest sequence number.
+     *
+     * @param deleteFile delete file
+     */
+    DeleteFileHolder(DeleteFile deleteFile) {
+      this.deleteFile = deleteFile;
+      this.dataSequenceNumber = null;
+    }
+
+    public DeleteFile deleteFile() {
+      return deleteFile;
+    }
+
+    public Long dataSequenceNumber() {
+      return dataSequenceNumber;
+    }
+  }
 }
5 changes: 5 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestBase.java
@@ -27,6 +27,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -460,6 +461,10 @@ void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile
   }
 
   void validateTableFiles(Table tbl, DataFile... expectedFiles) {
+    validateTableFiles(tbl, Arrays.asList(expectedFiles));
+  }
+
+  void validateTableFiles(Table tbl, Collection<DataFile> expectedFiles) {
     Set<CharSequence> expectedFilePaths = Sets.newHashSet();
     for (DataFile file : expectedFiles) {
       expectedFilePaths.add(file.path());
19 changes: 19 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
@@ -42,6 +42,25 @@ protected static List<Object> parameters() {
     return Arrays.asList(1, 2, 3);
   }
 
+  @TestTemplate
+  public void testAddManyFiles() {
+    assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+    List<DataFile> dataFiles = Lists.newArrayList();
+
+    for (int ordinal = 0; ordinal < 2 * SnapshotProducer.MIN_FILE_GROUP_SIZE; ordinal++) {
+      StructLike partition = TestHelpers.Row.of(ordinal % 2);
+      DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
+      dataFiles.add(dataFile);
+    }
+
+    AppendFiles append = table.newAppend();
+    dataFiles.forEach(append::appendFile);
+    append.commit();
+
+    validateTableFiles(table, dataFiles);
+  }
+
   @TestTemplate
   public void appendNullFile() {
     assertThatThrownBy(() -> table.newFastAppend().appendFile(null).commit())
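The 2 × MIN_FILE_GROUP_SIZE sizing in the new test is deliberate: 20,000 files divide half-up into two groups, so the append exercises at least two parallel manifest writers (assuming a worker pool with two or more threads) while still validating that every appended file lands in the table.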