
Conversation

@Reo-LEI (Contributor) commented Sep 15, 2021

This PR is trying to close #3118.

I moved ParallelIterable to the api module and used it to implement a parallel concat API in CloseableIterable. Now we can use the parallel concat in place of the serial concat and read delete files in parallel when DeleteFilter constructs the equalitySet and positionSet.
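A rough sketch of the intended call shape (assumed names: openDeletes is a hypothetical per-file reader helper; the combine(Iterable, ExecutorService, int) signature comes from the javadoc quoted later in this review):

// Hypothetical DeleteFilter call site: combine() falls back to the serial concat()
// when no worker pool is configured, and reads the delete files in parallel otherwise.
Iterable<CloseableIterable<Record>> perFileDeletes =
    Iterables.transform(deleteFiles, this::openDeletes);
CloseableIterable<Record> deletes =
    CloseableIterable.combine(perFileDeletes, workerPool, workerPoolSize);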

@Reo-LEI (Contributor Author) commented Sep 15, 2021

@rdblue @aokolnychyi @kbendick @openinx @stevenzwu Could you take a look at this? :)

case AVRO:
  return Avro.read(input)
      .project(deleteSchema)
      .reuseContainers()
Contributor:

is this related?

Contributor:

I think this is needed because records are now placed on a queue inside the parallel iterator. Reusing the record instance instead of copying would cause a problem.
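A minimal standalone illustration of that hazard (not code from this PR): once a live, reused container crosses a queue, the queued element aliases whatever row the reader loads next.

import java.util.ArrayDeque;
import java.util.Queue;

public class ReuseHazard {
  public static void main(String[] args) {
    int[] container = {1};                // stands in for a reused Record instance
    Queue<int[]> queue = new ArrayDeque<>();
    queue.add(container);                 // the parallel iterator queues the live object
    container[0] = 2;                     // the reader reuses the container for the next row
    System.out.println(queue.peek()[0]);  // prints 2, not 1: the queued "row" changed
  }
}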

@Reo-LEI (Contributor Author) commented Sep 22, 2021

I have made the parallelism configurable and let it run in a separate thread pool.
Could you help review this PR again? @rdblue @stevenzwu @kbendick @jackye1995

@rdblue (Contributor) commented Sep 27, 2021

Thanks, @Reo-LEI. I'll take another look.

    Iterables.transform(snapshots, snapshot -> (Iterable<ManifestFile>) () -> snapshot.dataManifests().iterator()),
    ThreadPools.getWorkerPool())) {
try (CloseableIterable<ManifestFile> iterable = CloseableIterable.combine(
    Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.dataManifests())),
Contributor:

I don't think that this should add a noop close here. Can you avoid adding this in all the updated calls to create a parallel iterable?

Contributor Author:

I think I can't avoid adding this, because combine will call concat when workerPool is null, and concat accepts Iterable<CloseableIterable<E>>, not Iterable<Iterable<E>>. So I need to add a noop close to wrap it before passing it to combine.

Contributor:

This does need to be fixed. ParallelIterable was constructed using Iterable<? extends Iterable<T>>. There should be a version of CloseableIterable.combine that accepts the same type. That may mean updating CloseableIterable.concat to accept the same ? extends Iterable<T> in addition to strictly a CloseableIterable<T>. But that should be okay, since you can update to close the iterable if it is closeable.
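One way to do that (a sketch, not the merged code): coerce each input to CloseableIterable, keeping a real close() when the input already has one, instead of wrapping everything in a noop close at the call sites.

// Sketch: asCloseable is a hypothetical helper inside CloseableIterable.
@SuppressWarnings("unchecked")
static <E> CloseableIterable<E> asCloseable(Iterable<E> iterable) {
  if (iterable instanceof CloseableIterable) {
    return (CloseableIterable<E>) iterable;  // preserve the original close()
  }
  return CloseableIterable.withNoopClose(iterable);
}

concat could then accept Iterable<? extends Iterable<E>> and map each element through asCloseable.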

try (CloseableIterable<StructLike> deletes = eqDeletes) {
public static <T extends StructLike> StructLikeSet toEqualitySet(CloseableIterable<T> eqDeletes,
                                                                 Types.StructType eqType) {
  try (CloseableIterable<T> deletes = eqDeletes) {
Contributor:

Why was this change needed?

Contributor Author:

To avoid transforming Record to StructLike, per your comment #3120 (comment).

Contributor:

Seems reasonable.

private ExecutorService readDeletesService;

protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema,
                       Map<String, String> tableProperties) {
Contributor:

This class should not take table properties. I think it should take an executor service instead.

Contributor Author:

I think DeleteFilter itself should make sure the reader reads delete files in parallel when configured to do so, rather than having the caller pass in an executor service. So I create the read service internally and read the config from system properties.

new ThreadFactoryBuilder()
    .setNameFormat("Read-delete-Service-%d")
    .build()));
}
Contributor:

When is this executor service cleaned up?

Contributor Author:

I think we could create a custom executor and set corePoolSize = 0, maximumPoolSize = poolSize, workQueue = new LinkedBlockingQueue<Runnable>() and let the executor shut down automatically. Otherwise, we need to call shutdown in the filter, or add a close method for the DeleteFilter caller to call.
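For reference, a sketch of a pool whose idle threads exit on their own, so no explicit shutdown() call is needed (poolSize is a placeholder). Note that the exact parameters proposed above (corePoolSize = 0 with an unbounded LinkedBlockingQueue) would cap the pool at a single thread, because ThreadPoolExecutor only adds non-core threads when the queue rejects a task.

// Sketch: idle threads time out after 60s, so the pool cleans itself up
// without anyone having to call shutdown().
ThreadPoolExecutor pool = new ThreadPoolExecutor(
    poolSize, poolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
pool.allowCoreThreadTimeOut(true);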

Contributor Author:

I made this executor service static. Now different filter tasks can share the same executor service, and we don't need to consider when to shut down and clean up the executor service.

public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB

public static final String READ_DELETE_FILES_WORKER_POOL_SIZE = "read.deletes.num-threads";
public static final int READ_DELETE_FILES_WORKER_POOL_SIZE_DEFAULT = 1; // read delete files in serial.
Contributor:

I don't think this makes much sense as a table property. Table properties are for table configuration, but this is an engine concern. Many engines handle parallelism internally, so this wouldn't be appropriate. I think that Flink should expose a setting and manage the thread pool used by the delete file reader.

@Reo-LEI (Contributor Author) Sep 30, 2021:

I admit that this configuration may not be appropriate as a table property. But I think this optimization should apply to all engines, not only Flink. For example, I sync MySQL CDC data to Iceberg with Flink, then rewrite and merge the delete files into data files with Spark.

Maybe we could configure this through SystemProperties, like iceberg.worker.num-threads (https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/SystemProperties.java#L35), and stop propagating the tableProperties.

Contributor Author:

I moved this config to SystemProperties, so we can stop propagating the tableProperties and handle the parallelism internally.
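Roughly (assumed shape; the property name and the default of 1 come from the diffs quoted in this review):

// Sketch: resolve the pool size once from a JVM system property,
// defaulting to 1 (serial reads) when the property is unset.
private static final int READ_DELETES_WORKER_POOL_SIZE = Integer.parseInt(
    System.getProperty("iceberg.worker.read-deletes-num-threads", "1"));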

Contributor:

Is it not possible to have Flink pass in the executor service it chooses to use?

@Reo-LEI (Contributor Author) commented Nov 1, 2021

I addressed some comments and left some replies. @rdblue Could you take another look at this PR? 😄

private final int workerPoolSize;

/**
 * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
Contributor:

You can omit "please" from documentation so that docs are direct and as short as possible.

Also, can you add "will be removed in 0.14.0"? We like to keep track of when things can be removed.
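That is, something like:

/**
 * @deprecated use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead;
 *             will be removed in 0.14.0
 */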

this.workerPool = workerPool;
// submit 2 tasks per worker at a time
this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
// submit 2 tasks per worker at a time.
Contributor:

Can you remove the non-functional change on this line? We don't want unnecessary changes to cause commit conflicts.


ParallelIterable(Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool,
int workerPoolSize) {
Contributor:

Can you fix the indentation here?
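Presumably continuation lines aligned with the first parameter, e.g.:

ParallelIterable(Iterable<? extends Iterable<T>> iterables,
                 ExecutorService workerPool,
                 int workerPoolSize) {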

 * Run iterables in parallel.
 * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
 */
@Deprecated
Contributor:

This class is not deprecated. It is still public and will not be removed. This should be created using CloseableIterable#combine rather than directly using the constructor. Can you remove this deprecation?

Contributor:

I see the module also changed here. I don't think we are allowed to move a public class to another module?

Contributor:

Good catch, @jackye1995!

*/
public static final String READ_DELETE_FILES_WORKER_POOL_SIZE = "iceberg.worker.read-deletes-num-threads";

public static boolean getBoolean(String systemProperty, boolean defaultValue) {
Contributor:

As I said elsewhere, I don't think that this feature should be controlled through a system property. This should be a Flink-specific property for now, and we can introduce a similar config for Spark later. But since this violates Spark's threading model on executors, we don't want to make this global.

@rdblue (Contributor) commented Nov 3, 2021

    SystemProperties.READ_DELETE_FILES_WORKER_POOL_SIZE, READ_DELETES_WORKER_POOL_SIZE_DEFAULT);
private static final ExecutorService READ_DELETES_SERVICE = READ_DELETES_WORKER_POOL_SIZE <= 1 ? null :
    MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(
        READ_DELETES_WORKER_POOL_SIZE, new ThreadFactoryBuilder().setNameFormat("Read-delete-Service-%d").build()));
Contributor:

It looks like the changes other than the ones in this file are to make it easier to use ParallelIterable. Can we separate those changes from these for Flink and DeleteFilter? A separate PR for the CloseableIterable changes would make it easier to review this when there are more Flink changes to handle the executor service and pass it into DeleteFilter.

@Reo-LEI (Contributor Author) commented Nov 8, 2021

@rdblue @stevenzwu @kbendick @jackye1995 Thanks everyone for the reviews! As Ryan commented, the current implementation would violate Spark's threading model. And since I have started work on #3323, I think we should improve read performance by rewriting data files and delete files promptly, so maybe this PR is not necessary. I will do some tests after I finish #3323 to see how much benefit this optimization can bring, and then consider whether we still need it. Thanks again for everyone's work!

@github-actions (bot) commented

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

github-actions bot added the stale label on Jul 22, 2024
@github-actions (bot) commented

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions bot closed this on Jul 30, 2024
