Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.exceptions.RuntimeIOException;
Expand Down Expand Up @@ -117,6 +118,18 @@ public O next() {
};
}

/**
 * Combines several closeable iterables into a single closeable iterable.
 * <p>
 * When {@code workerPool} is null, the iterables are concatenated with {@link #concat(Iterable)} and
 * {@code workerPoolSize} is ignored. Otherwise a {@link ParallelIterable} is created from the iterables,
 * the pool and the pool size.
 *
 * @param iterable the iterables to combine
 * @param workerPool executor service handed to the {@link ParallelIterable}, or null to concatenate
 * @param workerPoolSize pool size handed to the {@link ParallelIterable}; must be positive when
 *                       {@code workerPool} is not null
 * @param <E> the element type
 * @return a closeable iterable over the elements of all given iterables
 * @throws IllegalArgumentException if {@code workerPool} is not null and {@code workerPoolSize} is not positive
 */
static <E> CloseableIterable<E> combine(Iterable<CloseableIterable<E>> iterable,
                                        ExecutorService workerPool,
                                        int workerPoolSize) {
  if (workerPool == null) {
    return concat(iterable);
  }

  // use the message template so the string is only built when the check actually fails
  Preconditions.checkArgument(workerPoolSize > 0, "Invalid workerPoolSize (not positive): %s", workerPoolSize);

  return new ParallelIterable<>(iterable, workerPool, workerPoolSize);
}

