Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
31c65ec
Add stub for abstract base classes
jakirkham May 8, 2020
df37353
Add the `Serializable` abstract base class
jakirkham May 8, 2020
4b085dc
Subclass cuDF objects from `Serializable`
jakirkham May 8, 2020
e37b4a1
Update Dask serialization to use `Serializable`
jakirkham May 8, 2020
42353eb
Import `cudf`
jakirkham May 8, 2020
ad2abb2
Use NumPy arrays for older pickle protocols
jakirkham May 8, 2020
d17e586
Merge rapidsai/branch-0.14 into jakirkham/add_serializable
jakirkham May 9, 2020
9d19df7
Merge rapidsai/branch-0.14 into jakirkham/add_serializable
jakirkham May 19, 2020
ba0d225
Cast to `uint8` during host serialization
jakirkham May 19, 2020
2163072
Note `Serializable` ABC added
jakirkham May 19, 2020
4e1611f
Inherit `MultiIndex` from `Index` only
jakirkham May 19, 2020
430786c
Merge rapidsai/branch-0.14 into jakirkham/add_serializable
jakirkham May 19, 2020
b43ea58
Drop no longer used `import functools`
jakirkham May 19, 2020
7994dc7
Run `isort` on `groupby`
jakirkham May 19, 2020
87d1d5c
Test host/device serialization
jakirkham May 19, 2020
9fea0ad
Test out-of-band pickling as well
jakirkham May 19, 2020
51e9471
Tweak `ndevice` check for host case
jakirkham May 19, 2020
9175e5e
Adjust `to_host` test of `ndevice`
jakirkham May 19, 2020
9391f68
Check for `__cuda_array_interface__` on frames
jakirkham May 19, 2020
3440bd7
Merge rapidsai/branch-0.14 into jakirkham/add_serializable
jakirkham May 19, 2020
3491152
Inherit from `Serializable` after other base class
jakirkham May 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- PR #4881 Support row_number in rolling_window
- PR #5068 Add Java bindings for arctan2
- PR #5132 Support out-of-band buffers in Python pickling
- PR #5139 Add ``Serializable`` ABC for Python
- PR #5149 Add Java bindings for PMOD
- PR #5153 Add Java bindings for extract
- PR #5196 Add Java bindings for NULL_EQUALS, NULL_MAX and NULL_MIN
Expand Down
48 changes: 10 additions & 38 deletions python/cudf/cudf/comm/serialize.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,31 @@
import pickle

import cudf
import cudf.core.groupby.groupby

# all (de-)serializations are attached to cudf Objects:
# Series/DataFrame/Index/Column/Buffer/etc
serializable_classes = (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 one from me as well! I especially like moving away from this list to anything which implements Serializable

cudf.CategoricalDtype,
cudf.DataFrame,
cudf.Index,
cudf.MultiIndex,
cudf.Series,
cudf.core.groupby.groupby.GroupBy,
cudf.core.groupby.groupby._Grouping,
cudf.core.column.column.ColumnBase,
cudf.core.buffer.Buffer,
)
import cudf # noqa: F401
from cudf.core.abc import Serializable

try:
from distributed.protocol import dask_deserialize, dask_serialize
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.utils import log_errors

@cuda_serialize.register(serializable_classes)
@cuda_serialize.register(Serializable)
def cuda_serialize_cudf_object(x):
with log_errors():
header, frames = x.serialize()
assert all((type(f) is cudf.core.buffer.Buffer) for f in frames)
header["lengths"] = [f.nbytes for f in frames]
return header, frames
return x.device_serialize()

# all (de-)serializations are attached to cudf Objects:
# Series/DataFrame/Index/Column/Buffer/etc
@dask_serialize.register(serializable_classes)
@dask_serialize.register(Serializable)
def dask_serialize_cudf_object(x):
header, frames = cuda_serialize_cudf_object(x)
with log_errors():
frames = [f.to_host_array().data for f in frames]
return header, frames
return x.host_serialize()

@cuda_deserialize.register(serializable_classes)
@dask_deserialize.register(serializable_classes)
@cuda_deserialize.register(Serializable)
@dask_deserialize.register(Serializable)
def deserialize_cudf_object(header, frames):
with log_errors():
if header["serializer"] == "cuda":
for f in frames:
# some frames are empty -- meta/empty partitions/etc
if len(f) > 0:
assert hasattr(f, "__cuda_array_interface__")
return Serializable.device_deserialize(header, frames)
elif header["serializer"] == "dask":
frames = [memoryview(f) for f in frames]

