From bd0c1164a058d654d5ba654dbb005ae5b412d00b Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Mon, 2 Jan 2017 14:06:41 -0500
Subject: [PATCH 1/2] Don't Blosc-serialize arrays if not compressible

---
 distributed/protocol/numpy.py            | 21 +++++++++++++++++----
 distributed/protocol/tests/test_numpy.py | 19 +++++++++++++++++++
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/distributed/protocol/numpy.py b/distributed/protocol/numpy.py
index b7a93ea424e..5ed22fb554d 100644
--- a/distributed/protocol/numpy.py
+++ b/distributed/protocol/numpy.py
@@ -10,6 +10,7 @@
 except ImportError:
     blosc = False
 
+from .compression import byte_sample
 from .utils import frame_split_size
 from .serialize import register_serialization
 from . import pickle
@@ -49,13 +50,25 @@ def serialize_numpy_ndarray(x):
 
     data = x.view('u1').data
 
-    if blosc:
+    if blosc and len(data) > 1e5:
         frames = frame_split_size([data])
         if sys.version_info.major == 2:
             frames = [ensure_bytes(frame) for frame in frames]
-        frames = [blosc.compress(frame, typesize=size,
-                                 cname='lz4', clevel=5) for frame in frames]
-        header['compression'] = ['blosc'] * len(frames)
+
+        out = []
+        compression = []
+        for frame in frames:
+            sample = byte_sample(frame, 10000 * size, 5)
+            csample = blosc.compress(sample, typesize=size, cname='lz4', clevel=3)
+            if len(csample) < 0.8 * len(sample):
+                compressed = blosc.compress(frame, typesize=size, cname='lz4', clevel=5)
+                out.append(compressed)
+                compression.append('blosc')
+            else:
+                out.append(frame)
+                compression.append(None)
+        header['compression'] = compression
+        frames = out
     else:
         frames = [data]
 
diff --git a/distributed/protocol/tests/test_numpy.py b/distributed/protocol/tests/test_numpy.py
index 7daca3d0081..ebecd8ee6e1 100644
--- a/distributed/protocol/tests/test_numpy.py
+++ b/distributed/protocol/tests/test_numpy.py
@@ -118,3 +118,22 @@ def test_compress_memoryview():
     compression, compressed = maybe_compress(mv)
     if compression:
         assert len(compressed) < len(mv)
+
+
+def test_dont_compress_uncompressable_data():
+    blosc = pytest.importorskip('blosc')
+    x = np.random.randint(0, 255, size=100000).astype('uint8')
+    header, [data] = serialize(x)
+    assert 'compression' not in header
+    assert data == x.data
+
+    x = np.ones(1000000)
+    header, [data] = serialize(x)
+    assert header['compression'] == ['blosc']
+    assert data != x.data
+
+
+    x = np.ones(100)
+    header, [data] = serialize(x)
+    assert 'compression' not in header
+    assert data.obj.ctypes.data == x.ctypes.data

From 27c4cf0168e5e2f63f71470883a11d527ea35eb2 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Mon, 2 Jan 2017 16:33:04 -0500
Subject: [PATCH 2/2] Update dependencies in continuous integration scripts

---
 .travis.yml                                        | 6 +++---
 continuous_integration/setup_conda_environment.cmd | 2 +-
 distributed/cli/tests/test_dask_scheduler.py       | 5 +++++
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 15fbf959655..1433b3d61af 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -48,7 +48,7 @@ install:
   # Install dependencies
   - conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION
   - source activate test-environment
-  - conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4 lz4 paramiko
+  - conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4 lz4 paramiko tblib click -c conda-forge
   - |
     if [[ $HDFS == true ]]; then
         conda install -q libxml2 krb5 boost
@@ -59,11 +59,11 @@ install:
   - pip install -q git+https://github.com/joblib/joblib.git --upgrade
  - pip install -q git+https://github.com/dask/s3fs.git --upgrade
  - pip install -q git+https://github.com/dask/zict.git --upgrade
-  - pip install sortedcollections
+  - pip install -q sortedcollections msgpack-python
   - if [[ $PYTHON == '3.5' ]]; then pip install git+https://github.com/jcrist/crick.git; fi
 
   # Install distributed
-  - python setup.py install
+  - pip install --no-deps -e .
 
 script:
   - export PYTEST_OPTIONS="--verbose -r s --timeout-method=thread --timeout=300 --runslow --durations=20"

diff --git a/continuous_integration/setup_conda_environment.cmd b/continuous_integration/setup_conda_environment.cmd
index 970dfa2442e..e9820f311ac 100644
--- a/continuous_integration/setup_conda_environment.cmd
+++ b/continuous_integration/setup_conda_environment.cmd
@@ -15,7 +15,7 @@ call deactivate
 
 @rem Create test environment
 @rem (note: no cytoolz as it seems to prevent faulthandler tracebacks on crash)
-%CONDA% create -n %CONDA_ENV% -q -y python=%PYTHON% pytest toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client tblib msgpack-python cloudpickle click zict lz4
+%CONDA% create -n %CONDA_ENV% -q -y python=%PYTHON% pytest toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client tblib msgpack-python cloudpickle click zict lz4 -c conda-forge
 
 call activate %CONDA_ENV%
 
diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py
index bd14d76835b..75e3c7ff432 100644
--- a/distributed/cli/tests/test_dask_scheduler.py
+++ b/distributed/cli/tests/test_dask_scheduler.py
@@ -134,11 +134,16 @@ def test_multiple_workers(loop):
 
 def test_pid_file(loop):
     def check_pidfile(proc, pidfile):
+        start = time()
         while not os.path.exists(pidfile):
             sleep(0.01)
+            assert time() < start + 5
+
         text = False
+        start = time()
         while not text:
             sleep(0.01)
+            assert time() < start + 5
             with open(pidfile) as f:
                 text = f.read()
         pid = int(text)
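
The heuristic in the first patch is worth restating outside the diff: take a small
sample of each frame, compress it cheaply (clevel=3), and only pay for full
compression (clevel=5) when the sample shrinks below 80% of its original size.
The following is a minimal standalone sketch of that idea; `sample_bytes` here is
a hypothetical stand-in for distributed's `byte_sample` (defined in
distributed/protocol/compression.py, whose exact signature the patch only hints
at via `byte_sample(frame, 10000 * size, 5)`), not the library's API.

import blosc


def sample_bytes(frame, nsamples=5, sample_size=10000):
    # Hypothetical stand-in for distributed.protocol.compression.byte_sample:
    # take a few evenly spaced slices of the frame as a compressibility probe.
    frame = bytes(frame)
    if len(frame) <= nsamples * sample_size:
        return frame
    stride = len(frame) // nsamples
    return b''.join(frame[i * stride:i * stride + sample_size]
                    for i in range(nsamples))


def maybe_blosc_compress(frame, typesize=8):
    # Return (compression, payload). The 0.8 ratio and clevel values
    # mirror the patch.
    sample = sample_bytes(frame)
    # Cheap trial at clevel=3: is this data compressible at all?
    csample = blosc.compress(sample, typesize=typesize, cname='lz4', clevel=3)
    if len(csample) < 0.8 * len(sample):
        # Worth it: compress the whole frame at the usual clevel=5.
        return 'blosc', blosc.compress(bytes(frame), typesize=typesize,
                                       cname='lz4', clevel=5)
    return None, frame  # incompressible (e.g. random bytes): ship as-is


With random uint8 data, as in the new test, the trial compression barely shrinks
the sample, so the frame ships uncompressed and the serializer avoids a wasted
pass over the full buffer; with `np.ones(1000000).tobytes()` the sample collapses
and the full frame is compressed, matching the `['blosc']` header in the test.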