Add custom serialization support for pyarrow #2115
Conversation
Tests incoming - just wanted to get this up here for consideration in the meantime.
To be explicit, the objective would be to get something like a pyarrow RecordBatch or Table round-tripping cleanly through Dask's serialization machinery.
So if I were to do this for a pandas dataframe I would probably pull the underlying bytes out and pass them along as frames.
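For context, here is a minimal sketch of the kind of serialize/deserialize pair this PR adds for pyarrow record batches. The function bodies follow the snippets reviewed below; the `register_serialization` hook and the use of `getvalue()` (spelled `get_result()` in the pyarrow of the day) are assumptions about the surrounding API, not code from the PR itself.

```python
import pyarrow as pa
from distributed.protocol.serialize import register_serialization


def serialize_batch(batch):
    # Write the batch into an in-memory Arrow IPC stream.
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    buf = sink.getvalue()  # older pyarrow spells this sink.get_result()
    header = {}
    frames = [buf]
    return header, frames


def deserialize_batch(header, frames):
    # Read the single batch back out of the first frame.
    reader = pa.RecordBatchStreamReader(pa.BufferReader(frames[0]))
    return reader.read_next_batch()


# Assumed registration hook; distributed exposed register_serialization
# for plugging custom types into its protocol around this time.
register_serialization(pa.RecordBatch, serialize_batch, deserialize_batch)
```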
So the solution where you scatter explicitly should work fine, but when working with coroutines you'll need to yield the scatter result. This shouldn't be an issue if you use the normal API. The rest of the tests here are related to #2110. It's also worth noting that this shouldn't be an issue for worker-to-worker transfers. So if your tasks generate and then pass around arrow objects then things will also be fine with this change. This is only an issue when you push data into Dask.
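To illustrate the coroutine caveat above, a hedged sketch (the client setup and the toy compute function are illustrative, not from the PR; the client is assumed to be created with `Client(asynchronous=True)`):

```python
from tornado import gen
import pyarrow as pa
from distributed import Client


@gen.coroutine
def scatter_batch(client, batch):
    # Yield the scatter result so the data actually lands on a worker
    # before the resulting future is handed to submit().
    [future] = yield client.scatter([batch])
    nrows = yield client.submit(lambda b: b.num_rows, future)
    return nrows
```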
In the future, we will support pickling these objects. The return values of …
I think this is good to go now. All the tests, including the scatter test, pass for me locally. If I've implemented it correctly I'd hope that it might be more efficient than pickling, at least until PEP 574 is passed and apache/arrow#2161 is merged.
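As background on the PEP 574 reference (not part of this PR): pickle protocol 5 later added out-of-band buffers, which is roughly the mechanism that would make plain pickling competitive for large binary payloads. A small illustration, assuming Python 3.8+:

```python
import pickle

payload = bytearray(b"x" * 1_000_000)  # stand-in for a large Arrow buffer
out_of_band = []
data = pickle.dumps(pickle.PickleBuffer(payload), protocol=5,
                    buffer_callback=out_of_band.append)
# `data` is now a small reference; the payload travels separately,
# so a transport can send it without an extra in-memory copy.
restored = pickle.loads(data, buffers=out_of_band)
assert bytes(restored) == bytes(payload)
```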
If you don't mind giving me a chance to review, I'll have a closer look tomorrow.
wesm left a comment:
LGTM aside from questions around sending pyarrow.Buffer directly
distributed/protocol/arrow.py (outdated)

    writer.close()
    buf = sink.get_result()
    header = {}
    frames = [buf.to_pybytes()]
This causes an extra memory copy. Can frames contain objects exporting the buffer protocol?
That's a good question. It looks like it can, with a small modification (pushed). Presumably this means that PyArrow also works with sockets and Tornado IOStreams. Hooray for consistent use of protocols.
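Roughly, the modification discussed above amounts to putting the pyarrow Buffer into the frames list directly instead of materializing a bytes copy; a sketch of the before/after (variable names follow the snippet above):

```python
# Before: an extra copy into a Python bytes object.
frames = [buf.to_pybytes()]

# After: pass the pyarrow.Buffer straight through; it exports the
# buffer protocol, so the comm layer can write it without copying.
frames = [buf]
```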
    def deserialize_batch(header, frames):
        import pyarrow as pa
        blob = frames[0]
        reader = pa.RecordBatchStreamReader(pa.BufferReader(blob))
I opened ARROW-2859 to see if we can get rid of this pa.BufferReader detail
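For what it's worth, later pyarrow releases let the stream reader accept buffer-like objects directly (for example through pa.ipc.open_stream), which is the direction ARROW-2859 points at; a hedged sketch of what the deserializer could then look like:

```python
import pyarrow as pa


def deserialize_batch(header, frames):
    # No explicit pa.BufferReader wrapper needed; open_stream accepts
    # bytes/buffer-like objects directly in newer pyarrow.
    reader = pa.ipc.open_stream(frames[0])
    return reader.read_next_batch()
```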
distributed/protocol/arrow.py (outdated)

    writer.close()
    buf = sink.get_result()
    header = {}
    frames = [buf.to_pybytes()]
Awesome. Is there something we can do to make the Python API for Buffer more conforming?
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
One improvement on the Arrow side would be if RecordBatchStreamWriter were a context manager, as that would avoid the need for an explicit close.
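For illustration, this is what the write path could look like if the writer supported the context-manager protocol (newer pyarrow does grow this support; `getvalue()` here stands in for the older `get_result()`):

```python
import pyarrow as pa


def serialize_batch(batch):
    sink = pa.BufferOutputStream()
    # The with-block closes the writer, replacing the explicit close().
    with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return {}, [sink.getvalue()]
```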
Merging this in a few hours if there are no further comments.
No, this was due to ugliness on the Dask side.
Thanks @dhirschfeld! Merged.
