
Should parallel collecting statistics like infer schema? #7573

Closed
hengfeiyang opened this issue Sep 15, 2023 · 7 comments
Labels
enhancement New feature or request

Comments

@hengfeiyang
Contributor

hengfeiyang commented Sep 15, 2023

Is your feature request related to a problem or challenge?

When I query data from S3, I found that DataFusion fetches parquet file metadata one file at a time, which is slow when there are many files.

The code is here:

https://github.com/apache/arrow-datafusion/blob/31.0.0/datafusion/core/src/datasource/listing/table.rs#L960C1-L985

This code uses iter::then, so it fetches the metadata for each file one by one.

But I found something here:

https://github.com/apache/arrow-datafusion/blob/31.0.0/datafusion/core/src/datasource/file_format/parquet.rs#L170-L175

When inferring the schema it uses concurrent requests:

iter::map().boxed().buffered(SCHEMA_INFERENCE_CONCURRENCY)

Is it possible to do the same thing here, i.e. use concurrent requests when collecting statistics?

Describe the solution you'd like

Actually, I tried changing the code:

https://github.com/apache/arrow-datafusion/blob/31.0.0/datafusion/core/src/datasource/listing/table.rs#L960C1-L985

        let files = file_list.then(|part_file| async {
                let part_file = part_file?;
                let statistics = if self.options.collect_stat {
                    match self.collected_statistics.get(&part_file.object_meta) {
                        Some(statistics) => statistics,
                        None => {
                            let statistics = self
                                .options
                                .format
                                .infer_stats(
                                    ctx,
                                    &store,
                                    self.file_schema.clone(),
                                    &part_file.object_meta,
                                )
                                .await?;
                            self.collected_statistics
                                .save(part_file.object_meta.clone(), statistics.clone());
                            statistics
                        }
                    }
                } else {
                    Statistics::default()
                };
                Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
            });

To this:

          let files = file_list.map(|part_file| async {
                let part_file = part_file?;
                let statistics = if self.options.collect_stat {
                    match self.collected_statistics.get(&part_file.object_meta) {
                        Some(statistics) => statistics,
                        None => {
                            let statistics = self
                                .options
                                .format
                                .infer_stats(
                                    ctx,
                                    &store,
                                    self.file_schema.clone(),
                                    &part_file.object_meta,
                                )
                                .await?;
                            self.collected_statistics
                                .save(part_file.object_meta.clone(), statistics.clone());
                            statistics
                        }
                    }
                } else {
                    Statistics::default()
                };
                Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
            })
            .boxed()
            .buffered(COLLECT_STATISTICS_CONCURRENCY);

And set a const variable:

const COLLECT_STATISTICS_CONCURRENCY: usize = 32;

The search speed is much improved on my local setup, because statistics are now collected by fetching parquet files from S3 concurrently instead of one by one.

Describe alternatives you've considered

No response

Additional context

No response

@hengfeiyang hengfeiyang added the enhancement New feature or request label Sep 15, 2023
@alamb
Contributor

alamb commented Sep 18, 2023

Hi @hengfeiyang -- this sounds like a great idea to me.

Rather than hard-coding the concurrency, perhaps you could add a config parameter, such as ExecutionOptions::meta_fetch_concurrency, following the model of https://docs.rs/datafusion/latest/datafusion/config/struct.ExecutionOptions.html#structfield.planning_concurrency

The rationale for a configuration setting is that the optimal value will likely depend on the network configuration of the system, so there is no good constant that would work in all cases.

Perhaps it could default to 10 or 32?

cc @Ted-Jiang who I think was working on some other settings to cache statistics for multiple queries in the same session. See #7570

@hengfeiyang
Contributor Author

@alamb You are right, we should add an option for it. The const 32 I just copied from here: https://github.com/apache/arrow-datafusion/blob/31.0.0/datafusion/core/src/datasource/file_format/parquet.rs#L68

Maybe we should change both.

@hengfeiyang
Contributor Author

hengfeiyang commented Sep 19, 2023

@Ted-Jiang So you will improve this part in your PR, and I don't need to create another PR for this, right?

@alamb
Contributor

alamb commented Sep 19, 2023

@Ted-Jiang's PR is merged, so this change would need a follow-on PR.

@hengfeiyang
Contributor Author

@alamb Okay, I will do it. Thanks.

@alamb
Contributor

alamb commented Sep 19, 2023

Thank you @hengfeiyang -- most appreciated

hengfeiyang added a commit to hengfeiyang/arrow-datafusion that referenced this issue Sep 19, 2023
Dandandan pushed a commit that referenced this issue Sep 21, 2023
* feat: parallel collecting parquet files statistics #7573

* fix: cargo clippy format

* docs: add doc for execution.meta_fetch_concurrency

* feat: change the default value to 32 for execution.meta_fetch_concurrency
@alamb
Contributor

alamb commented Nov 27, 2023

Closed in #7595

@alamb alamb closed this as completed Nov 27, 2023