Skip to content

Commit

Permalink
Add FileScanConfig::new() API, update code to use new API
Browse files: browse the repository at this point in the history
  • Loading branch information
alamb committed May 22, 2024
1 parent 513d8d1 commit d7e0462
Show file tree
Hide file tree
Showing 24 changed files with 331 additions and 443 deletions.
16 changes: 5 additions & 11 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::{sync::Arc, vec};

use datafusion::common::Statistics;
use datafusion::{
assert_batches_eq,
datasource::{
Expand Down Expand Up @@ -58,16 +57,11 @@ async fn main() -> Result<()> {

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
statistics: Statistics::new_unknown(&schema),
projection: Some(vec![12, 0]),
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
};
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
Expand Down
16 changes: 5 additions & 11 deletions datafusion-examples/examples/json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::{
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use datafusion_common::Statistics;

use futures::StreamExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -61,16 +60,11 @@ async fn main() -> Result<()> {
Arc::new(object_store),
);

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
statistics: Statistics::new_unknown(&schema),
projection: Some(vec![1, 0]),
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
};
let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));

let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
Expand Down
15 changes: 5 additions & 10 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,11 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
state,
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema,
file_groups,
statistics,
projection,
limit,
table_partition_cols: vec![],
output_ordering: vec![],
},
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file_groups(file_groups)
.with_statistics(statistics)
.with_projection(projection)
.with_limit(limit),
None,
)
.await?;
Expand Down
17 changes: 7 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,16 +805,13 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
state,
FileScanConfig {
object_store_url,
file_schema: Arc::clone(&self.file_schema),
file_groups: partitioned_file_lists,
statistics,
projection: projection.cloned(),
limit,
output_ordering,
table_partition_cols,
},
FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
.with_file_groups(partitioned_file_lists)
.with_statistics(statistics)
.with_projection(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols),
filters.as_ref(),
)
.await
Expand Down
55 changes: 23 additions & 32 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,11 @@ mod tests {
.infer_schema(&state, &store, &[meta.clone()])
.await?;

let avro_exec = AvroExec::new(FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![meta.into()]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});
let avro_exec = AvroExec::new(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2])),
);
assert_eq!(
avro_exec
.properties()
Expand Down Expand Up @@ -350,16 +345,11 @@ mod tests {
// Include the missing column in the projection
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

let avro_exec = AvroExec::new(FileScanConfig {
object_store_url,
file_groups: vec![vec![meta.into()]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
projection,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
});
let avro_exec = AvroExec::new(
FileScanConfig::new(object_store_url, file_schema)
.with_file(meta.into())
.with_projection(projection),
);
assert_eq!(
avro_exec
.properties()
Expand Down Expand Up @@ -424,18 +414,19 @@ mod tests {
let mut partitioned_file = PartitionedFile::from(meta);
partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];

let avro_exec = AvroExec::new(FileScanConfig {
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
object_store_url,
file_groups: vec![vec![partitioned_file]],
statistics: Statistics::new_unknown(&file_schema),
file_schema,
limit: None,
table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
output_ordering: vec![],
});
let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
let avro_exec = AvroExec::new(
FileScanConfig::new(object_store_url, file_schema)
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
.with_projection(projection)
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new(
"date",
DataType::Utf8,
false,
)]),
);
assert_eq!(
avro_exec
.properties()
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![0, 2, 4]);

let csv = CsvExec::new(
Expand Down Expand Up @@ -627,7 +627,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![4, 0, 2]);

let csv = CsvExec::new(
Expand Down Expand Up @@ -693,7 +693,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
Expand Down Expand Up @@ -756,7 +756,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);

let csv = CsvExec::new(
Expand Down Expand Up @@ -809,7 +809,7 @@ mod tests {
tmp_dir.path(),
)?;

let mut config = partitioned_csv_config(file_schema, file_groups)?;
let mut config = partitioned_csv_config(file_schema, file_groups);

// Add partition columns
config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
Expand Down Expand Up @@ -914,7 +914,7 @@ mod tests {
)
.unwrap();

let config = partitioned_csv_config(file_schema, file_groups).unwrap();
let config = partitioned_csv_config(file_schema, file_groups);
let csv = CsvExec::new(
config,
true,
Expand Down
Loading

0 comments on commit d7e0462

Please sign in to comment.