Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FileScanConfig::new() API #10623

Merged
merged 6 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::{sync::Arc, vec};

use datafusion::common::Statistics;
use datafusion::{
assert_batches_eq,
datasource::{
Expand Down Expand Up @@ -58,16 +57,11 @@ async fn main() -> Result<()> {

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
statistics: Statistics::new_unknown(&schema),
projection: Some(vec![12, 0]),
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
};
let scan_config =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pretty good example of what the new API allows for:

  1. Less code (defaults are set)
  2. Makes it clearer what is the default and what is not

FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
Expand Down
16 changes: 5 additions & 11 deletions datafusion-examples/examples/json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::{
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use datafusion_common::Statistics;

use futures::StreamExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -61,16 +60,11 @@ async fn main() -> Result<()> {
Arc::new(object_store),
);

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
statistics: Statistics::new_unknown(&schema),
projection: Some(vec![1, 0]),
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
};
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));

let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
Expand Down
15 changes: 5 additions & 10 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,11 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
state,
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema,
file_groups,
statistics,
projection,
limit,
table_partition_cols: vec![],
output_ordering: vec![],
},
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file_groups(file_groups)
.with_statistics(statistics)
.with_projection(projection)
.with_limit(limit),
None,
)
.await?;
Expand Down
17 changes: 7 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,16 +805,13 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
state,
FileScanConfig {
object_store_url,
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
statistics,
projection: projection.cloned(),
limit,
output_ordering,
table_partition_cols,
},
FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
.with_file_groups(partitioned_file_lists)
.with_statistics(statistics)
.with_projection(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols),
filters.as_ref(),
)
.await
Expand Down
55 changes: 23 additions & 32 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,11 @@ mod tests {
.infer_schema(&state, &store, &[meta.clone()])
.await?;

let avro_exec = AvroExec::new(FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![meta.into()]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});
let avro_exec = AvroExec::new(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2])),
);
assert_eq!(
avro_exec
.properties()
Expand Down Expand Up @@ -348,16 +343,11 @@ mod tests {
// Include the missing column in the projection
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

let avro_exec = AvroExec::new(FileScanConfig {
object_store_url,
file_groups: vec![vec![meta.into()]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});
let avro_exec = AvroExec::new(
FileScanConfig::new(object_store_url, file_schema)
.with_file(meta.into())
.with_projection(projection),
);
assert_eq!(
avro_exec
.properties()
Expand Down Expand Up @@ -422,18 +412,19 @@ mod tests {
let mut partitioned_file = PartitionedFile::from(meta);
partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];

let avro_exec = AvroExec::new(FileScanConfig {
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
object_store_url,
file_groups: vec![vec![partitioned_file]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
limit: None,
table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
output_ordering: vec![],
});
let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
let avro_exec = AvroExec::new(
FileScanConfig::new(object_store_url, file_schema)
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
.with_projection(projection)
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new(
"date",
DataType::Utf8,
false,
)]),
);
assert_eq!(
avro_exec
.properties()
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![0, 2, 4]);

let csv = CsvExec::new(
Expand Down Expand Up @@ -627,7 +627,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![4, 0, 2]);

let csv = CsvExec::new(
Expand Down Expand Up @@ -693,7 +693,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
Expand Down Expand Up @@ -756,7 +756,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
Expand Down Expand Up @@ -809,7 +809,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);

// Add partition columns
config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
Expand Down Expand Up @@ -914,7 +914,7 @@ mod tests {
)
.unwrap();

let config = partitioned_csv_config(file_schema, file_groups).unwrap();
let config = partitioned_csv_config(file_schema, file_groups);
let csv = CsvExec::new(
config,
true,
Expand Down
124 changes: 114 additions & 10 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,41 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {

/// The base configurations to provide when creating a physical plan for
/// any given file format.
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::Schema;
/// use datafusion::datasource::listing::PartitionedFile;
/// # use datafusion::datasource::physical_plan::FileScanConfig;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 maybe I should change this example to use the with style 🤔

/// # let file_schema = Arc::new(Schema::empty());
/// // create FileScan config for reading data from file://
/// let object_store_url = ObjectStoreUrl::local_filesystem();
/// let config = FileScanConfig::new(object_store_url, file_schema)
/// .with_limit(Some(1000)) // read only the first 1000 records
/// .with_projection(Some(vec![2, 3])) // project columns 2 and 3
/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
/// .with_file(PartitionedFile::new("file1.parquet", 1234))
/// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
/// // in a single file group
/// .with_file_group(vec![
/// PartitionedFile::new("file2.parquet", 56),
/// PartitionedFile::new("file3.parquet", 78),
/// ]);
/// ```
#[derive(Clone)]
pub struct FileScanConfig {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the new API and documentation. Note this is entirely backwards compatible (I did not change any fields to non pub, etc)

/// Object store URL, used to get an [`ObjectStore`] instance from
/// [`RuntimeEnv::object_store`]
///
/// This `ObjectStoreUrl` should be the prefix of the absolute url for files
/// as `file://` or `s3://my_bucket`. It should not include the path to the
/// file itself. The relevant URL prefix must be registered via
/// [`RuntimeEnv::register_object_store`]
///
/// [`ObjectStore`]: object_store::ObjectStore
/// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
/// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
pub object_store_url: ObjectStoreUrl,
/// Schema before `projection` is applied. It contains the all columns that may
Expand All @@ -87,6 +116,7 @@ pub struct FileScanConfig {
/// sequentially, one after the next.
pub file_groups: Vec<Vec<PartitionedFile>>,
/// Estimated overall statistics of the files, taking `filters` into account.
/// Defaults to [`Statistics::new_unknown`].
pub statistics: Statistics,
/// Columns on which to project the data. Indexes that are higher than the
/// number of columns of `file_schema` refer to `table_partition_cols`.
Expand All @@ -101,6 +131,86 @@ pub struct FileScanConfig {
}

impl FileScanConfig {
/// Creates a `FileScanConfig` with default settings for scanning files.
///
/// See the example on [`FileScanConfig`].
///
/// No file groups are added by default; add them with [`Self::with_file`],
/// [`Self::with_file_group`] and [`Self::with_file_groups`].
///
/// # Parameters:
/// * `object_store_url`: See [`Self::object_store_url`]
/// * `file_schema`: See [`Self::file_schema`]
pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
    Self {
        // Statistics default to "unknown"; override via `with_statistics`.
        statistics: Statistics::new_unknown(&file_schema),
        object_store_url,
        file_schema,
        file_groups: vec![],
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering: vec![],
    }
}

/// Set the statistics of the files
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = statistics;
self
}

/// Set the projection of the files
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}

/// Set the limit of the files
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

/// Add a file as a single group
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having to wrap a single file in a vec![vec![..]] makes sense from the implementation point of view, but most users shouldn't have to worry about this

///
/// See [Self::file_groups] for more information.
pub fn with_file(mut self, file: PartitionedFile) -> Self {
    // A lone file becomes its own single-element file group.
    self.file_groups.push(vec![file]);
    self
}

/// Appends several file groups to the scan.
///
/// See [`Self::file_groups`] for more information.
pub fn with_file_groups(
    mut self,
    file_groups: Vec<Vec<PartitionedFile>>,
) -> Self {
    self.file_groups.extend(file_groups);
    self
}

/// Appends a single new file group to the scan.
///
/// See [`Self::file_groups`] for more information.
pub fn with_file_group(self, file_group: Vec<PartitionedFile>) -> Self {
    self.with_file_groups(vec![file_group])
}

/// Set the partitioning columns of the files
pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
self.table_partition_cols = table_partition_cols;
self
}

/// Set the output ordering of the files
pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
self.output_ordering = output_ordering;
self
}

/// Project the schema and the statistics on the given column indices
pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
Expand Down Expand Up @@ -1117,16 +1227,10 @@ mod tests {
statistics: Statistics,
table_partition_cols: Vec<Field>,
) -> FileScanConfig {
FileScanConfig {
file_schema,
file_groups: vec![vec![]],
limit: None,
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
projection,
statistics,
table_partition_cols,
output_ordering: vec![],
}
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), file_schema)
.with_projection(projection)
.with_statistics(statistics)
.with_table_partition_cols(table_partition_cols)
}

/// Convert partition columns from Vec<String DataType> to Vec<Field>
Expand Down
Loading