-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
validate and adjust Substrait NamedTable schemas (#12223) #12245
Merged
Merged
Changes from 5 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
429cd2b
fix: producer did not emit base_schema struct field for ReadRel
vbarua a44a689
feat: include field_qualifier param for from_substrait_named_struct
vbarua 86a7339
feat: verify that Substrait and DataFusion agree on NamedScan schemas
vbarua 1bfa887
test: update existing substrait test + substrait validation test
vbarua 2f71997
feat: expose from_substrait_named_struct
vbarua 8634d23
refactor: remove unused imports
vbarua 5051991
docs: add missing licenses
vbarua f52f01c
refactor: deal with unused code warnings
vbarua aa063c7
remove optional qualifier from from_substrait_named_struct
vbarua f9319ba
return DFSchema from from_substrait_named_struct
vbarua 444aad6
one must imagine clippy happy
vbarua c2b0cb0
accidental blah
vbarua 0afb845
loosen the validation for schemas
vbarua 9ba5c4d
minor doc tweaks
vbarua 83529f5
update test data to deal with case issues in tests
vbarua acb2b47
fix error message
vbarua 386264e
improve readability of field compatability check
vbarua f6017eb
make TestSchemaCollector more flexible
vbarua 185bff5
fix doc typo
vbarua aeb6c3d
remove unecessary TODO
vbarua d571eb2
handle ReadRel projection on top of mismatched schema
vbarua File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,40 +19,33 @@ | |
|
||
#[cfg(test)] | ||
mod tests { | ||
use crate::utils::test::TestSchemaCollector; | ||
use datafusion::common::Result; | ||
use datafusion::prelude::{CsvReadOptions, SessionContext}; | ||
use datafusion_substrait::logical_plan::consumer::from_substrait_plan; | ||
use std::fs::File; | ||
use std::io::BufReader; | ||
use substrait::proto::Plan; | ||
|
||
#[tokio::test] | ||
async fn contains_function_test() -> Result<()> { | ||
let ctx = create_context().await?; | ||
|
||
let path = "tests/testdata/contains_plan.substrait.json"; | ||
let proto = serde_json::from_reader::<_, Plan>(BufReader::new( | ||
|
||
let proto_plan = serde_json::from_reader::<_, Plan>(BufReader::new( | ||
File::open(path).expect("file not found"), | ||
)) | ||
.expect("failed to parse json"); | ||
|
||
let plan = from_substrait_plan(&ctx, &proto).await?; | ||
let ctx = TestSchemaCollector::generate_context_from_plan(&proto_plan); | ||
let plan = from_substrait_plan(&ctx, &proto_plan).await?; | ||
|
||
let plan_str = format!("{}", plan); | ||
|
||
assert_eq!( | ||
plan_str, | ||
"Projection: nation.b AS n_name\ | ||
\n Filter: contains(nation.b, Utf8(\"IA\"))\ | ||
\n TableScan: nation projection=[a, b, c, d, e, f]" | ||
"Projection: nation.n_name\ | ||
\n Filter: contains(nation.n_name, Utf8(\"IA\"))\ | ||
\n TableScan: nation projection=[n_nationkey, n_name, n_regionkey, n_comment]" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can see in this change how the DataFusion and Substrait plans had different schemas. |
||
); | ||
Ok(()) | ||
} | ||
|
||
async fn create_context() -> datafusion::common::Result<SessionContext> { | ||
let ctx = SessionContext::new(); | ||
ctx.register_csv("nation", "tests/testdata/data.csv", CsvReadOptions::new()) | ||
.await?; | ||
Ok(ctx) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
#[cfg(test)] | ||
mod tests { | ||
use crate::utils::test::read_json; | ||
use datafusion::arrow::datatypes::{DataType, Field}; | ||
use datafusion::catalog_common::TableReference; | ||
use datafusion::common::{DFSchema, Result}; | ||
use datafusion::datasource::empty::EmptyTable; | ||
use datafusion::prelude::SessionContext; | ||
use datafusion_substrait::logical_plan::consumer::from_substrait_plan; | ||
use std::collections::HashMap; | ||
use std::sync::Arc; | ||
|
||
fn generate_context_with_table( | ||
table_name: &str, | ||
field_data_type_pairs: Vec<(&str, DataType)>, | ||
) -> Result<SessionContext> { | ||
let table_ref = TableReference::bare(table_name); | ||
let fields: Vec<(Option<TableReference>, Arc<Field>)> = field_data_type_pairs | ||
.into_iter() | ||
.map(|pair| { | ||
let (field_name, data_type) = pair; | ||
( | ||
Some(table_ref.clone()), | ||
Arc::new(Field::new(field_name, data_type, false)), | ||
) | ||
}) | ||
.collect(); | ||
|
||
let df_schema = DFSchema::new_with_metadata(fields, HashMap::default())?; | ||
|
||
let ctx = SessionContext::new(); | ||
ctx.register_table( | ||
table_ref, | ||
Arc::new(EmptyTable::new(df_schema.inner().clone())), | ||
)?; | ||
Ok(ctx) | ||
} | ||
|
||
#[tokio::test] | ||
async fn substrait_schema_validation_ignores_field_name_case() -> Result<()> { | ||
let proto_plan = | ||
read_json("tests/testdata/test_plans/simple_select.substrait.json"); | ||
|
||
let ctx = generate_context_with_table("DATA", vec![("a", DataType::Int32)])?; | ||
from_substrait_plan(&ctx, &proto_plan).await?; | ||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn reject_plans_with_mismatched_number_of_fields() -> Result<()> { | ||
let proto_plan = | ||
read_json("tests/testdata/test_plans/simple_select.substrait.json"); | ||
|
||
let ctx = generate_context_with_table( | ||
"DATA", | ||
vec![("a", DataType::Int32), ("b", DataType::Int32)], | ||
)?; | ||
let res = from_substrait_plan(&ctx, &proto_plan).await; | ||
assert!(res.is_err()); | ||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn reject_plans_with_mismatched_field_names() -> Result<()> { | ||
let proto_plan = | ||
read_json("tests/testdata/test_plans/simple_select.substrait.json"); | ||
|
||
let ctx = generate_context_with_table("DATA", vec![("b", DataType::Date32)])?; | ||
let res = from_substrait_plan(&ctx, &proto_plan).await; | ||
assert!(res.is_err()); | ||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn reject_plans_with_incompatible_field_types() -> Result<()> { | ||
let proto_plan = | ||
read_json("tests/testdata/test_plans/simple_select.substrait.json"); | ||
|
||
let ctx = generate_context_with_table("DATA", vec![("a", DataType::Date32)])?; | ||
let res = from_substrait_plan(&ctx, &proto_plan).await; | ||
assert!(res.is_err()); | ||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,3 +17,4 @@ | |
|
||
/// Run all tests that are found in the `cases` directory | ||
mod cases; | ||
mod utils; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started wondering if the
ensure_schema_compatability
can now conflict withextract_projection
- and I think it can, either by failing if DF doesn't optimize the select into a projection, or if DF does, then by overriding the select's projection with the Substrait projection...I guess a fix would be something like in
extract_projection
, if there is an existingscan.projection
, then applycolumnIndices
on it firstThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That did indeed cause problems. It triggered an error of
unexpected plan for table
inextract_projection
.I added some code for this case in
d571eb2
(#12245). Is something like this what you had in mind?I am noticing that the plans generated look a little weird/bad with a lot of redundant projects
but they are at least correct for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I had in mind was manipulating the
scan.projection
directly - kinda like it is alreadydone in extract_projection, we could do it that way also forensure_schema_compatibility
. That way there wouldn't be additional Projections, and maybe it'd be a bit more efficient if the current setup doesn't push the column-pruning into the scan level (though I'm a bit surprised they don't get optimized anyways).But I don't think it's necessary - the way you've done it here seems correct, and we (I?) can do the project-mangling as a followup, unless you want to take a stab at it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think project unmangling would be better as a follow-up. Possible as part of #12347 because supporting remaps is going to add yet another layer of Projects 😅