Skip to content

Commit

Permalink
(#25316) Enable LZMA compression in Python SDK I/O (#25317)
Browse files Browse the repository at this point in the history
* (#25316) Added naive first shot at enabling LZMA compression

* (#25316) Added a draft line to CHANGES.md

* (#25316) fix linter issues

* (#25316) update tests (draft)

* (#25316) import order in test file
  • Loading branch information
wrossmorrow authored Feb 13, 2023
1 parent 198b93e commit 6f7d2fb
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)).
* Support for `LZMA` compression/decompression of text files added to the Python SDK ([#25316](https://github.com/apache/beam/issues/25316)).

## New Features / Improvements

Expand Down
21 changes: 19 additions & 2 deletions sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import bz2
import io
import logging
import lzma
import os
import posixpath
import re
Expand Down Expand Up @@ -65,6 +66,10 @@ class CompressionTypes(object):
# .bz2 (implies BZIP2 as described below).
# .gz (implies GZIP as described below)
# .deflate (implies DEFLATE as described below)
# .zst (implies ZSTD as described below)
# .zstd (implies ZSTD as described below)
# .xz (implies LZMA as described below)
# .lzma (implies LZMA as described below)
# Any non-recognized extension implies UNCOMPRESSED as described below.
AUTO = 'auto'

Expand All @@ -80,6 +85,9 @@ class CompressionTypes(object):
# GZIP compression (deflate with GZIP headers).
GZIP = 'gzip'

# LZMA compression (auto-detected from the .xz and .lzma extensions).
LZMA = 'lzma'

# Uncompressed (i.e., may be split).
UNCOMPRESSED = 'uncompressed'

Expand All @@ -92,6 +100,7 @@ def is_valid_compression_type(cls, compression_type):
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD,
CompressionTypes.LZMA,
CompressionTypes.UNCOMPRESSED
])
return compression_type in types
Expand All @@ -103,6 +112,7 @@ def mime_type(cls, compression_type, default='application/octet-stream'):
cls.DEFLATE: 'application/x-deflate',
cls.GZIP: 'application/x-gzip',
cls.ZSTD: 'application/zstd',
cls.LZMA: 'application/lzma'
}
return mime_types_by_compression_type.get(compression_type, default)

Expand All @@ -114,7 +124,9 @@ def detect_compression_type(cls, file_path):
'.deflate': cls.DEFLATE,
'.gz': cls.GZIP,
'.zst': cls.ZSTD,
'.zstd': cls.ZSTD
'.zstd': cls.ZSTD,
'.xz': cls.LZMA,
'.lzma': cls.LZMA
}
lowercased_path = file_path.lower()
for suffix, compression_type in compression_types_by_suffix.items():
Expand Down Expand Up @@ -184,6 +196,8 @@ def _initialize_decompressor(self):
# https://github.com/indygreg/python-zstandard/issues/157
self._decompressor = zstandard.ZstdDecompressor(
max_window_size=2147483648).decompressobj()
elif self._compression_type == CompressionTypes.LZMA:
self._decompressor = lzma.LZMADecompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._decompressor = zlib.decompressobj(self._gzip_mask)
Expand All @@ -196,6 +210,8 @@ def _initialize_compressor(self):
zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED)
elif self._compression_type == CompressionTypes.ZSTD:
self._compressor = zstandard.ZstdCompressor().compressobj()
elif self._compression_type == CompressionTypes.LZMA:
self._compressor = lzma.LZMACompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._compressor = zlib.compressobj(
Expand Down Expand Up @@ -257,7 +273,8 @@ def _fetch_to_internal_buffer(self, num_bytes: int) -> None:
if (self._compression_type == CompressionTypes.BZIP2 or
self._compression_type == CompressionTypes.DEFLATE or
self._compression_type == CompressionTypes.ZSTD or
self._compression_type == CompressionTypes.GZIP):
self._compression_type == CompressionTypes.GZIP or
self._compression_type == CompressionTypes.LZMA):
pass
else:
# Deflate, Gzip, bzip2, Zstd and LZMA formats do not require flushing
Expand Down
25 changes: 19 additions & 6 deletions sdks/python/apache_beam/io/filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import bz2
import gzip
import logging
import lzma
import ntpath
import os
import posixpath
Expand Down Expand Up @@ -316,6 +317,10 @@ def _create_compressed_file(self, compression_type, content):
compress_open = zstandard.open
with compress_open(file_name, 'wb') as f:
f.write(content)
elif compression_type == CompressionTypes.LZMA:
compress_open = lzma.open
with compress_open(file_name, 'wb') as f:
f.write(content)
else:
assert False, "Invalid compression type: %s" % compression_type

Expand All @@ -340,7 +345,8 @@ def test_seek_set(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand Down Expand Up @@ -375,7 +381,8 @@ def test_seek_cur(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand Down Expand Up @@ -410,7 +417,8 @@ def test_read_from_end_returns_no_data(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand All @@ -428,7 +436,8 @@ def test_seek_outside(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand All @@ -453,7 +462,8 @@ def test_read_and_seek_back_to_beginning(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand Down Expand Up @@ -520,6 +530,8 @@ def create_test_file(compression_type, lines):
compress_factory = gzip.open
elif compression_type == CompressionTypes.ZSTD:
compress_factory = zstandard.open
elif compression_type == CompressionTypes.LZMA:
compress_factory = lzma.open
else:
assert False, "Invalid compression type: %s" % compression_type
for line in lines:
Expand Down Expand Up @@ -547,7 +559,8 @@ def timeout_handler():
test_lines = tuple(generate_random_line() for i in range(num_test_lines))
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = create_test_file(compression_type, test_lines)
timer.start()
with open(file_name, 'rb') as f:
Expand Down

0 comments on commit 6f7d2fb

Please sign in to comment.