[BUG] Allow for Parquet reading from files with differing schemas #2514
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##             main    #2514      +/-   ##
==========================================
- Coverage   63.30%   63.23%    -0.08%
==========================================
  Files         968      973        +5
  Lines      108068   108506      +438
==========================================
+ Hits        68414    68613      +199
- Misses      39654    39893      +239
```
```rust
    field: String,
    available_fields: Vec<String>,
    path: String,
},
```
I removed this because our Parquet readers no longer complain if a requested field is not found. Instead, the readers return a best-effort Table containing all the requested columns they could find.
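As an illustration, the best-effort behavior can be sketched like this; `best_effort_select` is a hypothetical stand-in operating on column names only, not Daft's actual reader API:

```rust
// Hypothetical sketch: keep only the requested columns that actually exist
// in the file's schema, instead of raising a FieldNotFound-style error.
fn best_effort_select(available: &[&str], requested: &[&str]) -> Vec<String> {
    requested
        .iter()
        .copied()
        .filter(|name| available.contains(name))
        .map(str::to_string)
        .collect()
}
```

A file that only has columns `a` and `b` would satisfy a request for `a` and `c` by returning just `a`; the missing column is silently dropped from the result rather than failing the read.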
```rust
let schema: SchemaRef = schema.into();
if schema.fields.len() != columns.len() {
    return Err(DaftError::SchemaMismatch(format!(
        "While building a Table, we found that the number of fields did not match between the schema and the input columns.\n {:?}\n vs\n {:?}",
        schema.fields.len(),
        columns.len()
    )));
}
let mut num_rows = 1;
```
We used to default to tables with num_rows=1, but this led to some very odd behavior. Now the caller explicitly passes in num_rows, which makes more sense because the caller usually has more context (e.g. a caller may know that even though no columns were read, the Parquet rowgroup(s) it read had 1,000 rows).
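A minimal sketch of a constructor that takes num_rows explicitly and validates column lengths against it (the `Table`/`Column` types and error handling here are illustrative, not Daft's real `Table::new`):

```rust
// Hypothetical sketch: a table constructor that requires an explicit row
// count instead of defaulting to 1, so a column-less table can still carry
// the row count of the rowgroups that were read.
struct Column {
    name: String,
    len: usize,
}

struct Table {
    columns: Vec<Column>,
    num_rows: usize,
}

impl Table {
    fn new(columns: Vec<Column>, num_rows: usize) -> Result<Table, String> {
        // Every column must agree with the caller-supplied row count.
        for col in &columns {
            if col.len != num_rows {
                return Err(format!(
                    "column {} has length {} but table has {} rows",
                    col.name, col.len, num_rows
                ));
            }
        }
        Ok(Table { columns, num_rows })
    }
}
```

With this shape, `Table::new(vec![], 1000)` succeeds and reports 1,000 rows even though no columns were materialized, which matches the PR's "empty table with a valid number of rows" behavior.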
```rust
    // We discard the original self.len() because we expect aggregations to change
    // the final cardinality. Aggregations on empty tables are expected to produce unit length results.
    (true, _) => result_series.iter().map(|s| s.len()).max().unwrap(),
};
```
The logic for num_rows here was tricky to get correct, so reviewers should pay extra attention to this block of code. It was difficult to pin down the "correct" number of rows resulting from a call to eval_expression_list, which can contain cardinality-modifying expressions such as UDFs and aggregations.
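The cardinality decision can be sketched roughly like this; it is a simplified stand-in for the actual eval_expression_list logic, operating only on the input length and the lengths of the result series:

```rust
// Hypothetical sketch of choosing the result row count after evaluating an
// expression list. On an empty input, aggregations are still expected to
// produce unit-length results, so we take the longest result series rather
// than keeping the original (zero) length.
fn result_num_rows(input_is_empty: bool, input_len: usize, result_lens: &[usize]) -> usize {
    match (input_is_empty, result_lens.iter().all(|&l| l == input_len)) {
        // Every expression preserved the input cardinality: keep it.
        (false, true) => input_len,
        // Empty input: aggregations may still emit a single row.
        (true, _) => result_lens.iter().copied().max().unwrap_or(0),
        // Cardinality-modifying expressions (UDFs, aggregations): take the max.
        (false, false) => result_lens.iter().copied().max().unwrap_or(0),
    }
}
```

For example, an aggregation over an empty table yields length-1 series, so the result table has one row; projections over a 5-row table yield 5-row series and the length is preserved.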
Enables the `dataframe/test_creation.py` and `io/test_parquet.py` test suites for the native executor.

Changes:
- Add `PythonStorageConfig` reading functionality (just copying the existing logic in `materialize_scan_task`)
- Enable streaming parquet reads to read files with differing schemas (see #2514)

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
Fixes to allow reading Parquet files while specifying columns that do not exist in the Parquet file.
This is common when we try to "apply" a schema from some external source (e.g. a data catalog). In that case, old Parquet files may not have certain columns because the schema evolved over time. We want to make sure that reads still succeed on these files, and that we get back tables with the appropriate number of rows (even if no columns were read!).
Summary of Changes

Fixes to the Parquet reader:
- Allow specifying `column` names that may not exist in the file. These columns will just be missing from the returned `Table`. Note that this potentially means we get empty `Table`s with a valid number of rows.

Fixes on `Table`:
- Allow empty (num_rows=0) tables that still have columns of data. This lets us do reads of files without producing any data at all (e.g. reading only `col("x")` from a file that doesn't have `col("x")`).
- Removed `FieldNotFound` errors. We no longer complain about a user-provided field not being found, and instead just return `Table` structs without the requested columns.
- Changed `Table::new` to not default to num_rows=1. Instead, we pass in num_rows explicitly and check against that.
- Changed `Table::from_columns` to receive an explicit num_rows as well. This cleans up a host of bugs where we might naively create tables with the wrong length when a table has no columns.
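To illustrate why the explicit num_rows matters, here is a hypothetical contrast between inferring the row count from the columns and passing it explicitly; the function names are illustrative, not Daft's actual `Table::from_columns` API:

```rust
// Hypothetical sketch: the old behavior inferred the row count from the
// columns and fell back to 1 when there were none, which is wrong for a
// column-less read of a 1,000-row rowgroup. The new behavior takes the
// count from the caller and merely validates it.
fn inferred_num_rows(column_lens: &[usize]) -> usize {
    // Old behavior: default to 1 when no columns were read.
    column_lens.first().copied().unwrap_or(1)
}

fn explicit_num_rows(column_lens: &[usize], num_rows: usize) -> Result<usize, String> {
    // New behavior: the caller supplies num_rows; we only validate it.
    if column_lens.iter().any(|&l| l != num_rows) {
        return Err("column length mismatch".to_string());
    }
    Ok(num_rows)
}
```

Under the old scheme a column-less table silently became a 1-row table; under the new scheme the caller can pass the true rowgroup count (e.g. 1,000) and mismatched column lengths become an error instead of a latent bug.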