static <E> CloseableIterable<E> concat(Iterable<CloseableIterable<E>> iterable) {
Iterator<CloseableIterable<E>> iterables = iterable.iterator();
if (!iterables.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iceberg.util;
package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -28,25 +28,39 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

/**
* Run iterables in parallel.
* @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is not deprecated. It is still public and will not be removed. This should be created using CloseableIterable#combine rather than directly using the constructor. Can you remove this deprecation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the module here also changed. I don't think we are allowed to move a public class to another module?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, @jackye1995!

public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private final Iterable<? extends Iterable<T>> iterables;
private final ExecutorService workerPool;
private final int workerPoolSize;

/**
* @deprecated will be removed in 0.14.0; use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can omit "please" from documentation so that docs are direct and as short as possible.

Also, can you add "will be removed in 0.14.0"? We like to keep track of when things can be removed.

*/
@Deprecated
public ParallelIterable(Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool) {
// Delegates to the three-argument constructor, using the number of available processors as the pool size.
// NOTE(review): the earlier code sized the task-future array from ThreadPools.WORKER_THREAD_POOL_SIZE; confirm
// that switching to availableProcessors() keeps the same effective size for existing callers.
this(iterables, workerPool, Runtime.getRuntime().availableProcessors());
}

ParallelIterable(Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool,
int workerPoolSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you fix the indentation here?

this.iterables = iterables;
this.workerPool = workerPool;
this.workerPoolSize = workerPoolSize;
}

@Override
public CloseableIterator<T> iterator() {
ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool, workerPoolSize);
addCloseable(iter);
return iter;
}
Expand All @@ -59,7 +73,8 @@ private static class ParallelIterator<T> implements CloseableIterator<T> {
private boolean closed = false;

private ParallelIterator(Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool) {
ExecutorService workerPool,
int workerPoolSize) {
this.tasks = Iterables.transform(iterables, iterable ->
(Runnable) () -> {
try (Closeable ignored = (iterable instanceof Closeable) ?
Expand All @@ -72,8 +87,8 @@ private ParallelIterator(Iterable<? extends Iterable<T>> iterables,
}
}).iterator();
this.workerPool = workerPool;
// submit 2 tasks per worker at a time
this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
// submit 2 tasks per worker at a time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove the non-functional change on this line? We don't want unnecessary changes to cause commit conflicts.

this.taskFutures = new Future[workerPoolSize * 2];
}

@Override
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;

/**
Expand Down Expand Up @@ -128,9 +127,9 @@ protected CloseableIterable<FileScanTask> planFiles(
}

private static CloseableIterable<ManifestFile> allDataManifestFiles(List<Snapshot> snapshots) {
try (CloseableIterable<ManifestFile> iterable = new ParallelIterable<>(
Iterables.transform(snapshots, snapshot -> (Iterable<ManifestFile>) () -> snapshot.dataManifests().iterator()),
ThreadPools.getWorkerPool())) {
try (CloseableIterable<ManifestFile> iterable = CloseableIterable.combine(
Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.dataManifests())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this should add a noop close here. Can you avoid adding this in all the updated calls to create a parallel iterable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I can avoid adding this, because combine calls concat when workerPool is null, and concat accepts Iterable<CloseableIterable<E>>, not Iterable<Iterable<E>>. So I need to wrap each iterable with a noop close before passing it to combine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does need to be fixed. ParallelIterable was constructed using Iterable<? extends Iterable<T>>. There should be a version of CloseableIterable.combine that accepts the same type. That may mean updating CloseableIterable.concat to accept the same ? extends Iterable<T> in addition to strictly a CloseableIterable<T>. But that should be okay, since you can update to close the iterable if it is closeable.

ThreadPools.getWorkerPool(), ThreadPools.WORKER_THREAD_POOL_SIZE)) {
return CloseableIterable.withNoopClose(Sets.newHashSet(iterable));
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close parallel iterable");
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/org/apache/iceberg/AllEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;

/**
Expand Down Expand Up @@ -114,9 +113,9 @@ protected CloseableIterable<FileScanTask> planFiles(
}

private static CloseableIterable<ManifestFile> allManifestFiles(List<Snapshot> snapshots) {
try (CloseableIterable<ManifestFile> iterable = new ParallelIterable<>(
Iterables.transform(snapshots, snapshot -> (Iterable<ManifestFile>) () -> snapshot.allManifests().iterator()),
ThreadPools.getWorkerPool())) {
try (CloseableIterable<ManifestFile> iterable = CloseableIterable.combine(
Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.allManifests())),
ThreadPools.getWorkerPool(), ThreadPools.WORKER_THREAD_POOL_SIZE)) {
return CloseableIterable.withNoopClose(Sets.newHashSet(iterable));
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close parallel iterable");
Expand Down
8 changes: 2 additions & 6 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;

class ManifestGroup {
private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();
Expand Down Expand Up @@ -182,11 +182,7 @@ public CloseableIterable<FileScanTask> planFiles() {
}
});

if (executorService != null) {
return new ParallelIterable<>(tasks, executorService);
} else {
return CloseableIterable.concat(tasks);
}
return CloseableIterable.combine(tasks, executorService, ThreadPools.WORKER_THREAD_POOL_SIZE);
}

/**
Expand Down
15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/iceberg/SystemProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,24 @@ private SystemProperties() {
*/
public static final String SCAN_THREAD_POOL_ENABLED = "iceberg.scan.plan-in-worker-pool";

static boolean getBoolean(String systemProperty, boolean defaultValue) {
/**
* Number of threads in the worker pool used to read delete files; a value of 1 or less reads delete files serially.
*/
public static final String READ_DELETE_FILES_WORKER_POOL_SIZE = "iceberg.worker.read-deletes-num-threads";

public static boolean getBoolean(String systemProperty, boolean defaultValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said elsewhere, I don't think that this feature should be controlled through a system property. This should be a Flink-specific property for now and we can introduce a similar config for Spark later. But since this violates Spark's threading model on executors, we don't want to make this global.

String value = System.getProperty(systemProperty);
if (value != null) {
return Boolean.parseBoolean(value);
}
return defaultValue;
}

/**
 * Reads a JVM system property as an int.
 *
 * @param systemProperty name of the system property to look up
 * @param defaultValue value returned when the property is not set
 * @return the parsed property value, or {@code defaultValue} if the property is absent
 * @throws NumberFormatException if the property is set but cannot be parsed as an int
 */
public static int getInteger(String systemProperty, int defaultValue) {
  String configured = System.getProperty(systemProperty);
  return configured == null ? defaultValue : Integer.parseInt(configured);
}
}
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Functio
return filter.filter(rows);
}

public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
try (CloseableIterable<StructLike> deletes = eqDeletes) {
public static <T extends StructLike> StructLikeSet toEqualitySet(CloseableIterable<T> eqDeletes,
Types.StructType eqType) {
try (CloseableIterable<T> deletes = eqDeletes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this change needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid transforming Record to StructLike, as suggested in your comment #3120 (comment).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable.

StructLikeSet deleteSet = StructLikeSet.create(eqType);
Iterables.addAll(deleteSet, deletes);
return deleteSet;
Expand Down
25 changes: 17 additions & 8 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DataFile;
Expand All @@ -32,6 +35,7 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.SystemProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
Expand All @@ -49,6 +53,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Filter;
Expand All @@ -62,6 +68,13 @@ public abstract class DeleteFilter<T> {
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS);

// Default pool size; with this value no executor is created below and delete files are read serially.
private static final int READ_DELETES_WORKER_POOL_SIZE_DEFAULT = 0; // read delete files in serial.
// Pool size read from the system property named by SystemProperties.READ_DELETE_FILES_WORKER_POOL_SIZE,
// falling back to the default above when the property is not set.
private static final int READ_DELETES_WORKER_POOL_SIZE = SystemProperties.getInteger(
SystemProperties.READ_DELETE_FILES_WORKER_POOL_SIZE, READ_DELETES_WORKER_POOL_SIZE_DEFAULT);
// Executor shared by all DeleteFilter instances for reading delete files; null when the configured size is
// 1 or less. Otherwise a fixed-size pool with threads named "Read-delete-Service-N", wrapped by
// MoreExecutors.getExitingExecutorService (presumably so pool threads do not block JVM exit -- confirm).
private static final ExecutorService READ_DELETES_SERVICE = READ_DELETES_WORKER_POOL_SIZE <= 1 ? null :
MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(
READ_DELETES_WORKER_POOL_SIZE, new ThreadFactoryBuilder().setNameFormat("Read-delete-Service-%d").build()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the changes other than the ones in this file are to make it easier to use ParallelIterable. Can we separate those changes from these for Flink and DeleteFilter? A separate PR for the CloseableIterable changes would make it easier to review this when there are more Flink changes to handle the executor service and pass it into DeleteFilter.


private final long setFilterThreshold;
private final DataFile dataFile;
private final List<DeleteFile> posDeletes;
Expand Down Expand Up @@ -137,13 +150,10 @@ private List<Predicate<T>> applyEqDeletes() {
Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
delete -> openDeletes(delete, deleteSchema));

// copy the delete records because they will be held in a set
CloseableIterable<Record> records = CloseableIterable.transform(
CloseableIterable.concat(deleteRecords), Record::copy);

StructLikeSet deleteSet = Deletes.toEqualitySet(
CloseableIterable.transform(
records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
CloseableIterable.combine(deleteRecords, READ_DELETES_SERVICE, READ_DELETES_WORKER_POOL_SIZE),
record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
deleteSchema.asStruct());

Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
Expand Down Expand Up @@ -196,7 +206,8 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
return Deletes.filter(
records, this::pos,
Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)));
Deletes.toPositionSet(dataFile.path(),
CloseableIterable.combine(deletes, READ_DELETES_SERVICE, READ_DELETES_WORKER_POOL_SIZE)));
}

return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
Expand All @@ -212,14 +223,12 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
case AVRO:
return Avro.read(input)
.project(deleteSchema)
.reuseContainers()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this related?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is needed because records are now placed on a queue inside the parallel iterator. Reusing the record instance instead of copying would cause a problem.

.createReaderFunc(DataReader::create)
.build();

case PARQUET:
Parquet.ReadBuilder builder = Parquet.read(input)
.project(deleteSchema)
.reuseContainers()
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));

if (deleteFile.content() == FileContent.POSITION_DELETES) {
Expand Down