From 6d5ea36908ebf58425a1b7c34d2165ad68ab2e55 Mon Sep 17 00:00:00 2001 From: Anthony Ricaud Date: Sun, 2 Apr 2023 15:24:03 +0100 Subject: [PATCH] Compress each file in a ProcessPool fix https://github.com/evansd/whitenoise/issues/148 --- src/whitenoise/compress.py | 21 ++++++++++----- src/whitenoise/storage.py | 54 ++++++++++++++++++++++++++------------ 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/whitenoise/compress.py b/src/whitenoise/compress.py index 143e1e44..c8a0d834 100644 --- a/src/whitenoise/compress.py +++ b/src/whitenoise/compress.py @@ -1,6 +1,7 @@ from __future__ import annotations import argparse +import concurrent.futures import gzip import os import re @@ -77,7 +78,7 @@ def should_compress(self, filename): def log(self, message): pass - def compress(self, path): + def _lazy_compress(self, path): with open(path, "rb") as f: stat_result = os.fstat(f.fileno()) data = f.read() @@ -94,6 +95,9 @@ def compress(self, path): if self.is_compressed_effectively("Gzip", path, size, compressed): yield self.write_data(path, compressed, ".gz", stat_result) + def compress(self, path): + return list(self._lazy_compress(path)) + @staticmethod def compress_gzip(data): output = BytesIO() @@ -133,6 +137,12 @@ def write_data(self, path, data, suffix, stat_result): os.utime(filename, (stat_result.st_atime, stat_result.st_mtime)) return filename + def files_to_compress(self, root): + for dirpath, _dirs, files in os.walk(root): + for filename in files: + if self.should_compress(filename): + yield os.path.join(dirpath, filename) + def main(argv=None): parser = argparse.ArgumentParser( @@ -175,12 +185,9 @@ def main(argv=None): use_brotli=args.use_brotli, quiet=args.quiet, ) - for dirpath, _dirs, files in os.walk(args.root): - for filename in files: - if compressor.should_compress(filename): - path = os.path.join(dirpath, filename) - for _compressed in compressor.compress(path): - pass + + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map(compressor.compress, compressor.files_to_compress(args.root)) return 0 diff --git a/src/whitenoise/storage.py b/src/whitenoise/storage.py index 029b0ff8..9fb30460 100644 --- a/src/whitenoise/storage.py +++ b/src/whitenoise/storage.py @@ -1,5 +1,6 @@ from __future__ import annotations +import concurrent.futures import errno import os import re @@ -30,15 +31,24 @@ def post_process( return extensions = getattr(settings, "WHITENOISE_SKIP_COMPRESS_EXTENSIONS", None) - compressor = self.create_compressor(extensions=extensions, quiet=True) - - for path in paths: - if compressor.should_compress(path): - full_path = self.path(path) - prefix_len = len(full_path) - len(path) - for compressed_path in compressor.compress(full_path): - compressed_name = compressed_path[prefix_len:] - yield path, compressed_name, True + self.compressor = self.create_compressor(extensions=extensions, quiet=True) + + to_compress = (path for path in paths if self.compressor.should_compress(path)) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = ( + executor.submit(self._compress_one, path) for path in to_compress + ) + for compressed_paths in concurrent.futures.as_completed(futures): + yield from compressed_paths.result() + + def _compress_one(self, path: str) -> list[tuple[str, str, bool]]: + compressed: list[tuple[str, str, bool]] = [] + full_path = self.path(path) + prefix_len = len(full_path) - len(path) + for compressed_path in self.compressor.compress(full_path): + compressed_name = compressed_path[prefix_len:] + compressed.append((path, compressed_name, True)) + return compressed def create_compressor(self, **kwargs: Any) -> Compressor: return Compressor(**kwargs) @@ -130,14 +140,24 @@ def create_compressor(self, **kwargs): def compress_files(self, names): extensions = getattr(settings, "WHITENOISE_SKIP_COMPRESS_EXTENSIONS", None) - compressor = self.create_compressor(extensions=extensions, quiet=True) - for name in names: - if compressor.should_compress(name): - path = self.path(name) - prefix_len = len(path) - len(name) - for compressed_path in compressor.compress(path): - compressed_name = compressed_path[prefix_len:] - yield name, compressed_name + self.compressor = self.create_compressor(extensions=extensions, quiet=True) + + to_compress = (name for name in names if self.compressor.should_compress(name)) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = ( + executor.submit(self._compress_one, name) for name in to_compress + ) + for compressed_paths in concurrent.futures.as_completed(futures): + yield from compressed_paths.result() + + def _compress_one(self, name: str) -> list[tuple[str, str]]: + compressed: list[tuple[str, str]] = [] + path = self.path(name) + prefix_len = len(path) - len(name) + for compressed_path in self.compressor.compress(path): + compressed_name = compressed_path[prefix_len:] + compressed.append((name, compressed_name)) + return compressed def make_helpful_exception(self, exception, name): """