cudf_typ = pickle.loads(header["type-serialized"])
cudf_obj = cudf_typ.deserialize(header, frames)
return cudf_obj
return Serializable.host_deserialize(header, frames)


except ImportError:
Expand Down
62 changes: 62 additions & 0 deletions python/cudf/cudf/core/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

import abc
import pickle
from abc import abstractmethod

import numpy

import rmm

import cudf


class Serializable(abc.ABC):
@abstractmethod
def serialize(self):
pass

@classmethod
@abstractmethod
def deserialize(cls, header, frames):
pass

def device_serialize(self):
header, frames = self.serialize()
assert all((type(f) is cudf.core.buffer.Buffer) for f in frames)
header["type-serialized"] = pickle.dumps(type(self))
header["lengths"] = [f.nbytes for f in frames]
return header, frames

@classmethod
def device_deserialize(cls, header, frames):
for f in frames:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't going to be true with pack/unpack serialization which packs into a single host buffer that stores metadata, and a single device buffer that stores data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that makes sense. Happy to change this as we see fit.

The bigger idea here is that when we make these changes we can now do them in one place. So hopefully that makes pack/unpack and other changes in the future easier :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I think then we can leave this as is for now. :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added PR ( #5309 ), which should handle a mixture of host and device frames.

# some frames are empty -- meta/empty partitions/etc
if len(f) > 0:
assert hasattr(f, "__cuda_array_interface__")

typ = pickle.loads(header["type-serialized"])
obj = typ.deserialize(header, frames)

return obj

def host_serialize(self):
header, frames = self.device_serialize()
frames = [f.to_host_array().view("u1").data for f in frames]
return header, frames

@classmethod
def host_deserialize(cls, header, frames):
frames = [
rmm.DeviceBuffer.to_device(memoryview(f).cast("B")) for f in frames
]
obj = cls.device_deserialize(header, frames)
return obj

def __reduce_ex__(self, protocol):
header, frames = self.host_serialize()
if protocol >= 5:
frames = [pickle.PickleBuffer(f) for f in frames]
else:
frames = [numpy.asarray(f) for f in frames]
return self.host_deserialize, (header, frames)
10 changes: 3 additions & 7 deletions python/cudf/cudf/core/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import rmm
from rmm import DeviceBuffer, _DevicePointer

from cudf.core.abc import Serializable

class Buffer:

class Buffer(Serializable):
def __init__(self, data=None, size=None, owner=None):
"""
A Buffer represents a device memory allocation.
Expand Down Expand Up @@ -58,12 +60,6 @@ def __init__(self, data=None, size=None, owner=None):
raise TypeError("data must be Buffer, array-like or integer")
self._init_from_array_like(np.asarray(data), owner)

def __reduce_ex__(self, protocol):
data = self.to_host_array()
if protocol >= 5:
data = pickle.PickleBuffer(data)
return self.__class__, (data,)

def __len__(self):
return self.size

Expand Down
17 changes: 2 additions & 15 deletions python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cudf._lib.scalar import as_scalar
from cudf._lib.stream_compaction import unique_count as cpp_unique_count
from cudf._lib.transform import bools_to_mask
from cudf.core.abc import Serializable
from cudf.core.buffer import Buffer
from cudf.core.dtypes import CategoricalDtype
from cudf.utils import cudautils, ioutils, utils
Expand All @@ -38,7 +39,7 @@
from cudf.utils.utils import buffers_from_pyarrow, mask_dtype


class ColumnBase(Column):
class ColumnBase(Column, Serializable):
def __init__(
self,
data,
Expand Down Expand Up @@ -67,20 +68,6 @@ def __init__(
children=children,
)

def __reduce__(self):
return (
build_column,
(
self.data,
self.dtype,
self.mask,
self.size,
0,
self.null_count,
self.children,
),
)

def as_frame(self):
from cudf.core.frame import Frame

Expand Down
6 changes: 2 additions & 4 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from cudf._lib.null_mask import MaskState, create_null_mask
from cudf._lib.nvtx import annotate
from cudf.core import column
from cudf.core.abc import Serializable
from cudf.core.column import as_column, column_empty
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.frame import Frame
Expand Down Expand Up @@ -87,7 +88,7 @@ def _reverse_op(fn):
}[fn]


class DataFrame(Frame):
class DataFrame(Frame, Serializable):
"""
A GPU Dataframe object.

Expand Down Expand Up @@ -1684,9 +1685,6 @@ def __deepcopy__(self, memo={}):
memo = {}
return self.copy(deep=True)

def __reduce__(self):
return (DataFrame, (self._data, self.index))

@annotate("INSERT", color="green", domain="cudf_python")
def insert(self, loc, name, value):
""" Add a column to DataFrame at the index specified by loc.
Expand Down
5 changes: 3 additions & 2 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import cudf
import cudf._lib.groupby as libgroupby
from cudf._lib.nvtx import annotate
from cudf.core.abc import Serializable


class GroupBy(object):
class GroupBy(Serializable):
"""
Group a DataFrame or Series by a set of columns.

Expand Down Expand Up @@ -513,7 +514,7 @@ def __init__(self, key=None, level=None):
self.level = level


class _Grouping(object):
class _Grouping(Serializable):
def __init__(self, obj, by=None, level=None):
self._obj = obj
self._key_columns = []
Expand Down
14 changes: 2 additions & 12 deletions python/cudf/cudf/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import division, print_function

import functools
import pickle

import cupy
Expand All @@ -11,6 +10,7 @@

import cudf
from cudf._lib.nvtx import annotate
from cudf.core.abc import Serializable
from cudf.core.column import (
CategoricalColumn,
ColumnBase,
Expand Down Expand Up @@ -56,7 +56,7 @@ def _to_frame(this_index, index=True, name=None):
)


class Index(Frame):
class Index(Frame, Serializable):
"""The root interface for all Series indexes.
"""

Expand Down Expand Up @@ -657,9 +657,6 @@ def __getitem__(self, index):
def __eq__(self, other):
return super(type(self), self).__eq__(other)

def __reduce__(self):
return (RangeIndex, (self._start, self._stop, self.name))

def equals(self, other):
if self is other:
return True
Expand Down Expand Up @@ -840,13 +837,6 @@ def copy(self, deep=True):
def __sizeof__(self):
return self._values.__sizeof__()

def __reduce__(self):
_maker = functools.partial(
self.__class__, self._values, name=self.name
)

return _maker, ()

def __len__(self):
return len(self._values)

Expand Down
3 changes: 2 additions & 1 deletion python/cudf/cudf/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import cudf
import cudf._lib as libcudf
from cudf._lib.nvtx import annotate
from cudf.core.abc import Serializable
from cudf.core.column import (
ColumnBase,
DatetimeColumn,
Expand All @@ -35,7 +36,7 @@
)


class Series(Frame):
class Series(Frame, Serializable):
"""
Data and null-masks.

Expand Down
10 changes: 10 additions & 0 deletions python/cudf/cudf/tests/test_pickling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ def check_serialization(df):
sortvaldf = df.sort_values("vals")
assert isinstance(sortvaldf.index, GenericIndex)
assert_frame_picklable(sortvaldf)
# out-of-band
if pickle.HIGHEST_PROTOCOL >= 5:
buffers = []
serialbytes = pickle.dumps(
df, protocol=5, buffer_callback=buffers.append
)
for b in buffers:
assert isinstance(b, pickle.PickleBuffer)
loaded = pickle.loads(serialbytes, buffers=buffers)
pd.util.testing.assert_frame_equal(loaded.to_pandas(), df.to_pandas())


def assert_frame_picklable(df):
Expand Down
14 changes: 10 additions & 4 deletions python/cudf/cudf/tests/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,26 @@
# pd.util.testing.makeMultiIndex, # Indices not serialized on device
],
)
def test_serialize(df):
@pytest.mark.parametrize("to_host", [True, False])
def test_serialize(df, to_host):
""" This should hopefully replace all functions below """
a = df()
if "cudf" not in type(a).__module__:
a = cudf.from_pandas(a)
header, frames = a.serialize()
if to_host:
header, frames = a.host_serialize()
else:
header, frames = a.device_serialize()
msgpack.dumps(header) # ensure that header is msgpack serializable
ndevice = 0
for frame in frames:
if not isinstance(frame, (bytes, memoryview)):
if hasattr(frame, "__cuda_array_interface__"):
ndevice += 1
# Indices etc. will not be DeviceNDArray
# but data should be...
if hasattr(df, "_cols"):
if to_host:
assert ndevice == 0
elif hasattr(df, "_cols"):
assert ndevice >= len(df._data)
else:
assert ndevice > 0
Expand Down