diff --git a/CHANGELOG.md b/CHANGELOG.md index e5cd78dd56e..f318734dada 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - PR #4881 Support row_number in rolling_window - PR #5068 Add Java bindings for arctan2 - PR #5132 Support out-of-band buffers in Python pickling +- PR #5139 Add ``Serializable`` ABC for Python - PR #5149 Add Java bindings for PMOD - PR #5153 Add Java bindings for extract - PR #5196 Add Java bindings for NULL_EQUALS, NULL_MAX and NULL_MIN diff --git a/python/cudf/cudf/comm/serialize.py b/python/cudf/cudf/comm/serialize.py index 9235a83f1ab..a9e7fcae95c 100644 --- a/python/cudf/cudf/comm/serialize.py +++ b/python/cudf/cudf/comm/serialize.py @@ -1,59 +1,31 @@ -import pickle - -import cudf -import cudf.core.groupby.groupby - -# all (de-)serializations are attached to cudf Objects: -# Series/DataFrame/Index/Column/Buffer/etc -serializable_classes = ( - cudf.CategoricalDtype, - cudf.DataFrame, - cudf.Index, - cudf.MultiIndex, - cudf.Series, - cudf.core.groupby.groupby.GroupBy, - cudf.core.groupby.groupby._Grouping, - cudf.core.column.column.ColumnBase, - cudf.core.buffer.Buffer, -) +import cudf # noqa: F401 +from cudf.core.abc import Serializable try: from distributed.protocol import dask_deserialize, dask_serialize from distributed.protocol.cuda import cuda_deserialize, cuda_serialize from distributed.utils import log_errors - @cuda_serialize.register(serializable_classes) + @cuda_serialize.register(Serializable) def cuda_serialize_cudf_object(x): with log_errors(): - header, frames = x.serialize() - assert all((type(f) is cudf.core.buffer.Buffer) for f in frames) - header["lengths"] = [f.nbytes for f in frames] - return header, frames + return x.device_serialize() # all (de-)serializations are attached to cudf Objects: # Series/DataFrame/Index/Column/Buffer/etc - @dask_serialize.register(serializable_classes) + @dask_serialize.register(Serializable) def dask_serialize_cudf_object(x): - header, frames = cuda_serialize_cudf_object(x) with log_errors(): - frames = [f.to_host_array().data for f in frames] - return header, frames + return x.host_serialize() - @cuda_deserialize.register(serializable_classes) - @dask_deserialize.register(serializable_classes) + @cuda_deserialize.register(Serializable) + @dask_deserialize.register(Serializable) def deserialize_cudf_object(header, frames): with log_errors(): if header["serializer"] == "cuda": - for f in frames: - # some frames are empty -- meta/empty partitions/etc - if len(f) > 0: - assert hasattr(f, "__cuda_array_interface__") + return Serializable.device_deserialize(header, frames) elif header["serializer"] == "dask": - frames = [memoryview(f) for f in frames] - - cudf_typ = pickle.loads(header["type-serialized"]) - cudf_obj = cudf_typ.deserialize(header, frames) - return cudf_obj + return Serializable.host_deserialize(header, frames) except ImportError: diff --git a/python/cudf/cudf/core/abc.py b/python/cudf/cudf/core/abc.py new file mode 100644 index 00000000000..1da4104f475 --- /dev/null +++ b/python/cudf/cudf/core/abc.py @@ -0,0 +1,62 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. + +import abc +import pickle +from abc import abstractmethod + +import numpy + +import rmm + +import cudf + + +class Serializable(abc.ABC): + @abstractmethod + def serialize(self): + pass + + @classmethod + @abstractmethod + def deserialize(cls, header, frames): + pass + + def device_serialize(self): + header, frames = self.serialize() + assert all((type(f) is cudf.core.buffer.Buffer) for f in frames) + header["type-serialized"] = pickle.dumps(type(self)) + header["lengths"] = [f.nbytes for f in frames] + return header, frames + + @classmethod + def device_deserialize(cls, header, frames): + for f in frames: + # some frames are empty -- meta/empty partitions/etc + if len(f) > 0: + assert hasattr(f, "__cuda_array_interface__") + + typ = pickle.loads(header["type-serialized"]) + obj = typ.deserialize(header, frames) + + return obj + + def host_serialize(self): + header, frames = self.device_serialize() + frames = [f.to_host_array().view("u1").data for f in frames] + return header, frames + + @classmethod + def host_deserialize(cls, header, frames): + frames = [ + rmm.DeviceBuffer.to_device(memoryview(f).cast("B")) for f in frames + ] + obj = cls.device_deserialize(header, frames) + return obj + + def __reduce_ex__(self, protocol): + header, frames = self.host_serialize() + if protocol >= 5: + frames = [pickle.PickleBuffer(f) for f in frames] + else: + frames = [numpy.asarray(f) for f in frames] + return self.host_deserialize, (header, frames) diff --git a/python/cudf/cudf/core/buffer.py b/python/cudf/cudf/core/buffer.py index 5b06148430e..3f08e5cacef 100644 --- a/python/cudf/cudf/core/buffer.py +++ b/python/cudf/cudf/core/buffer.py @@ -7,8 +7,10 @@ import rmm from rmm import DeviceBuffer, _DevicePointer +from cudf.core.abc import Serializable -class Buffer: + +class Buffer(Serializable): def __init__(self, data=None, size=None, owner=None): """ A Buffer represents a device memory allocation. @@ -58,12 +60,6 @@ def __init__(self, data=None, size=None, owner=None): raise TypeError("data must be Buffer, array-like or integer") self._init_from_array_like(np.asarray(data), owner) - def __reduce_ex__(self, protocol): - data = self.to_host_array() - if protocol >= 5: - data = pickle.PickleBuffer(data) - return self.__class__, (data,) - def __len__(self): return self.size diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 378a87c8b8e..72973594f28 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -24,6 +24,7 @@ from cudf._lib.scalar import as_scalar from cudf._lib.stream_compaction import unique_count as cpp_unique_count from cudf._lib.transform import bools_to_mask +from cudf.core.abc import Serializable from cudf.core.buffer import Buffer from cudf.core.dtypes import CategoricalDtype from cudf.utils import cudautils, ioutils, utils @@ -38,7 +39,7 @@ from cudf.utils.utils import buffers_from_pyarrow, mask_dtype -class ColumnBase(Column): +class ColumnBase(Column, Serializable): def __init__( self, data, @@ -67,20 +68,6 @@ def __init__( children=children, ) - def __reduce__(self): - return ( - build_column, - ( - self.data, - self.dtype, - self.mask, - self.size, - 0, - self.null_count, - self.children, - ), - ) - def as_frame(self): from cudf.core.frame import Frame diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index aad6ad7f7cf..4d7b9525d85 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -25,6 +25,7 @@ from cudf._lib.null_mask import MaskState, create_null_mask from cudf._lib.nvtx import annotate from cudf.core import column +from cudf.core.abc import Serializable from cudf.core.column import as_column, column_empty from cudf.core.column_accessor import ColumnAccessor from cudf.core.frame import Frame @@ -87,7 +88,7 @@ def _reverse_op(fn): }[fn] -class DataFrame(Frame): +class DataFrame(Frame, Serializable): """ A GPU Dataframe object. @@ -1684,9 +1685,6 @@ def __deepcopy__(self, memo={}): memo = {} return self.copy(deep=True) - def __reduce__(self): - return (DataFrame, (self._data, self.index)) - @annotate("INSERT", color="green", domain="cudf_python") def insert(self, loc, name, value): """ Add a column to DataFrame at the index specified by loc. diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index bbad3342a2e..f5e8977118a 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -9,9 +9,10 @@ import cudf import cudf._lib.groupby as libgroupby from cudf._lib.nvtx import annotate +from cudf.core.abc import Serializable -class GroupBy(object): +class GroupBy(Serializable): """ Group a DataFrame or Series by a set of columns. @@ -513,7 +514,7 @@ def __init__(self, key=None, level=None): self.level = level -class _Grouping(object): +class _Grouping(Serializable): def __init__(self, obj, by=None, level=None): self._obj = obj self._key_columns = [] diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 80a11cfd8cb..17e1e86a0da 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -2,7 +2,6 @@ from __future__ import division, print_function -import functools import pickle import cupy @@ -11,6 +10,7 @@ import cudf from cudf._lib.nvtx import annotate +from cudf.core.abc import Serializable from cudf.core.column import ( CategoricalColumn, ColumnBase, @@ -56,7 +56,7 @@ def _to_frame(this_index, index=True, name=None): ) -class Index(Frame): +class Index(Frame, Serializable): """The root interface for all Series indexes. """ @@ -657,9 +657,6 @@ def __getitem__(self, index): def __eq__(self, other): return super(type(self), self).__eq__(other) - def __reduce__(self): - return (RangeIndex, (self._start, self._stop, self.name)) - def equals(self, other): if self is other: return True @@ -840,13 +837,6 @@ def copy(self, deep=True): def __sizeof__(self): return self._values.__sizeof__() - def __reduce__(self): - _maker = functools.partial( - self.__class__, self._values, name=self.name - ) - - return _maker, () - def __len__(self): return len(self._values) diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 4f082eaaf40..ae2c848c674 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -11,6 +11,7 @@ import cudf import cudf._lib as libcudf from cudf._lib.nvtx import annotate +from cudf.core.abc import Serializable from cudf.core.column import ( ColumnBase, DatetimeColumn, @@ -35,7 +36,7 @@ ) -class Series(Frame): +class Series(Frame, Serializable): """ Data and null-masks. diff --git a/python/cudf/cudf/tests/test_pickling.py b/python/cudf/cudf/tests/test_pickling.py index a00ef67ef5e..a639e81715a 100644 --- a/python/cudf/cudf/tests/test_pickling.py +++ b/python/cudf/cudf/tests/test_pickling.py @@ -23,6 +23,16 @@ def check_serialization(df): sortvaldf = df.sort_values("vals") assert isinstance(sortvaldf.index, GenericIndex) assert_frame_picklable(sortvaldf) + # out-of-band + if pickle.HIGHEST_PROTOCOL >= 5: + buffers = [] + serialbytes = pickle.dumps( + df, protocol=5, buffer_callback=buffers.append + ) + for b in buffers: + assert isinstance(b, pickle.PickleBuffer) + loaded = pickle.loads(serialbytes, buffers=buffers) + pd.util.testing.assert_frame_equal(loaded.to_pandas(), df.to_pandas()) def assert_frame_picklable(df): diff --git a/python/cudf/cudf/tests/test_serialize.py b/python/cudf/cudf/tests/test_serialize.py index 3f0c605ac68..1413ebd93bf 100644 --- a/python/cudf/cudf/tests/test_serialize.py +++ b/python/cudf/cudf/tests/test_serialize.py @@ -38,20 +38,26 @@ # pd.util.testing.makeMultiIndex, # Indices not serialized on device ], ) -def test_serialize(df): +@pytest.mark.parametrize("to_host", [True, False]) +def test_serialize(df, to_host): """ This should hopefully replace all functions below """ a = df() if "cudf" not in type(a).__module__: a = cudf.from_pandas(a) - header, frames = a.serialize() + if to_host: + header, frames = a.host_serialize() + else: + header, frames = a.device_serialize() msgpack.dumps(header) # ensure that header is msgpack serializable ndevice = 0 for frame in frames: - if not isinstance(frame, (bytes, memoryview)): + if hasattr(frame, "__cuda_array_interface__"): ndevice += 1 # Indices etc. will not be DeviceNDArray # but data should be... - if hasattr(df, "_cols"): + if to_host: + assert ndevice == 0 + elif hasattr(df, "_cols"): assert ndevice >= len(df._data) else: assert ndevice > 0