diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java index f38366b3c0f0..3b2167c1c809 100644 --- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java +++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Predicate; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -117,6 +118,18 @@ public O next() { }; } + static CloseableIterable combine(Iterable> iterable, + ExecutorService workerPool, + int workerPoolSize) { + if (workerPool == null) { + return concat(iterable); + } + + Preconditions.checkArgument(workerPoolSize > 0, "Invalid workerPoolSize (not positive): " + workerPoolSize); + + return new ParallelIterable<>(iterable, workerPool, workerPoolSize); + } + static CloseableIterable concat(Iterable> iterable) { Iterator> iterables = iterable.iterator(); if (!iterables.hasNext()) { diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/api/src/main/java/org/apache/iceberg/io/ParallelIterable.java similarity index 87% rename from core/src/main/java/org/apache/iceberg/util/ParallelIterable.java rename to api/src/main/java/org/apache/iceberg/io/ParallelIterable.java index 3afc1c932a84..c261e9d182fd 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/api/src/main/java/org/apache/iceberg/io/ParallelIterable.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.iceberg.util; +package org.apache.iceberg.io; import java.io.Closeable; import java.io.IOException; @@ -28,25 +28,39 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.io.CloseableGroup; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +/** + * Run iterables in parallel. + * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead. + */ +@Deprecated public class ParallelIterable extends CloseableGroup implements CloseableIterable { private final Iterable> iterables; private final ExecutorService workerPool; + private final int workerPoolSize; + /** + * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead. 
+ */ + @Deprecated public ParallelIterable(Iterable> iterables, ExecutorService workerPool) { + this(iterables, workerPool, Runtime.getRuntime().availableProcessors()); + } + + ParallelIterable(Iterable> iterables, + ExecutorService workerPool, + int workerPoolSize) { this.iterables = iterables; this.workerPool = workerPool; + this.workerPoolSize = workerPoolSize; } @Override public CloseableIterator iterator() { - ParallelIterator iter = new ParallelIterator<>(iterables, workerPool); + ParallelIterator iter = new ParallelIterator<>(iterables, workerPool, workerPoolSize); addCloseable(iter); return iter; } @@ -59,7 +73,8 @@ private static class ParallelIterator implements CloseableIterator { private boolean closed = false; private ParallelIterator(Iterable> iterables, - ExecutorService workerPool) { + ExecutorService workerPool, + int workerPoolSize) { this.tasks = Iterables.transform(iterables, iterable -> (Runnable) () -> { try (Closeable ignored = (iterable instanceof Closeable) ? @@ -72,8 +87,8 @@ private ParallelIterator(Iterable> iterables, } }).iterator(); this.workerPool = workerPool; - // submit 2 tasks per worker at a time - this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; + // submit 2 tasks per worker at a time. 
+ this.taskFutures = new Future[workerPoolSize * 2]; } @Override diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java index d1b084f0206c..e68582d4f916 100644 --- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java @@ -30,7 +30,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types.StructType; -import org.apache.iceberg.util.ParallelIterable; import org.apache.iceberg.util.ThreadPools; /** @@ -128,9 +127,9 @@ protected CloseableIterable planFiles( } private static CloseableIterable allDataManifestFiles(List snapshots) { - try (CloseableIterable iterable = new ParallelIterable<>( - Iterables.transform(snapshots, snapshot -> (Iterable) () -> snapshot.dataManifests().iterator()), - ThreadPools.getWorkerPool())) { + try (CloseableIterable iterable = CloseableIterable.combine( + Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.dataManifests())), + ThreadPools.getWorkerPool(), ThreadPools.WORKER_THREAD_POOL_SIZE)) { return CloseableIterable.withNoopClose(Sets.newHashSet(iterable)); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close parallel iterable"); diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java index 84c1609fd4e7..413c877125ef 100644 --- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java @@ -30,7 +30,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types.StructType; -import org.apache.iceberg.util.ParallelIterable; import org.apache.iceberg.util.ThreadPools; /** @@ -114,9 +113,9 @@ protected 
CloseableIterable planFiles( } private static CloseableIterable allManifestFiles(List snapshots) { - try (CloseableIterable iterable = new ParallelIterable<>( - Iterables.transform(snapshots, snapshot -> (Iterable) () -> snapshot.allManifests().iterator()), - ThreadPools.getWorkerPool())) { + try (CloseableIterable iterable = CloseableIterable.combine( + Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.allManifests())), + ThreadPools.getWorkerPool(), ThreadPools.WORKER_THREAD_POOL_SIZE)) { return CloseableIterable.withNoopClose(Sets.newHashSet(iterable)); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close parallel iterable"); diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index 1120bdfb36d3..03293ad41f0e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -39,7 +39,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.ParallelIterable; +import org.apache.iceberg.util.ThreadPools; class ManifestGroup { private static final Types.StructType EMPTY_STRUCT = Types.StructType.of(); @@ -182,11 +182,7 @@ public CloseableIterable planFiles() { } }); - if (executorService != null) { - return new ParallelIterable<>(tasks, executorService); - } else { - return CloseableIterable.concat(tasks); - } + return CloseableIterable.combine(tasks, executorService, ThreadPools.WORKER_THREAD_POOL_SIZE); } /** diff --git a/core/src/main/java/org/apache/iceberg/SystemProperties.java b/core/src/main/java/org/apache/iceberg/SystemProperties.java index 7ef8a20eb580..143cc3eb62f2 100644 --- a/core/src/main/java/org/apache/iceberg/SystemProperties.java +++ b/core/src/main/java/org/apache/iceberg/SystemProperties.java 
@@ -39,11 +39,24 @@ private SystemProperties() { */ public static final String SCAN_THREAD_POOL_ENABLED = "iceberg.scan.plan-in-worker-pool"; - static boolean getBoolean(String systemProperty, boolean defaultValue) { + /** + * Number of threads in the worker pool used to read delete files; a value of 1 or less reads them serially. + */ + public static final String READ_DELETE_FILES_WORKER_POOL_SIZE = "iceberg.worker.read-deletes-num-threads"; + + public static boolean getBoolean(String systemProperty, boolean defaultValue) { String value = System.getProperty(systemProperty); if (value != null) { return Boolean.parseBoolean(value); } return defaultValue; } + + public static int getInteger(String systemProperty, int defaultValue) { + String value = System.getProperty(systemProperty); + if (value != null) { + return Integer.parseInt(value); + } + return defaultValue; + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 62154f7d6071..ea6875d479d5 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -77,8 +77,9 @@ public static CloseableIterable filter(CloseableIterable rows, Functio return filter.filter(rows); } - public static StructLikeSet toEqualitySet(CloseableIterable eqDeletes, Types.StructType eqType) { - try (CloseableIterable deletes = eqDeletes) { + public static StructLikeSet toEqualitySet(CloseableIterable eqDeletes, + Types.StructType eqType) { + try (CloseableIterable deletes = eqDeletes) { StructLikeSet deleteSet = StructLikeSet.create(eqType); Iterables.addAll(deleteSet, deletes); return deleteSet; diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 40a77a11334d..1e1c755848e6 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -23,6 +23,9 
@@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Predicate; import org.apache.iceberg.Accessor; import org.apache.iceberg.DataFile; @@ -32,6 +35,7 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.SystemProperties; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; @@ -49,6 +53,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimap; import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Filter; @@ -62,6 +68,13 @@ public abstract class DeleteFilter { MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); + private static final int READ_DELETES_WORKER_POOL_SIZE_DEFAULT = 0; // read delete files in serial. + private static final int READ_DELETES_WORKER_POOL_SIZE = SystemProperties.getInteger( + SystemProperties.READ_DELETE_FILES_WORKER_POOL_SIZE, READ_DELETES_WORKER_POOL_SIZE_DEFAULT); + private static final ExecutorService READ_DELETES_SERVICE = READ_DELETES_WORKER_POOL_SIZE <= 1 ? 
null : + MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool( + READ_DELETES_WORKER_POOL_SIZE, new ThreadFactoryBuilder().setNameFormat("Read-delete-Service-%d").build())); + private final long setFilterThreshold; private final DataFile dataFile; private final List posDeletes; @@ -137,13 +150,10 @@ private List> applyEqDeletes() { Iterable> deleteRecords = Iterables.transform(deletes, delete -> openDeletes(delete, deleteSchema)); - // copy the delete records because they will be held in a set - CloseableIterable records = CloseableIterable.transform( - CloseableIterable.concat(deleteRecords), Record::copy); - StructLikeSet deleteSet = Deletes.toEqualitySet( CloseableIterable.transform( - records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)), + CloseableIterable.combine(deleteRecords, READ_DELETES_SERVICE, READ_DELETES_WORKER_POOL_SIZE), + record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)), deleteSchema.asStruct()); Predicate isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); @@ -196,7 +206,8 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) { if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) { return Deletes.filter( records, this::pos, - Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes))); + Deletes.toPositionSet(dataFile.path(), + CloseableIterable.combine(deletes, READ_DELETES_SERVICE, READ_DELETES_WORKER_POOL_SIZE))); } return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes)); @@ -212,14 +223,12 @@ private CloseableIterable openDeletes(DeleteFile deleteFile, Schema dele case AVRO: return Avro.read(input) .project(deleteSchema) - .reuseContainers() .createReaderFunc(DataReader::create) .build(); case PARQUET: Parquet.ReadBuilder builder = Parquet.read(input) .project(deleteSchema) - .reuseContainers() 
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)); if (deleteFile.content() == FileContent.POSITION_DELETES) {