Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#25316) Enable LZMA compression in Python SDK I/O #25317

Merged
merged 6 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)).
* Support for `LZMA` compression/decompression of text files added to the Python SDK ([#25316](https://github.com/apache/beam/issues/25316))

## New Features / Improvements

Expand Down
21 changes: 19 additions & 2 deletions sdks/python/apache_beam/io/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import bz2
import io
import logging
import lzma
import os
import posixpath
import re
Expand Down Expand Up @@ -65,6 +66,10 @@ class CompressionTypes(object):
# .bz2 (implies BZIP2 as described below).
# .gz (implies GZIP as described below)
# .deflate (implies DEFLATE as described below)
# .zst (implies ZSTD as described below)
# .zst (implies ZSTD as described below)
# .xz (implies LZMA as described below)
# .lzma (implies LZMA as described below)
# Any non-recognized extension implies UNCOMPRESSED as described below.
AUTO = 'auto'

Expand All @@ -80,6 +85,9 @@ class CompressionTypes(object):
# GZIP compression (deflate with GZIP headers).
GZIP = 'gzip'

# LZMA compression
LZMA = 'lzma'

# Uncompressed (i.e., may be split).
UNCOMPRESSED = 'uncompressed'

Expand All @@ -92,6 +100,7 @@ def is_valid_compression_type(cls, compression_type):
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD,
CompressionTypes.LZMA,
CompressionTypes.UNCOMPRESSED
])
return compression_type in types
Expand All @@ -103,6 +112,7 @@ def mime_type(cls, compression_type, default='application/octet-stream'):
cls.DEFLATE: 'application/x-deflate',
cls.GZIP: 'application/x-gzip',
cls.ZSTD: 'application/zstd',
cls.LZMA: 'application/lzma'
}
return mime_types_by_compression_type.get(compression_type, default)

Expand All @@ -114,7 +124,9 @@ def detect_compression_type(cls, file_path):
'.deflate': cls.DEFLATE,
'.gz': cls.GZIP,
'.zst': cls.ZSTD,
'.zstd': cls.ZSTD
'.zstd': cls.ZSTD,
'.xz': cls.LZMA,
'.lzma': cls.LZMA
}
lowercased_path = file_path.lower()
for suffix, compression_type in compression_types_by_suffix.items():
Expand Down Expand Up @@ -184,6 +196,8 @@ def _initialize_decompressor(self):
# https://github.com/indygreg/python-zstandard/issues/157
self._decompressor = zstandard.ZstdDecompressor(
max_window_size=2147483648).decompressobj()
elif self._compression_type == CompressionTypes.LZMA:
self._decompressor = lzma.LZMADecompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._decompressor = zlib.decompressobj(self._gzip_mask)
Expand All @@ -196,6 +210,8 @@ def _initialize_compressor(self):
zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED)
elif self._compression_type == CompressionTypes.ZSTD:
self._compressor = zstandard.ZstdCompressor().compressobj()
elif self._compression_type == CompressionTypes.LZMA:
self._compressor = lzma.LZMACompressor()
else:
assert self._compression_type == CompressionTypes.GZIP
self._compressor = zlib.compressobj(
Expand Down Expand Up @@ -257,7 +273,8 @@ def _fetch_to_internal_buffer(self, num_bytes: int) -> None:
if (self._compression_type == CompressionTypes.BZIP2 or
self._compression_type == CompressionTypes.DEFLATE or
self._compression_type == CompressionTypes.ZSTD or
self._compression_type == CompressionTypes.GZIP):
self._compression_type == CompressionTypes.GZIP or
self._compression_type == CompressionTypes.LZMA):
pass
else:
# Deflate, Gzip and bzip2 formats do not require flushing
Expand Down
25 changes: 19 additions & 6 deletions sdks/python/apache_beam/io/filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import bz2
import gzip
import logging
import lzma
import ntpath
import os
import posixpath
Expand Down Expand Up @@ -316,6 +317,10 @@ def _create_compressed_file(self, compression_type, content):
compress_open = zstandard.open
with compress_open(file_name, 'wb') as f:
f.write(content)
elif compression_type == CompressionTypes.LZMA:
compress_open = lzma.open
with compress_open(file_name, 'wb') as f:
f.write(content)
else:
assert False, "Invalid compression type: %s" % compression_type

Expand All @@ -340,7 +345,8 @@ def test_seek_set(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand Down Expand Up @@ -375,7 +381,8 @@ def test_seek_cur(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand Down Expand Up @@ -410,7 +417,8 @@ def test_read_from_end_returns_no_data(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand All @@ -428,7 +436,8 @@ def test_seek_outside(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand All @@ -453,7 +462,8 @@ def test_read_and_seek_back_to_beginning(self):
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.DEFLATE,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = self._create_compressed_file(compression_type, self.content)
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(
Expand Down Expand Up @@ -520,6 +530,8 @@ def create_test_file(compression_type, lines):
compress_factory = gzip.open
elif compression_type == CompressionTypes.ZSTD:
compress_factory = zstandard.open
elif compression_type == CompressionTypes.LZMA:
compress_factory = lzma.open
else:
assert False, "Invalid compression type: %s" % compression_type
for line in lines:
Expand Down Expand Up @@ -547,7 +559,8 @@ def timeout_handler():
test_lines = tuple(generate_random_line() for i in range(num_test_lines))
for compression_type in [CompressionTypes.BZIP2,
CompressionTypes.GZIP,
CompressionTypes.ZSTD]:
CompressionTypes.ZSTD,
CompressionTypes.LZMA]:
file_name = create_test_file(compression_type, test_lines)
timer.start()
with open(file_name, 'rb') as f:
Expand Down