
feat: read schema from parquet files in datafusion scans #1266

Merged (7 commits, Apr 14, 2023)

Conversation

roeap (Collaborator) commented Apr 8, 2023

Description

This PR updates table scans with datafusion to read the file schema from the parquet file referenced by the latest add action of the table. This works around issues where the schema we derive from table metadata does not match the data in the parquet files - e.g. nanosecond vs. microsecond timestamps.
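As a rough sketch of the mechanism (illustrative only - local file handling and error types are assumptions, not this PR's code), reading the Arrow schema from a parquet footer with the parquet crate looks roughly like:

use std::fs::File;

use arrow::datatypes::SchemaRef;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Illustrative: read the Arrow schema recorded in a parquet file's footer,
// e.g. for the file referenced by the table's latest add action.
fn footer_schema(path: &str) -> Result<SchemaRef, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    Ok(builder.schema().clone())
}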

We also update the Load command to handle column selections and make it more consistent with the other operations.
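A hedged usage sketch of the updated Load command (the builder method name with_columns and the exact return shape are assumptions based on this description, not confirmed API):

use deltalake::{DeltaOps, DeltaTableError};

// Assumed usage: select a subset of columns when loading a table.
async fn load_columns() -> Result<(), DeltaTableError> {
    let (_table, _stream) = DeltaOps::try_from_uri("./data/my-table")
        .await?
        .load()
        .with_columns(["id", "value"])
        .await?;
    Ok(())
}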

Related Issue(s)

closes #441 (Datafusion table provider: issues with timestamp types)

Documentation

@github-actions github-actions bot added binding/rust Issues for the Rust crate rust labels Apr 8, 2023
@roeap roeap changed the title from "[WIP] feat: read schema from parquet files in datafusion scans" to "feat: read schema from parquet files in datafusion scans" Apr 8, 2023
@roeap roeap marked this pull request as ready for review April 8, 2023 10:22
rust/src/operations/transaction/state.rs (review thread, outdated)
///
/// This will construct a schema derived from the parquet schema of the latest data file,
/// and fields for partition columns from the schema defined in table metadata.
pub async fn physical_arrow_schema(
Collaborator:

I don't think we can guarantee this schema is consistent across all parquet files in the table; different writers may have written to the table with different physical types for timestamps. IMO this should be handled in the scan of each Parquet file. That is, we should cast the physical type to microsecond timestamps as needed.
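For illustration, the per-file coercion described here could lean on the arrow cast kernel - a sketch under that assumption, not code from this PR:

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};
use arrow::error::ArrowError;

// Sketch: coerce a timestamp column (e.g. nanosecond precision) to
// microsecond precision while scanning an individual parquet file.
fn coerce_to_micros(col: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    cast(col, &DataType::Timestamp(TimeUnit::Microsecond, None))
}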

Collaborator:

In PyArrow, we handle the int96 timestamp issue by passing an argument to the reader to coerce it to microsecond precision. Maybe we could implement something similar upstream?

parquet_read_options=ParquetReadOptions(coerce_int96_timestamp_unit="ms")

roeap (Author):

There definitely are no guarantees that the file schema is consistent. Datafusion, however, needs a consistent schema. Once we get into column mappings etc., things might get even more demanding, and we may have to roll our own parquet scan, or rather start putting logic into our DeltaScan.

That said, I do believe using the schema from the latest file is an improvement over the current approach, which at least for me fails for more or less every Databricks-written table where timestamps are involved.

Not sure about the best way forward, but I'm happy to keep that logic on a private branch somewhere until we have a more general fix.

roeap (Author):

Somewhat related, so I already published it as WIP: in #1267 I did some work on the write command. There my plan was to use the same schema to validate writes. But there it would be even more confusing, since we might end up in a situation where writing with the "official" schema of the table would not be permissible. Somehow it feels very strange to me to potentially have many schemas in the same table.

I guess Spark must allow at least some flexibility in what schema it expects at write time, otherwise how would we end up in this discussion at all :D

Collaborator:

Yeah, we are definitely hitting the limits of DataFusion's scanner. I've created an issue upstream: apache/datafusion#5950

I'm fine with moving this forward; I mostly care that we have a more robust implementation in the future and have at least some momentum towards it.

wjones127 (Collaborator) left a comment:

Finished looking through. Just one other comment.

.files()
.iter()
.max_by_key(|obj| obj.modification_time)
.ok_or(DeltaTableError::Generic("No active file actions to get physical schema. Maybe the current state has not yet been loaded?".into()))?
Collaborator:

Does this error propagate to the user? Does this mean trying to scan an empty table leads to an error? I don't think it should.

roeap (Author):

It does. At the time I thought we could fail on a scan if no files have been added yet, but you are right - there are several valid scenarios where a table has no files and we should still be able to scan it.

Fixed it so we fall back to the schema from metadata.
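A minimal sketch of that fallback, built from the snippet quoted above (the two helper functions are hypothetical, not the merged code):

// Hypothetical: prefer the schema from the newest add action's parquet
// footer; with no files yet, derive the schema from table metadata.
let schema = match state.files().iter().max_by_key(|add| add.modification_time) {
    Some(add) => schema_from_parquet_footer(add).await?, // hypothetical helper
    None => arrow_schema_from_metadata(state.metadata())?, // hypothetical helper
};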

@wjones127 wjones127 merged commit 362a94e into delta-io:main Apr 14, 2023
@roeap roeap deleted the load-schema branch April 14, 2023 04:59