Skip to content

Commit

Permalink
(apache#25316) Added naive first shot at enabling LZMA compression
Browse files Browse the repository at this point in the history
  • Loading branch information
wrossmorrow committed Feb 5, 2023
1 parent 16cb63b commit 1b8c72f
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import bz2
import io
import logging
import lzma
import os
import posixpath
import re
Expand Down Expand Up @@ -65,6 +66,10 @@ class CompressionTypes(object):
# .bz2 (implies BZIP2 as described below).
# .gz (implies GZIP as described below)
# .deflate (implies DEFLATE as described below)
# .zst (implies ZSTD as described below)
# .zst (implies ZSTD as described below)
# .xz (implies LZMA as described below)
# .lzma (implies LZMA as described below)
# Any non-recognized extension implies UNCOMPRESSED as described below.
AUTO = 'auto'

Expand All @@ -80,6 +85,9 @@ class CompressionTypes(object):
# GZIP compression (deflate with GZIP headers).
GZIP = 'gzip'

# LZMA compression
LZMA = 'lzma'

# Uncompressed (i.e., may be split).
UNCOMPRESSED = 'uncompressed'

Expand All @@ -92,6 +100,7 @@ def is_valid_compression_type(cls, compression_type):
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD,
CompressionTypes.LZMA,
CompressionTypes.UNCOMPRESSED
])
return compression_type in types
Expand All @@ -103,6 +112,7 @@ def mime_type(cls, compression_type, default='application/octet-stream'):
cls.DEFLATE: 'application/x-deflate',
cls.GZIP: 'application/x-gzip',
cls.ZSTD: 'application/zstd',
cls.LZMA: 'application/lzma'
}
return mime_types_by_compression_type.get(compression_type, default)

Expand All @@ -114,7 +124,9 @@ def detect_compression_type(cls, file_path):
'.deflate': cls.DEFLATE,
'.gz': cls.GZIP,
'.zst': cls.ZSTD,
'.zstd': cls.ZSTD
'.zstd': cls.ZSTD,
'.xz': cls.LZMA,
'.lzma': cls.LZMA
}
lowercased_path = file_path.lower()
for suffix, compression_type in compression_types_by_suffix.items():
Expand Down Expand Up @@ -184,6 +196,8 @@ def _initialize_decompressor(self):
# https://github.com/indygreg/python-zstandard/issues/157
self._decompressor = zstandard.ZstdDecompressor(
max_window_size=2147483648).decompressobj()
elif self._compression_type == CompressionTypes.LZMA:
self._decompressor = lzma.LZMADecompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._decompressor = zlib.decompressobj(self._gzip_mask)
Expand All @@ -196,6 +210,8 @@ def _initialize_compressor(self):
zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED)
elif self._compression_type == CompressionTypes.ZSTD:
self._compressor = zstandard.ZstdCompressor().compressobj()
elif self._compression_type == CompressionTypes.LZMA:
self._compressor = lzma.LZMACompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._compressor = zlib.compressobj(
Expand Down Expand Up @@ -257,7 +273,8 @@ def _fetch_to_internal_buffer(self, num_bytes: int) -> None:
if (self._compression_type == CompressionTypes.BZIP2 or
self._compression_type == CompressionTypes.DEFLATE or
self._compression_type == CompressionTypes.ZSTD or
self._compression_type == CompressionTypes.GZIP):
self._compression_type == CompressionTypes.GZIP or
self._compression_type == CompressionTypes.LZMA):
pass
else:
# Deflate, Gzip and bzip2 formats do not require flushing
Expand Down Expand Up @@ -948,4 +965,4 @@ def delete(self, paths):
Raises:
``BeamIOError``: if any of the delete operations fail
"""
raise NotImplementedError
raise NotImplementedError

0 comments on commit 1b8c72f

Please sign in to comment.