Add test for DataFrame::write_table #8531

Merged
merged 3 commits on Dec 15, 2023
95 changes: 93 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -749,7 +749,7 @@ mod tests {
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::listing::{FileRange, ListingOptions, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
use crate::physical_plan::displayable;
@@ -769,8 +769,8 @@ mod tests {
};
use arrow_array::Date64Array;
use chrono::{TimeZone, Utc};
use datafusion_common::ScalarValue;
use datafusion_common::{assert_contains, ToDFSchema};
use datafusion_common::{FileType, GetExt, ScalarValue};
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1938,6 +1938,97 @@ mod tests {
Ok(schema)
}

#[tokio::test]
async fn write_table_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
// let mut ctx = create_ctx(&tmp_dir, 4).await?;
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_target_partitions(8),
);
let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
// register csv file with the execution context
ctx.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().schema(&schema),
)
.await?;

// register a local file system object store for /tmp directory
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);

// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());

// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
std::fs::create_dir(&out_dir).unwrap();
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
let schema: Schema = df.schema().into();
// Register a listing table - this will use all files in the directory as data sources
// for the query
ctx.register_listing_table(
"my_table",
&out_dir,
listing_options,
Some(Arc::new(schema)),
None,
)
.await
.unwrap();
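// write_table inserts the query results into the registered "my_table"
// listing table, producing parquet files under `out_dir`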
df.write_table("my_table", DataFrameWriteOptions::new())
.await?;

// create a new context and verify that the results were saved to a partitioned parquet file
let ctx = SessionContext::new();

// get write_id
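// output files are named `{write_id}_{partition}.parquet` (see the
// register_parquet call below), so the id is parsed from the first file name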
let mut paths = fs::read_dir(&out_dir).unwrap();
let path = paths.next();
let name = path
.unwrap()?
.path()
.file_name()
.expect("Should be a file name")
.to_str()
.expect("Should be a str")
.to_owned();
println!("{name}");
Contributor:

lets remove it

let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
let write_id = parsed_id.to_owned();

// register each partition as well as the top level dir
ctx.register_parquet(
"part0",
&format!("{out_dir}/{write_id}_0.parquet"),
ParquetReadOptions::default(),
)
.await?;

ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
.await?;

let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
let allparts = ctx
.sql("SELECT c1, c2 FROM allparts")
.await?
.collect()
.await?;

let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0[0].schema(), allparts[0].schema());

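// reading the whole output dir should return every row written from the
// 4-partition source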
assert_eq!(allparts_count, 40);

Ok(())
}

#[tokio::test]
async fn write_parquet_results() -> Result<()> {
// create partitioned input file and context