-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Write operation #29
Write operation #29
Conversation
Codecov Report
❗ Your organization is not using the GitHub App Integration. As a result you may experience degraded service beginning May 15th. Please install the Github App Integration for your organization. Read more. @@ Coverage Diff @@
## main #29 +/- ##
===========================================
- Coverage 92.85% 73.28% -19.57%
===========================================
Files 2 4 +2
Lines 112 307 +195
===========================================
+ Hits 104 225 +121
- Misses 8 82 +74
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test coverage is obviously still very bad but I would love to see this getting in soon before we cover all cases
@rjzamora I assume you built the dask vanilla-parquet writer. This may interest you as well
# TODO: This is applying a potentially stricted schema control than what | ||
# Delta requires but if this passes, it should be good to go | ||
schema = validate_compatible(schemas) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took this code from https://github.com/data-engineering-collective/plateau where I've built a similar thing before. The challenge here is to get a schema from all the partitions that represents the transaction the best. this is particularly a struggle for partitions will null columns because pyarrow then does not give a proper schema. This method effectively looks at all written schemas, deduplicates it and merges them into a super-schema, i.e. it fills null columns with a proper type.
It also raises if there are incompatible schemas detected. Incompatible in this sense means, for example that the same column has an integer and a float in two different partitions. This may be stricter than what delta requires and a lot of this is already covered by dask but I used this regardless since we need the scheme deduplication.
I'm very open to throwing this out again down the road but this gets us started quickly since this is battle-proven code.
def test_roundtrip(tmpdir, with_index): | ||
dtypes = { | ||
"str": object, | ||
# FIXME: Categorical data does not work |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a couple of problems around dtypes. We can open dedicated issues for this once the PR is merged
raise NotImplementedError() | ||
|
||
written = df.map_partitions( | ||
_write_partition, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we use write_deltalake
from deltatable
for writing a partition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to separate the writing of the parquet files from the commit to the log. The entire to_deltalake
is supposed to be one transaction. Using the deltatable.write_deltalake
would create a commit per partition. This is not only not what a transaction is supposed to be but this would almost guarantee that we'd have write conflicts due to concurrent writes.
dask_deltatable/write.py
Outdated
raise DeltaProtocolError( | ||
"This table's min_writer_version is " | ||
f"{table.protocol().min_writer_version}, " | ||
"but this method only supports version 2." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"but this method only supports version 2." | |
f"but this method only supports version {MAX_SUPPORTED_WRITER_VERSION}." |
dask_deltatable/write.py
Outdated
partitioning = None | ||
if mode == "overwrite": | ||
# FIXME: There are a couple of checks that are not migrated yet | ||
raise NotImplementedError() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raise NotImplementedError() | |
raise NotImplementedError("mode='overwrite' is not implemented") |
dask_deltatable/write.py
Outdated
format="parquet", | ||
partitioning=partitioning, | ||
# It will not accept a schema if using a RBR | ||
schema=schema if not isinstance(data, RecordBatchReader) else None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would data
be a RecordBatchReader
if you just created it as pa.Table.from_pandas
on line 218?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Currently, there are a couple of cases that are not covered by this PR. I would either open issues to fix those cases later, or add tests that are xfailed, so they serve as a reminder.
Closes #4
It effectively translates/copied most of https://github.com/delta-io/delta-rs/blob/e5dd8e2167b94e6856aa531d878584397d5bea69/python/deltalake/writer.py#L142-L342 to dask. I omitted the overwrite path for now mostly for simplicities sake. The only genuine addition here is how this would have to be wired together with dask (the HLG / map_partition / Scalar foo)
There is a lot of overlap and I have to hook into some internal flagged APIs of delta-rs . This can be hopefully cleaned up eventually (cc @MrPowers)