From 5110ae3fe6b41b64b70edc009cb76fe2d3f60036 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 8 Jun 2020 19:40:16 -0700 Subject: [PATCH 1/3] Drop old frame splitting in NumPy serialization This is leftover from when Blosc compression was still being handled with the NumPy serialization code path itself. These days frame splitting already happens as part of serialization more generally before compressing data. Also this cutoff is not relevant for when splitting occurs as that is determined by a limit in the size of buffers Blosc can handle, which `frame_split_size` already knows. So go ahead and drop this special casing in NumPy serialization and rely on these commonly shared code paths that all serialization passes through. --- distributed/protocol/numpy.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/distributed/protocol/numpy.py b/distributed/protocol/numpy.py index a0df77c8b37..a934c53f31f 100644 --- a/distributed/protocol/numpy.py +++ b/distributed/protocol/numpy.py @@ -1,7 +1,7 @@ import math import numpy as np -from .utils import frame_split_size, merge_frames +from .utils import merge_frames from .serialize import dask_serialize, dask_deserialize from . import pickle @@ -87,10 +87,7 @@ def serialize_numpy_ndarray(x): if broadcast_to is not None: header["broadcast_to"] = broadcast_to - if x.nbytes > 1e5: - frames = frame_split_size(data) - else: - frames = [data] + frames = [data] header["lengths"] = [x.nbytes] From 89140428551d032b10b3663b55eafbd932fae3dd Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 8 Jun 2020 19:40:21 -0700 Subject: [PATCH 2/3] Drop `merge_frames` in NumPy deserialization Since the frames are not being split in serialization and other common code paths to all deserialization already make sure frames match their expected length, there should be no need to merge frames in NumPy deserialization. So drop this as well. --- distributed/protocol/numpy.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/distributed/protocol/numpy.py b/distributed/protocol/numpy.py index a934c53f31f..4c91bc9813e 100644 --- a/distributed/protocol/numpy.py +++ b/distributed/protocol/numpy.py @@ -1,7 +1,6 @@ import math import numpy as np -from .utils import merge_frames from .serialize import dask_serialize, dask_deserialize from . import pickle @@ -97,9 +96,6 @@ def serialize_numpy_ndarray(x): @dask_deserialize.register(np.ndarray) def deserialize_numpy_ndarray(header, frames): with log_errors(): - if len(frames) > 1: - frames = merge_frames(header, frames) - if header.get("pickle"): return pickle.loads(frames[0]) From 3fc13a557739cfad943a39b860d2d10edbcb65eb Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 8 Jun 2020 20:27:53 -0700 Subject: [PATCH 3/3] Unpack `frames` in NumPy deserialization To simplify the code a bit and assert the number of frames is exactly 1, unpack `frames` into a single `frame` variable. --- distributed/protocol/numpy.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/numpy.py b/distributed/protocol/numpy.py index 4c91bc9813e..2140c2f0c4e 100644 --- a/distributed/protocol/numpy.py +++ b/distributed/protocol/numpy.py @@ -96,8 +96,10 @@ def serialize_numpy_ndarray(x): @dask_deserialize.register(np.ndarray) def deserialize_numpy_ndarray(header, frames): with log_errors(): + (frame,) = frames + if header.get("pickle"): - return pickle.loads(frames[0]) + return pickle.loads(frame) is_custom, dt = header["dtype"] if is_custom: @@ -110,7 +112,7 @@ def deserialize_numpy_ndarray(header, frames): else: shape = header["shape"] - x = np.ndarray(shape, dtype=dt, buffer=frames[0], strides=header["strides"]) + x = np.ndarray(shape, dtype=dt, buffer=frame, strides=header["strides"]) return x