diff --git a/.github/workflows/s390x.yml b/.github/workflows/s390x.yml index e0e6af504..7e173ba93 100644 --- a/.github/workflows/s390x.yml +++ b/.github/workflows/s390x.yml @@ -58,6 +58,7 @@ jobs: pkg-config \ python3 \ python3-astropy \ + python3-blosc \ python3-lz4 \ python3-numpy \ python3-scipy \ diff --git a/CHANGES.rst b/CHANGES.rst index 5293cd874..93d22d8cd 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,8 @@ +Upcoming: +--------- + +- Added support for ``blosc`` compression algorithm [#1678]. + 3.0.1 (2023-10-30) ------------------ diff --git a/README.rst b/README.rst index 9cf905a53..0dbc0ba85 100644 --- a/README.rst +++ b/README.rst @@ -162,9 +162,11 @@ It is possible to compress the array data when writing the file: af.write_to("compressed.asdf", all_array_compression="zlib") -The built-in compression algorithms are ``'zlib'``, and ``'bzp2'``. The -``'lz4'`` algorithm becomes available when the `lz4 `__ package -is installed. Other compression algorithms may be available via extensions. +The built-in compression algorithms are ``'zlib'``, and ``'bzp2'``. +The ``'lz4'`` and ``'blosc'`` algorithms become available when the +`lz4 `__ or `blosc +`__ packages are installed, respectively. +Other compression algorithms may be available via extensions. .. 
_end-compress-file: diff --git a/asdf/_tests/_block/test_options.py b/asdf/_tests/_block/test_options.py index 22bade26c..cab0b67ae 100644 --- a/asdf/_tests/_block/test_options.py +++ b/asdf/_tests/_block/test_options.py @@ -7,7 +7,7 @@ valid_storage_types = ["internal", "external", "streamed", "inline"] valid_default_storage_types = [st for st in valid_storage_types if st != "streamed"] -valid_compression_types = [None, "zlib", "bzp2", "lz4", ""] +valid_compression_types = [None, "zlib", "bzp2", "lz4", "blsc", ""] invalid_storage_types = ["foo", "bar"] invalid_compression_types = ["input", "foo"] diff --git a/asdf/_tests/commands/tests/test_defragment.py b/asdf/_tests/commands/tests/test_defragment.py index e21b7f951..1a1364434 100644 --- a/asdf/_tests/commands/tests/test_defragment.py +++ b/asdf/_tests/commands/tests/test_defragment.py @@ -53,3 +53,8 @@ def test_defragment_bzp2(tmpdir): def test_defragment_lz4(tmpdir): pytest.importorskip("lz4") _test_defragment(tmpdir, "lz4") + + +def test_defragment_blosc(tmpdir): + pytest.importorskip("blosc") + _test_defragment(tmpdir, "blsc") diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py index f2c2b626b..b3c20c773 100644 --- a/asdf/_tests/test_array_blocks.py +++ b/asdf/_tests/test_array_blocks.py @@ -858,7 +858,7 @@ def test_add_block_before_fully_loaded(tmp_path): @pytest.mark.parametrize("all_array_storage", ["internal", "external", "inline"]) -@pytest.mark.parametrize("all_array_compression", [None, "", "zlib", "bzp2", "lz4", "input"]) +@pytest.mark.parametrize("all_array_compression", [None, "", "zlib", "bzp2", "lz4", "blsc", "input"]) @pytest.mark.parametrize("compression_kwargs", [None, {}]) def test_write_to_update_storage_options(tmp_path, all_array_storage, all_array_compression, compression_kwargs): if all_array_compression == "bzp2" and compression_kwargs is not None: diff --git a/asdf/_tests/test_compression.py b/asdf/_tests/test_compression.py index 150d27cc5..de3f5d0e7 100644 
--- a/asdf/_tests/test_compression.py +++ b/asdf/_tests/test_compression.py @@ -113,6 +113,13 @@ def test_lz4(tmp_path): _roundtrip(tmp_path, tree, "lz4") +def test_blosc(tmp_path): + pytest.importorskip("blosc") + tree = _get_large_tree() + + _roundtrip(tmp_path, tree, "blsc") + + def test_recompression(tmp_path): tree = _get_large_tree() tmpfile = os.path.join(str(tmp_path), "test1.asdf") @@ -191,6 +198,7 @@ def test_nonnative_endian_compression(tmp_path): bedata = np.arange(1000, dtype=">i8") _roundtrip(tmp_path, {"ledata": ledata, "bedata": bedata}, "lz4") + _roundtrip(tmp_path, {"ledata": ledata, "bedata": bedata}, "blsc") class LzmaCompressor(Compressor): diff --git a/asdf/asdf.py b/asdf/asdf.py index 924274d8f..f9c06f543 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -674,6 +674,8 @@ def set_array_compression(self, arr, compression, **compression_kwargs): - ``lz4``: Use lz4 compression + - ``blsc``: Use blosc compression + - ``input``: Use the same compression as in the file read. If there is no prior file, acts as None. @@ -1003,6 +1005,8 @@ def update( - ``lz4``: Use lz4 compression. + - ``blsc``: Use blosc compression. + - ``input``: Use the same compression as in the file read. If there is no prior file, acts as None @@ -1156,6 +1160,8 @@ def write_to( - ``lz4``: Use lz4 compression. + - ``blsc``: Use blosc compression. + - ``input``: Use the same compression as in the file read. If there is no prior file, acts as None. 
diff --git a/asdf/commands/defragment.py b/asdf/commands/defragment.py index 36e1106dd..bdbc0825a 100644 --- a/asdf/commands/defragment.py +++ b/asdf/commands/defragment.py @@ -33,8 +33,8 @@ def setup_arguments(cls, subparsers): "-c", type=str, nargs="?", - choices=["zlib", "bzp2", "lz4"], - help="""Compress blocks using one of "zlib", "bzp2" or "lz4".""", + choices=["zlib", "bzp2", "lz4", "blsc"], + help="""Compress blocks using one of "zlib", "bzp2", "lz4", or "blsc".""", ) parser.set_defaults(func=cls.run) diff --git a/asdf/compression.py b/asdf/compression.py index 89076e313..e248e3913 100644 --- a/asdf/compression.py +++ b/asdf/compression.py @@ -34,7 +34,7 @@ def validate(compression): compression = compression.strip("\0") - builtin_labels = ["zlib", "bzp2", "lz4", "input"] + builtin_labels = ["zlib", "bzp2", "lz4", "blsc", "input"] ext_labels = _get_all_compression_extension_labels() all_labels = ext_labels + builtin_labels @@ -136,6 +136,56 @@ def decompress(self, blocks, out, **kwargs): return bytesout +class BloscCompressor: + # Note that blosc supports only inputs of up to about 2 GByte. + # blosc2, which is not yet available as a Python package, supports larger inputs. + def __init__(self): + try: + import blosc + except ImportError as err: + msg = ( + "blosc library is not installed in your Python environment, " + "therefore the compressed block in this ASDF file " + "can not be decompressed." 
+ ) + raise ImportError(msg) from err + + self._api = blosc + + def compress(self, data, **kwargs): + typesize = data.itemsize + clevel = 9 + # Codec name (‘blosclz’, ‘lz4’, ‘lz4hc’, ‘snappy’, ‘zlib’, ‘zstd’) + cname = "blosclz" + # Shuffle filter (`SHUFFLE`, `NOSHUFFLE`, `BITSHUFFLE`) + shuffle = self._api.BITSHUFFLE + nthreads = 1 + # nthreads = self._api.ncores + self._api.set_nthreads(nthreads) + _output = self._api.compress(data, typesize=typesize, clevel=clevel, shuffle=shuffle, cname=cname) + yield _output + + def decompress(self, blocks, out, **kwargs): + # There is no Python API for piecewise decompression. We need to collect all data first. + # TODO: Read all data at once instead of piecewise. + blocks = [block for block in blocks] + size = sum(len(block) for block in blocks) + buffer = np.empty(size, dtype=np.uint8) + base = 0 + for block in blocks: + buffer[base:base+len(block)] = np.frombuffer(memoryview(block), dtype=buffer.dtype) + base += len(block) + assert base == len(buffer) + nthreads = 1 + # nthreads = self._api.ncores + self._api.set_nthreads(nthreads) + _out = self._api.decompress(buffer) + nbytes = len(out) + # TODO: call `self._api.decompress_ptr` instead to avoid copying the output + np.frombuffer(out, dtype=np.uint8)[0:nbytes] = np.frombuffer(_out, dtype=np.uint8) + return nbytes + + class ZlibCompressor: def compress(self, data, **kwargs): comp = zlib.compress(data, **kwargs) @@ -214,6 +264,8 @@ def _get_compressor(label): comp = Bzp2Compressor() elif label == "lz4": comp = Lz4Compressor() + elif label == "blsc": + comp = BloscCompressor() else: msg = f"Unknown compression type: '{label}'" raise ValueError(msg) diff --git a/asdf/config.py b/asdf/config.py index 911c7a9b3..a1c71778b 100644 --- a/asdf/config.py +++ b/asdf/config.py @@ -363,6 +363,8 @@ def all_array_compression(self): - ``lz4``: Use lz4 compression. + - ``blsc``: Use blosc compression. + - ``input``: Use the same compression as in the file read. 
If there is no prior file, acts as None """ diff --git a/asdf/extension/_serialization_context.py b/asdf/extension/_serialization_context.py index 95e2a0b67..3c65061d9 100644 --- a/asdf/extension/_serialization_context.py +++ b/asdf/extension/_serialization_context.py @@ -203,6 +203,8 @@ def set_array_compression(self, arr, compression, **compression_kwargs): - ``lz4``: Use lz4 compression + - ``blsc``: Use blosc compression + - ``input``: Use the same compression as in the file read. If there is no prior file, acts as None. diff --git a/docs/asdf/arrays.rst b/docs/asdf/arrays.rst index 34c4cbf2e..98d3ec7cc 100644 --- a/docs/asdf/arrays.rst +++ b/docs/asdf/arrays.rst @@ -245,6 +245,10 @@ The `lz4 `__ compression algorithm is also supported, but requires the optional `lz4 `__ package in order to work. +A `bitshuffle `__ compression algorithm is also +supported, but requires the optional +`blosc `__ package in order to work. + When reading a file with compressed blocks, the blocks will be automatically decompressed when accessed. If a file with compressed blocks is read and then written out again, by default the new file will use the same compression as the diff --git a/docs/asdf/install.rst b/docs/asdf/install.rst index 4842d0036..919c78c59 100644 --- a/docs/asdf/install.rst +++ b/docs/asdf/install.rst @@ -20,6 +20,9 @@ types. One recommended option is the `asdf-astropy `__ compression is provided by the `lz4 `__ package. +Optional support for `bitshuffle `__ +compression is provided by the `blosc `__ package. + Installing with pip =================== diff --git a/pyproject.toml b/pyproject.toml index 3f954c247..164d8608a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ [project.optional-dependencies] all = [ "lz4>=0.10", + "blosc>=1.11.1", ] docs = [ "sphinx-asdf>=0.2.2", @@ -45,6 +46,7 @@ docs = [ tests = [ "fsspec[http]>=2022.8.2", "lz4>=0.10", + "blosc>=1.11.1", "psutil", "pytest>=6", "pytest-doctestplus",