ARROW-2229: [C++][Python] Add WriteCsv functionality. #9504
Conversation
@jorisvandenbossche @pitrou would one or both of you mind reviewing? It appears CSV is part of the minimal build, but compute (casts) is not. I was thinking of adding an ifdef and raising NotImplemented in that case. Does that seem like a reasonable approach?
pitrou
left a comment
Very nice! The approach is definitely interesting. You'll find some comments below.
python/pyarrow/_csv.pyx
Outdated
Hmm... which table memory?
Copy-and-paste bug.
python/pyarrow/_csv.pyx
Outdated
I would rather make the API more consistent and have the user pass a WriteOptions instance here.
Done.
cpp/src/arrow/record_batch.cc
Outdated
I don't understand this comment: where is the optimization which just returns the original RecordBatch?
Leftover comment from when I was going down the rabbit hole. I've moved all this code into writer.cc to avoid debating the best way to expose this publicly.
cpp/src/arrow/record_batch.cc
Outdated
Hmm... this means the iterator will fail if the user doesn't keep the original batch alive. Do we really gain anything by not taking a shared_ptr here?
Yes, it would. The tricky part is that we can't get a shared_ptr to the RecordBatch if this is a member method (unless we add enable_shared_from_this).
To avoid any contention here, for now I've moved this to be an implementation detail.
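For illustration, a minimal sketch of the C++ constraint being discussed (the class and method names are hypothetical, not the actual RecordBatch API):

#include <memory>

// A member function can only hand out an owning pointer to *this if the
// class derives from std::enable_shared_from_this; otherwise an iterator
// would have to capture a raw pointer, which dangles once the caller
// drops the original batch.
class BatchLike : public std::enable_shared_from_this<BatchLike> {
 public:
  std::shared_ptr<BatchLike> Self() {
    // Only valid when *this is already owned by a shared_ptr.
    return shared_from_this();
  }
};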
cpp/src/arrow/record_batch.h
Outdated
Wouldn't it be more consistent with the rest of the RecordBatch API to use RecordBatchReader?
I was guessing iterators were now preferred? I haven't been keeping up. But as noted above, the point is moot now.
cpp/src/arrow/csv/writer.cc
Outdated
Nit, but I think we spell "CSV" everywhere currently.
Replacing.
Replaced.
cpp/src/arrow/csv/writer.cc
Outdated
Interesting approach. It may also allow parallelizing conversion if we want to go that way.
yep :)
cpp/src/arrow/csv/writer.cc
Outdated
3 is because of quoting and delimiters?
Yes, replaced with a constexpr.
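For illustration, the constant might look like this (the name is an assumption, not necessarily what the PR uses):

#include <cstdint>

// Per value, reserve space for two quote characters plus one delimiter
// (or the line terminator after the last column).
constexpr int64_t kQuoteDelimiterCount = 2 + /*delimiter=*/1;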
cpp/src/arrow/csv/writer.cc
Outdated
DCHECK that row_positions_.back() corresponds to the end of the buffer?
(even better would be to DCHECK that each row position is consistent with the corresponding pre-computed row length)
DCHECKing each row position against the precomputed row lengths is no longer possible, since I mutate the offsets.
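The variant that remains feasible might look roughly like this (a hedged sketch; the names follow the thread, not the merged code):

#include <cstdint>
#include <vector>
#include "arrow/util/logging.h"

// In debug builds, verify the last recorded row position lands exactly at
// the end of the serialized output buffer.
void CheckLastRowPosition(const std::vector<int64_t>& row_positions,
                          int64_t buffer_size) {
  DCHECK_EQ(row_positions.back(), buffer_size);
}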
cpp/src/arrow/csv/options.h
Outdated
Intuitively, this seems a bit low, but we can tune it later.
@pitrou thank you for all the comments; I'm still working through addressing them (I'll ping you again for review when it's ready). Could you chime in on whether the ifdef approach mentioned above, returning NotImplemented when compute isn't available, seems reasonable to you?
Sorry, I hadn't seen that question. Yes, I think that raising NotImplemented is fine. In the future, we may want to always enable compute.
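For illustration, a sketch of the guard under discussion (the macro name and the function are assumptions, not the code this PR landed):

#include "arrow/status.h"

arrow::Status WriteCsvGuarded(/* table, options, output */) {
#ifdef ARROW_COMPUTE
  // ... cast columns to strings via the compute module and serialize ...
  return arrow::Status::OK();
#else
  return arrow::Status::NotImplemented(
      "CSV writing requires Arrow to be built with the compute module");
#endif
}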
Cool! I don't have time right now to give it a more detailed review, but I quickly fetched the branch, and even for a not-yet-optimized first version, this is already much faster than the pure Python pandas writer. A few things I noticed / random thoughts:
Quoting by default ensures that we can distinguish missing values and empty strings, I suppose?
python/pyarrow/_csv.pyx
Outdated
In the Parquet and Feather APIs, it's first the data and then the file name.
Reversed, thanks.
cpp/src/arrow/csv/writer.cc
Outdated
Another easy optimization is to move the writes to a dedicated background thread. I'll add this to my list of "to asyncerize" which will include this optimization by necessity.
Yes, agreed. My initial use case for this will be a BufferOutputStream, so asynchrony isn't important for that at least. This actually raises a question of whether detecting blocking vs non-blocking I/O sources is important for threading.
I don't know that a buffered output stream solves the problem I'm worried about. It will help mitigate the cost of many small writes by grouping them, but the large writes still take time. So if the CSV is large enough to span multiple buffers, you still have to block here occasionally. I may be misunderstanding what you mean by BufferOutputStream, though.
The detecting question is a good one. I think it boils down like so...
Currently, we assume all streams are blocking (a buffered output stream is still a blocking stream, it just blocks less). This means that any time we do I/O from an async context we need to "background" it by creating a dedicated thread to do the I/O (very soon I hope this will switch to "borrowing a dedicated thread from the I/O pool").
At some point, as more underlying filesystems are non-blocking, we can reverse the assumption and assume that all underlying streams are non-blocking. Any remaining blocking streams will then be responsible for the "background"ing.
> I don't know that a buffered output stream solves the problem I'm worried about. It will help mitigate the cost of many small writes by grouping them, but the large writes still take time. So if the CSV is large enough to span multiple buffers, you still have to block here occasionally. I may be misunderstanding what you mean by BufferOutputStream, though.
BufferOutputStream is an in-memory output stream that writes to a resizable buffer.
I'm not sure why we're talking about doing writes in a separate thread. Writes are typically asynchronous, so the only cost is a memory copy (and perhaps a system call).
Ok, yes, not a problem at all for BufferOutputStream. That will teach me to read more closely.
@pitrou You are correct. Simply adding a background readahead-style thread will just introduce a second cache in addition to the OS's existing cache (or whatever dirty-page cache is in our S3 filesystem), and that doesn't help anything. I still think we will eventually want to make writes properly async (futures). In the event we are in a huge write or writing to a slow (S3) source we still might block, although the gain there is not the performance of this CSV write (the one blocked by I/O) so much as keeping the CPU threads clear in case other tasks are going on at the same time.
Yes, although we could use this convention for only nulls. Always quoting also helps maintain data types. For instance, all numbers in a string column would be maintained round-trip.
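To make the convention concrete, a hand-written illustration (example data, not captured writer output): with quoting on, a null serializes as nothing, an empty string keeps its quotes, and a number stored in a string column keeps quotes that preserve its type.

"s","n"
"123",1
"",2
,3

Row one round-trips "123" as a string rather than a number, row two holds an empty string in column s, and row three holds a null.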
Sorry, I missed this. The approach I went with is to not include the writer header in api.h, or try to compile it, if compute isn't on.
@jorisvandenbossche thank you for trying it out. In regards to:
I think this would be nice, or at least configurable, but I would like to keep this outside the scope of this PR (this probably belongs as a feature on cast?)
@pitrou I think this is ready for another review when you have time.
Can you rebase from master to fix the Windows builds?
@pitrou rebased.
pitrou
left a comment
Thanks for the update. Here are a couple more comments.
The preferred way would be to let CMake generate it: https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/config.h.cmake#L37
(Also, if you just pass the option here, it might not be taken up by third-party applications including Arrow.)
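The consuming side of that suggestion might look like this (the exact macro name is an assumption): the definition lives in the CMake-generated header, so third-party builds that include Arrow's headers see the same value rather than depending on a compile-line flag.

// arrow/util/config.h is generated from config.h.cmake at build time.
#include "arrow/util/config.h"

#ifdef ARROW_COMPUTE  // assumed macro name, materialized by CMake
// compute-dependent code path
#endif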
cpp/src/arrow/csv/writer.h
Outdated
Writing iteratively requires being careful with the options (you don't want to append a new header for each subsequent batch).
As you prefer, though. We can convert this later.
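A sketch of that pitfall (include_header mirrors the WriteOptions field; the loop is illustrative and commented out since batches and output are not declared here):

#include "arrow/csv/options.h"

void WriteBatchesSketch(/* batches, output */) {
  arrow::csv::WriteOptions options;
  options.include_header = true;  // emit the header with the first batch only
  // for (const auto& batch : batches) {
  //   WriteCSV(*batch, options, output);
  //   options.include_header = false;  // don't repeat it for later batches
  // }
}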
python/pyarrow/_csv.pyx
Outdated
This docstring is missing a description.
Added.
python/pyarrow/_csv.pyx
Outdated
Looking at the existing PyArrow code, we generally don't mention anything when the function doesn't return a useful value.
Removed.
python/pyarrow/_csv.pyx
Outdated
It seems get_writer should do this for string and path-like inputs already. Is there a reason you had to write this try/except/else switch?
I think I copied this from someplace.
python/pyarrow/_csv.pyx
Outdated
pyarrow_unwrap_batch is generally preferred.
Thanks, changed.
python/pyarrow/_csv.pyx
Outdated
pyarrow_unwrap_table also
Thanks, changed.
python/pyarrow/_csv.pyx
Outdated
We should raise a proper TypeError, e.g.:
raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
Done.
python/pyarrow/includes/libarrow.pxd
Outdated
You shouldn't need this with the config.h.cmake approach.
Removed.
cpp/src/arrow/csv/writer_test.cc
Outdated
Ok, thanks.
@pitrou I think I addressed all your comments. Not sure what is going on with the R CI builds?
@emkornfield It looks like Arrow C++ failed compiling on those builds. It seems …
Trying uint8_t instead. (I'm also open to maybe removing the allocator in this case?)
Note that …
Yes, I agree this isn't great, but it should be pretty small (at least 1024 bytes by default). The other option would be to not use the custom allocator. Do you have a preference?
I'd say just use the default allocator.
Done.
This offers a possibly performance-naive CSV writer with limited options to keep the initial PR down. Obvious potential improvements to this approach are:
- Smarter casts for dictionaries
- Arena allocation for intermediate cast results
The implementation also means that for full primitive type support we might have to fill in gaps in our cast function.
Add Python properties and test them
pitrou
left a comment
Will merge. I still think the C++ API should be class-based, like other readers and writers.
I'll do a follow-up PR to expose a class/object for writing.