From 50bb796e3c79cc1dc5ffebdefe2061b9f482073e Mon Sep 17 00:00:00 2001 From: Marcel Date: Sun, 16 Mar 2025 20:47:18 +0100 Subject: [PATCH] Refactor PDF extraction functions to accept both file paths and BytesIO objects Added better arguments description, with types informations. Module has functionality for reading bytes pdf documents (useful when fetching document from web), but was documented poorly. Made it a bit better, to let devs know that module has such an option! --- pdftext/extraction.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/pdftext/extraction.py b/pdftext/extraction.py index 9c4e01d..b9f9f52 100644 --- a/pdftext/extraction.py +++ b/pdftext/extraction.py @@ -1,3 +1,4 @@ +from io import BytesIO import atexit import math from concurrent.futures import ProcessPoolExecutor @@ -33,16 +34,16 @@ def worker_shutdown(pdf_doc): pdf_doc.close() -def worker_init(pdf_path, flatten_pdf): +def worker_init(pdf_source: str | BytesIO, flatten_pdf): global pdf_doc - pdf_doc = _load_pdf(pdf_path, flatten_pdf) + pdf_doc = _load_pdf(pdf_source, flatten_pdf) atexit.register(partial(worker_shutdown, pdf_doc)) -def _get_pages(pdf_path, page_range=None, flatten_pdf=False, quote_loosebox=True, workers=None) -> Pages: - pdf_doc = _load_pdf(pdf_path, flatten_pdf) +def _get_pages(pdf_source: str | BytesIO, page_range=None, flatten_pdf=False, quote_loosebox=True, workers=None) -> Pages: + pdf_doc = _load_pdf(pdf_source, flatten_pdf) if page_range is None: page_range = range(len(pdf_doc)) @@ -60,20 +61,20 @@ def _get_pages(pdf_path, page_range=None, flatten_pdf=False, quote_loosebox=True pages_per_worker = math.ceil(len(page_range) / workers) page_range_chunks = [page_range[i * pages_per_worker:(i + 1) * pages_per_worker] for i in range(workers)] - with ProcessPoolExecutor(max_workers=workers, initializer=worker_init, initargs=(pdf_path, flatten_pdf)) as executor: + with ProcessPoolExecutor(max_workers=workers, initializer=worker_init, initargs=(pdf_source, flatten_pdf)) as executor: pages = list(executor.map(_get_page_range, page_range_chunks, repeat(flatten_pdf), repeat(quote_loosebox))) ordered_pages = [page for sublist in pages for page in sublist] return ordered_pages -def plain_text_output(pdf_path, sort=False, hyphens=False, page_range=None, flatten_pdf=False, workers=None) -> str: - text = paginated_plain_text_output(pdf_path, sort=sort, hyphens=hyphens, page_range=page_range, workers=workers, flatten_pdf=flatten_pdf) +def plain_text_output(pdf_source: str | BytesIO, sort=False, hyphens=False, page_range=None, flatten_pdf=False, workers=None) -> str: + text = paginated_plain_text_output(pdf_source, sort=sort, hyphens=hyphens, page_range=page_range, workers=workers, flatten_pdf=flatten_pdf) return "\n".join(text) -def paginated_plain_text_output(pdf_path, sort=False, hyphens=False, page_range=None, flatten_pdf=False, workers=None) -> List[str]: - pages: Pages = _get_pages(pdf_path, page_range, workers=workers, flatten_pdf=flatten_pdf) +def paginated_plain_text_output(pdf_source: str | BytesIO, sort=False, hyphens=False, page_range=None, flatten_pdf=False, workers=None) -> List[str]: + pages: Pages = _get_pages(pdf_source, page_range, workers=workers, flatten_pdf=flatten_pdf) text = [] for page in pages: text.append(merge_text(page, sort=sort, hyphens=hyphens).strip()) @@ -91,7 +92,7 @@ def _process_span(span, page_width, page_height, keep_chars): def dictionary_output( - pdf_path, + pdf_source: str | BytesIO, sort=False, page_range=None, keep_chars=False, @@ -100,10 +101,10 @@ def dictionary_output( disable_links=False, workers=None ) -> Pages: - pages: Pages = _get_pages(pdf_path, page_range, workers=workers, flatten_pdf=flatten_pdf, quote_loosebox=quote_loosebox) + pages: Pages = _get_pages(pdf_source, page_range, workers=workers, flatten_pdf=flatten_pdf, quote_loosebox=quote_loosebox) if not disable_links: - pdf = _load_pdf(pdf_path, False) + pdf = _load_pdf(pdf_source, False) add_links_and_refs(pages, pdf) pdf.close() @@ -132,7 +133,7 @@ def dictionary_output( def table_output( - pdf_path: str, + pdf_source: str | BytesIO, table_inputs: TableInputs, page_range=None, flatten_pdf=False, @@ -142,7 +143,7 @@ def table_output( ) -> List[Tables]: # Extract pages if they don't exist if not pages: - pages: Pages = dictionary_output(pdf_path, page_range=page_range, flatten_pdf=flatten_pdf, quote_loosebox=quote_loosebox, workers=workers, keep_chars=True) + pages: Pages = dictionary_output(pdf_source, page_range=page_range, flatten_pdf=flatten_pdf, quote_loosebox=quote_loosebox, workers=workers, keep_chars=True) assert len(pages) == len(table_inputs), "Number of pages and table inputs must match"