6 changes: 3 additions & 3 deletions .travis.yml
@@ -48,7 +48,7 @@ install:
# Install dependencies
- conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION
- source activate test-environment
-  - conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4 lz4 paramiko
+  - conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4 lz4 paramiko tblib click -c conda-forge
- |
if [[ $HDFS == true ]]; then
conda install -q libxml2 krb5 boost
@@ -59,11 +59,11 @@ install:
- pip install -q git+https://github.com/joblib/joblib.git --upgrade
- pip install -q git+https://github.com/dask/s3fs.git --upgrade
- pip install -q git+https://github.com/dask/zict.git --upgrade
-  - pip install sortedcollections
+  - pip install -q sortedcollections msgpack-python
- if [[ $PYTHON == '3.5' ]]; then pip install git+https://github.com/jcrist/crick.git; fi

# Install distributed
-  - python setup.py install
+  - pip install --no-deps -e .

script:
- export PYTEST_OPTIONS="--verbose -r s --timeout-method=thread --timeout=300 --runslow --durations=20"
2 changes: 1 addition & 1 deletion continuous_integration/setup_conda_environment.cmd
@@ -15,7 +15,7 @@ call deactivate

@rem Create test environment
@rem (note: no cytoolz as it seems to prevent faulthandler tracebacks on crash)
-%CONDA% create -n %CONDA_ENV% -q -y python=%PYTHON% pytest toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client tblib msgpack-python cloudpickle click zict lz4
+%CONDA% create -n %CONDA_ENV% -q -y python=%PYTHON% pytest toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client tblib msgpack-python cloudpickle click zict lz4 -c conda-forge

call activate %CONDA_ENV%

5 changes: 5 additions & 0 deletions distributed/cli/tests/test_dask_scheduler.py
@@ -134,11 +134,16 @@ def test_multiple_workers(loop):

def test_pid_file(loop):
    def check_pidfile(proc, pidfile):
        start = time()
        while not os.path.exists(pidfile):
            sleep(0.01)
            assert time() < start + 5

        text = False
        start = time()
        while not text:
            sleep(0.01)
            assert time() < start + 5
            with open(pidfile) as f:
                text = f.read()
        pid = int(text)
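The lines added to this test follow a poll-until-ready pattern: loop on a condition, sleep briefly, and fail via an assert once a five-second deadline passes. A minimal sketch of that pattern as a reusable helper (the wait_for name and its defaults are illustrative only, not part of this change):

from time import sleep, time

def wait_for(predicate, timeout=5, interval=0.01):
    # Poll until predicate() returns a truthy value; fail the test
    # if the deadline passes before it does.
    start = time()
    while not predicate():
        sleep(interval)
        assert time() < start + timeout

# e.g. wait_for(lambda: os.path.exists(pidfile)) would mirror the first loop above.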
21 changes: 17 additions & 4 deletions distributed/protocol/numpy.py
@@ -10,6 +10,7 @@
except ImportError:
    blosc = False

+from .compression import byte_sample
from .utils import frame_split_size
from .serialize import register_serialization
from . import pickle
@@ -49,13 +50,25 @@ def serialize_numpy_ndarray(x):

    data = x.view('u1').data

-    if blosc:
+    if blosc and len(data) > 1e5:
        frames = frame_split_size([data])
        if sys.version_info.major == 2:
            frames = [ensure_bytes(frame) for frame in frames]
-        frames = [blosc.compress(frame, typesize=size,
-                                 cname='lz4', clevel=5) for frame in frames]
-        header['compression'] = ['blosc'] * len(frames)
+
+        out = []
+        compression = []
+        for frame in frames:
+            sample = byte_sample(frame, 10000 * size, 5)
+            csample = blosc.compress(sample, typesize=size, cname='lz4', clevel=3)
+            if len(csample) < 0.8 * len(sample):
+                compressed = blosc.compress(frame, typesize=size, cname='lz4', clevel=5)
+                out.append(compressed)
+                compression.append('blosc')
+            else:
+                out.append(frame)
+                compression.append(None)
+        header['compression'] = compression
+        frames = out
    else:
        frames = [data]

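The new logic only pays for full compression when it is likely to help: it compresses a small byte sample first and compresses the whole frame only if that sample shrinks below 80% of its size. A standalone sketch of the same heuristic, assuming the python-blosc API; byte_sample here is a simplified stand-in for distributed.protocol.compression.byte_sample and maybe_blosc_compress is an illustrative name:

import blosc

def byte_sample(data, size, n):
    # Simplified stand-in: take n evenly spaced slices of `size` bytes each.
    data = bytes(data)
    step = max(len(data) // n, 1)
    starts = list(range(0, len(data), step))[:n]
    return b''.join(data[i:i + size] for i in starts)

def maybe_blosc_compress(frame, itemsize, ratio=0.8):
    # Compress a cheap sample; only compress the full frame if the sample
    # compresses to less than `ratio` of its original size.
    sample = byte_sample(frame, 10000 * itemsize, 5)
    csample = blosc.compress(sample, typesize=itemsize, cname='lz4', clevel=3)
    if len(csample) < ratio * len(sample):
        return 'blosc', blosc.compress(frame, typesize=itemsize, cname='lz4', clevel=5)
    return None, frame

On incompressible data such as random bytes the sample fails the 80% test, so the frame is passed through unchanged, which is what the new test below checks.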
19 changes: 19 additions & 0 deletions distributed/protocol/tests/test_numpy.py
@@ -118,3 +118,22 @@ def test_compress_memoryview():
    compression, compressed = maybe_compress(mv)
    if compression:
        assert len(compressed) < len(mv)


def test_dont_compress_uncompressable_data():
    blosc = pytest.importorskip('blosc')
    x = np.random.randint(0, 255, size=100000).astype('uint8')
    header, [data] = serialize(x)
    assert 'compression' not in header
    assert data == x.data

    x = np.ones(1000000)
    header, [data] = serialize(x)
    assert header['compression'] == ['blosc']
    assert data != x.data

    x = np.ones(100)
    header, [data] = serialize(x)
    assert 'compression' not in header
    assert data.obj.ctypes.data == x.ctypes.data
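For context, these tests exercise the serialize path whose output feeds deserialize; a minimal round-trip sketch, assuming the top-level distributed.protocol exports (serialize, deserialize), which may differ slightly by version:

import numpy as np
from distributed.protocol import serialize, deserialize

x = np.random.randint(0, 255, size=100000).astype('uint8')  # incompressible input
header, frames = serialize(x)       # header may or may not record a compression scheme
assert (deserialize(header, frames) == x).all()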