ARROW-9782: [C++][Dataset] More configurable Dataset writing #8305
Conversation
@bkietz this removes the ability to specify format-specific options? (Or is it still WIP?)

@jorisvandenbossche yes, not ready for review yet. I will repair format-specific write options as part of this PR.

Okey-dokey, will wait a bit more then ;-)
(force-pushed b090f12 to b47d07d)
cpp/src/arrow/dataset/file_base.cc (Outdated)
Since we know all fragments (and their expressions) already, can we avoid all the locking and multi-threading machinery in WriterSet (IIRC, you only need it to create each writer once)? That would heavily simplify all of this.
In this context fragments are the object of writing rather than the target (for example, one might represent an in-memory table which is being copied to disk). Writers are not known ahead of time since they depend on the partitioning, which depends on the set of unique values in a given column; we discover those only after running GroupBy on an input batch.
We could do two scans of the input data:
- Assemble a list of all unique values in the partition columns of the data, from which we can determine the precise set of writers to open
- Apply groupings to batches, passing the results to pre-opened writers
This doesn't seem worthwhile to me; scanning the input is potentially expensive, so we should avoid doing it twice. Furthermore, we would still need to coordinate between threads, since two input batches might contain rows bound for the same output writer.
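To make the coordination problem concrete, here is a minimal standalone sketch (not Arrow's actual WriterSet; all names are illustrative) of a writer set that lazily opens one writer per partition directory under a lock. The lock is exactly what the two-scan alternative would remove, at the cost of scanning the input twice:

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-in for a per-directory output writer.
struct Writer {
  explicit Writer(std::string path) : path(std::move(path)) {}
  std::string path;
  // ... would wrap an open output stream in a real implementation
};

class WriterSet {
 public:
  // Returns the writer for `partition_dir`, creating it on first use.
  // Two threads grouping different batches may discover the same
  // partition value concurrently, so creation must be guarded.
  std::shared_ptr<Writer> GetOrCreate(const std::string& partition_dir) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = writers_.find(partition_dir);
    if (it == writers_.end()) {
      it = writers_.emplace(partition_dir,
                            std::make_shared<Writer>(partition_dir)).first;
    }
    return it->second;
  }

 private:
  std::mutex mutex_;
  std::map<std::string, std::shared_ptr<Writer>> writers_;
};
```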
python/pyarrow/dataset.py (Outdated)
Should we provide a default template here?
Can e.g. the format object have a property with the default name to use? (Or get the extension from there and use it in a default?)
@jorisvandenbossche Added, PTAL
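For illustration, a hedged sketch of what such a format-derived default could look like; `DefaultExtension` and `DefaultBasenameTemplate` are hypothetical names, not the actual API, though the "part-{i}" shape matches the "part-0.feather" file names in the tests further down:

```cpp
#include <string>

// Hypothetical mapping from a format's name to its file extension;
// the "ipc" -> "feather" case is illustrative only.
std::string DefaultExtension(const std::string& format_name) {
  return format_name == "ipc" ? "feather" : format_name;
}

// "{i}" is replaced with an auto-incremented integer per output file.
std::string DefaultBasenameTemplate(const std::string& format_name) {
  return "part-{i}." + DefaultExtension(format_name);
}
```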
(force-pushed ef5fc61 to b79a95c)
nealrichardson left a comment:
+1 from me, thanks for doing this!
cpp/src/arrow/util/mutex.h (Outdated)
I would expect a Lock() method as well.
I'd prefer to continue acquiring new locks exclusively through Mutex::Lock; there's no loss of generality, and it keeps Guard as simple as possible.
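A minimal sketch of the design being defended, assuming only the general shape (not the actual implementation) of arrow::util::Mutex: Lock() is the sole acquisition point, and Guard is a pure RAII handle with no methods of its own:

```cpp
#include <mutex>

class Mutex {
 public:
  class Guard {
   public:
    // Movable, so Lock() can return it by value; releases on destruction.
    Guard(Guard&&) = default;
    Guard& operator=(Guard&&) = default;

   private:
    friend class Mutex;
    explicit Guard(std::mutex& m) : lock_(m) {}
    std::unique_lock<std::mutex> lock_;
  };

  // The single entry point for acquiring the lock.
  Guard Lock() { return Guard(mutex_); }

 private:
  std::mutex mutex_;
};
```

Adding a Lock() method to Guard as well would make it both an acquisition point and a handle, complicating its invariants without enabling anything Mutex::Lock cannot already express.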
(force-pushed 370e2a0 to bc3b106)
Is this done, or what is left?

@pitrou are you planning to review C++ again?

The C++ changes addressed my comments. It would be nice, though, if @fsaintjacques could take a look.
jorisvandenbossche left a comment:
Did a pass over the Python code.
python/pyarrow/_dataset.pyx (Outdated)
Does this change behaviour? It seems you are now creating a single fragment instead of a vector of fragments?
FileSystemDataset::Write now parallelizes across scan tasks rather than fragments, so there will be no difference in performance or in the written files even if we create a single in-memory fragment. I changed this to create a single fragment since it's simpler.
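A simplified sketch of what scan-task-level parallelism means here (hypothetical names, standard library only; not Arrow's actual implementation): one unit of work per scan task, regardless of how tasks were grouped into fragments, which is why a single in-memory fragment loses nothing:

```cpp
#include <functional>
#include <future>
#include <vector>

using ScanTask = std::function<void()>;

void WriteAll(const std::vector<ScanTask>& tasks) {
  std::vector<std::future<void>> futures;
  futures.reserve(tasks.size());
  for (const auto& task : tasks) {
    // One unit of parallelism per scan task; fragment boundaries
    // play no role in scheduling.
    futures.push_back(std::async(std::launch::async, task));
  }
  // Wait for completion, propagating any exception from a task.
  for (auto& f : futures) f.get();
}
```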
(force-pushed ef952ef to 20cf19f)
(force-pushed 5cb797e to 5602aa8)
Merging
      target = tempdir / 'single-directory-target'
    - expected_files = [target / "dat_0.ipc", target / "dat_1.ipc"]
    + expected_files = [target / "part-0.feather"]
Why did this change to a single file? (The original has 2 files; I expect the roundtrip to preserve those files.)
After this patch, a single file will be written for each partition directory. In a follow-up we'll add an optional cap on file size.
    - # check that all files are the same in both cases
    - paths1 = [p.relative_to(target1) for p in target1.rglob("*")]
    - paths2 = [p.relative_to(target2) for p in target2.rglob("*")]
    - assert set(paths1) == set(paths2)
Why was this removed? (does it no longer hold?)
It no longer holds consistently; the auto-incremented {i} doesn't necessarily round trip.
There might be no difference, but I think the user should still be able to control how many files are created. Because now, whatever you pass, it's always consolidated into a single file (or one file per partition directory)? Also, it seems that reading and then writing a dataset does not preserve the files? (So if we discover a dataset with multiple files, we write it back as a single file?)

If you're writing with no partitioning then yes, everything will be written to a single file. In a follow-up we'll probably add a special case for unpartitioned writing which allocates an output file for each thread, purely for performance reasons.
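Purely as illustration of that follow-up idea (nothing here is in this patch), a hypothetical sketch of giving each thread its own output file, so unpartitioned writes need no cross-thread locking at all:

```cpp
#include <cstdio>
#include <string>
#include <thread>
#include <vector>

void WriteUnpartitioned(int num_threads) {
  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([i] {
      // Each thread owns "part-i" exclusively, so no locking is needed.
      std::string path = "part-" + std::to_string(i) + ".feather";
      // ... open `path` and write this thread's share of the batches
      std::printf("writing %s\n", path.c_str());
    });
  }
  for (auto& t : threads) t.join();
}
```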