diff --git a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java index a444e7ff9c63..57a5c2d3a11d 100644 --- a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java +++ b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java @@ -86,6 +86,7 @@ public class AppendBenchmark { @Setup public void setupBenchmark() { + dropTable(); initTable(); initDataFiles(); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 118ae0b328a5..77cdac8f4a29 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -40,12 +40,12 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Consumer; import java.util.function.Function; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -69,10 +69,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Queues; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.IntMath; import org.apache.iceberg.util.Exceptions; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; @@ -669,13 +669,33 @@ private static List writeManifests( Collection files, Function, List> writeFunc) { int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size()); List> groups = divide(files, parallelism); - Queue manifests = Queues.newConcurrentLinkedQueue(); - Tasks.foreach(groups) + + // Create a new list pairing each group with its index + List>> groupsWithIndex = Lists.newArrayList(); + for (int i = 0; i < groups.size(); i++) { + groupsWithIndex.add(Pair.of(i, groups.get(i))); + } + + AtomicReferenceArray> results = new AtomicReferenceArray<>(groups.size()); + + Tasks.foreach(groupsWithIndex) .stopOnFailure() .throwFailureWhenFinished() .executeWith(ThreadPools.getWorkerPool()) - .run(group -> manifests.addAll(writeFunc.apply(group))); - return ImmutableList.copyOf(manifests); + .run( + indexedGroup -> { + int index = indexedGroup.first(); + List group = indexedGroup.second(); + List groupResults = writeFunc.apply(group); + results.set(index, groupResults); + }); + + // Collect results in order + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0; i < results.length(); i++) { + builder.addAll(results.get(i)); + } + return builder.build(); } private static List> divide(Collection collection, int groupCount) { diff --git a/jmh.gradle b/jmh.gradle index 4d6d7207c539..7f3bc0deafd2 100644 --- a/jmh.gradle +++ b/jmh.gradle @@ -66,6 +66,7 @@ configure(jmhProjects) { forceGC = true includeTests = true humanOutputFile = file(jmhOutputPath) + jvmArgs = ['-Xmx32g'] resultsFile = file(jmhJsonOutputPath) resultFormat = 'JSON' includes = [jmhIncludeRegex]