Skip to content

Commit

Permalink
Add FileScanConfig::new() API (apache#10623)
Browse files Browse the repository at this point in the history
* Add FileScanConfig::new() API, update code to use new API

* Remove add_* api
  • Loading branch information
alamb authored and jayzhan211 committed May 26, 2024
1 parent cfd61b2 commit ff9382c
Show file tree
Hide file tree
Showing 24 changed files with 316 additions and 443 deletions.
16 changes: 5 additions & 11 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::{sync::Arc, vec};

use datafusion::common::Statistics;
use datafusion::{
assert_batches_eq,
datasource::{
Expand Down Expand Up @@ -58,16 +57,11 @@ async fn main() -> Result<()> {

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
statistics: Statistics::new_unknown(&schema),
projection: Some(vec![12, 0]),
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
};
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
Expand Down
16 changes: 5 additions & 11 deletions datafusion-examples/examples/json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::{
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use datafusion_common::Statistics;

use futures::StreamExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -61,16 +60,11 @@ async fn main() -> Result<()> {
Arc::new(object_store),
);

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
statistics: Statistics::new_unknown(&schema),
projection: Some(vec![1, 0]),
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
};
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));

let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
Expand Down
15 changes: 5 additions & 10 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,11 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
state,
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema,
file_groups,
statistics,
projection,
limit,
table_partition_cols: vec![],
output_ordering: vec![],
},
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file_groups(file_groups)
.with_statistics(statistics)
.with_projection(projection)
.with_limit(limit),
None,
)
.await?;
Expand Down
17 changes: 7 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,16 +805,13 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
state,
FileScanConfig {
object_store_url,
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
statistics,
projection: projection.cloned(),
limit,
output_ordering,
table_partition_cols,
},
FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
.with_file_groups(partitioned_file_lists)
.with_statistics(statistics)
.with_projection(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols),
filters.as_ref(),
)
.await
Expand Down
55 changes: 23 additions & 32 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,11 @@ mod tests {
.infer_schema(&state, &store, &[meta.clone()])
.await?;

let avro_exec = AvroExec::new(FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![meta.into()]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});
let avro_exec = AvroExec::new(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2])),
);
assert_eq!(
avro_exec
.properties()
Expand Down Expand Up @@ -348,16 +343,11 @@ mod tests {
// Include the missing column in the projection
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

let avro_exec = AvroExec::new(FileScanConfig {
object_store_url,
file_groups: vec![vec![meta.into()]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});
let avro_exec = AvroExec::new(
FileScanConfig::new(object_store_url, file_schema)
.with_file(meta.into())
.with_projection(projection),
);
assert_eq!(
avro_exec
.properties()
Expand Down Expand Up @@ -422,18 +412,19 @@ mod tests {
let mut partitioned_file = PartitionedFile::from(meta);
partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];

let avro_exec = AvroExec::new(FileScanConfig {
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
object_store_url,
file_groups: vec![vec![partitioned_file]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
limit: None,
table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
output_ordering: vec![],
});
let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
let avro_exec = AvroExec::new(
FileScanConfig::new(object_store_url, file_schema)
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
.with_projection(projection)
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new(
"date",
DataType::Utf8,
false,
)]),
);
assert_eq!(
avro_exec
.properties()
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![0, 2, 4]);

let csv = CsvExec::new(
Expand Down Expand Up @@ -627,7 +627,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![4, 0, 2]);

let csv = CsvExec::new(
Expand Down Expand Up @@ -693,7 +693,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
Expand Down Expand Up @@ -756,7 +756,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
Expand Down Expand Up @@ -809,7 +809,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);

// Add partition columns
config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
Expand Down Expand Up @@ -914,7 +914,7 @@ mod tests {
)
.unwrap();

let config = partitioned_csv_config(file_schema, file_groups).unwrap();
let config = partitioned_csv_config(file_schema, file_groups);
let csv = CsvExec::new(
config,
true,
Expand Down
124 changes: 114 additions & 10 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,41 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {

/// The base configurations to provide when creating a physical plan for
/// any given file format.
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::Schema;
/// use datafusion::datasource::listing::PartitionedFile;
/// # use datafusion::datasource::physical_plan::FileScanConfig;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # let file_schema = Arc::new(Schema::empty());
/// // create FileScan config for reading data from file://
/// let object_store_url = ObjectStoreUrl::local_filesystem();
/// let config = FileScanConfig::new(object_store_url, file_schema)
/// .with_limit(Some(1000)) // read only the first 1000 records
/// .with_projection(Some(vec![2, 3])) // project columns 2 and 3
/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
/// .with_file(PartitionedFile::new("file1.parquet", 1234))
/// // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
/// // in a single file group
/// .with_file_group(vec![
/// PartitionedFile::new("file2.parquet", 56),
/// PartitionedFile::new("file3.parquet", 78),
/// ]);
/// ```
#[derive(Clone)]
pub struct FileScanConfig {
/// Object store URL, used to get an [`ObjectStore`] instance from
/// [`RuntimeEnv::object_store`]
///
/// This `ObjectStoreUrl` should be the prefix of the absolute url for files
/// as `file://` or `s3://my_bucket`. It should not include the path to the
/// file itself. The relevant URL prefix must be registered via
/// [`RuntimeEnv::register_object_store`]
///
/// [`ObjectStore`]: object_store::ObjectStore
/// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
/// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
pub object_store_url: ObjectStoreUrl,
/// Schema before `projection` is applied. It contains the all columns that may
Expand All @@ -87,6 +116,7 @@ pub struct FileScanConfig {
/// sequentially, one after the next.
pub file_groups: Vec<Vec<PartitionedFile>>,
/// Estimated overall statistics of the files, taking `filters` into account.
/// Defaults to [`Statistics::new_unknown`].
pub statistics: Statistics,
/// Columns on which to project the data. Indexes that are higher than the
/// number of columns of `file_schema` refer to `table_partition_cols`.
Expand All @@ -101,6 +131,86 @@ pub struct FileScanConfig {
}

impl FileScanConfig {
/// Create a new `FileScanConfig` with default settings for scanning files.
///
/// See example on [`FileScanConfig`]
///
/// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
/// [`Self::with_file_groups`].
///
/// # Parameters:
/// * `object_store_url`: See [`Self::object_store_url`]
/// * `file_schema`: See [`Self::file_schema`]
pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
    // With no further information, the file statistics default to unknown;
    // callers can override via `with_statistics`
    let statistics = Statistics::new_unknown(&file_schema);
    Self {
        object_store_url,
        file_schema,
        file_groups: vec![],
        statistics,
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering: vec![],
    }
}

/// Set the statistics of the files
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = statistics;
self
}

/// Set the projection of the files
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}

/// Set the limit of the files
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

/// Add a single file as its own group.
///
/// See [`Self::file_groups`] for more information.
pub fn with_file(mut self, file: PartitionedFile) -> Self {
    self.file_groups.push(vec![file]);
    self
}

/// Append several file groups at once.
///
/// See [`Self::file_groups`] for more information.
pub fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
    self.file_groups.extend(file_groups);
    self
}

/// Append one file group.
///
/// See [`Self::file_groups`] for more information.
pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
    self.file_groups.extend([file_group]);
    self
}

/// Set the partitioning columns of the files
pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
self.table_partition_cols = table_partition_cols;
self
}

/// Set the output ordering of the files
pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
self.output_ordering = output_ordering;
self
}

/// Project the schema and the statistics on the given column indices
pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
Expand Down Expand Up @@ -1117,16 +1227,10 @@ mod tests {
statistics: Statistics,
table_partition_cols: Vec<Field>,
) -> FileScanConfig {
FileScanConfig {
file_schema,
file_groups: vec![vec![]],
limit: None,
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
projection,
statistics,
table_partition_cols,
output_ordering: vec![],
}
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), file_schema)
.with_projection(projection)
.with_statistics(statistics)
.with_table_partition_cols(table_partition_cols)
}

/// Convert partition columns from Vec<String DataType> to Vec<Field>
Expand Down
Loading

0 comments on commit ff9382c

Please sign in to comment.