diff --git a/camelot/cli.py b/camelot/cli.py index e45664c1..cc349176 100644 --- a/camelot/cli.py +++ b/camelot/cli.py @@ -39,6 +39,12 @@ def set_config(self, key, value): default="1", help="Comma-separated page numbers." " Example: 1,3,4 or 1,4-end or all.", ) +@click.option( + "--parallel", + is_flag=True, + default=False, + help="Read pdf pages in parallel using all CPU cores.", +) @click.option("-pw", "--password", help="Password for decryption.") @click.option("-o", "--output", help="Output file path.") @click.option( diff --git a/camelot/handlers.py b/camelot/handlers.py index 66ee1697..74ddde7a 100644 --- a/camelot/handlers.py +++ b/camelot/handlers.py @@ -1,3 +1,4 @@ +import multiprocessing as mp import os import sys from pathlib import Path @@ -143,7 +144,12 @@ def _save_page(self, filepath: Union[StrByteType, Path], page, temp): instream.close() def parse( - self, flavor="lattice", suppress_stdout=False, layout_kwargs=None, **kwargs + self, + flavor="lattice", + suppress_stdout=False, + parallel=False, + layout_kwargs=None, + **kwargs ): """Extracts tables by calling parser.get_tables on all single page PDFs. @@ -153,8 +159,10 @@ def parse( flavor : str (default: 'lattice') The parsing method to use ('lattice' or 'stream'). Lattice is used by default. - suppress_stdout : str (default: False) + suppress_stdout : bool (default: False) Suppress logs and warnings. + parallel : bool (default: False) + Process pages in parallel using all available cpu cores. layout_kwargs : dict, optional (default: {}) A dict of `pdfminer.layout.LAParams `_ kwargs. 
@@ -171,14 +179,56 @@ def parse( layout_kwargs = {} tables = [] + parser = Lattice(**kwargs) if flavor == "lattice" else Stream(**kwargs) with TemporaryDirectory() as tempdir: - for p in self.pages: - self._save_page(self.filepath, p, tempdir) - pages = [os.path.join(tempdir, f"page-{p}.pdf") for p in self.pages] - parser = Lattice(**kwargs) if flavor == "lattice" else Stream(**kwargs) - for p in pages: - t = parser.extract_tables( - p, suppress_stdout=suppress_stdout, layout_kwargs=layout_kwargs - ) - tables.extend(t) + cpu_count = mp.cpu_count() + # Use multiprocessing only when cpu_count > 1 to avoid stalling + # when only one CPU core is available + if parallel and len(self.pages) > 1 and cpu_count > 1: + with mp.get_context("spawn").Pool(processes=cpu_count) as pool: + jobs = [] + for p in self.pages: + j = pool.apply_async( + self._parse_page,(p, tempdir, parser, suppress_stdout, layout_kwargs) + ) + jobs.append(j) + + for j in jobs: + t = j.get() + tables.extend(t) + else: + for p in self.pages: + t = self._parse_page(p, tempdir, parser, suppress_stdout, layout_kwargs) + tables.extend(t) + return TableList(sorted(tables)) + + def _parse_page( + self, page, tempdir, parser, suppress_stdout, layout_kwargs + ): + """Extracts tables by calling parser.extract_tables on a single + page PDF. + + Parameters + ---------- + page : str + Page number to parse + parser : Lattice or Stream + The parser to use (Lattice or Stream). + suppress_stdout : bool + Suppress logs and warnings. + layout_kwargs : dict, optional (default: {}) + A dict of `pdfminer.layout.LAParams `_ kwargs. + + Returns + ------- + tables : camelot.core.TableList + List of tables found in PDF. 
+ + """ + self._save_page(self.filepath, page, tempdir) + page_path = os.path.join(tempdir, f"page-{page}.pdf") + tables = parser.extract_tables( + page_path, suppress_stdout=suppress_stdout, layout_kwargs=layout_kwargs + ) + return tables diff --git a/camelot/io.py b/camelot/io.py index 78319bc9..12718828 100644 --- a/camelot/io.py +++ b/camelot/io.py @@ -15,6 +15,7 @@ def read_pdf( password=None, flavor="lattice", suppress_stdout=False, + parallel=False, layout_kwargs=None, **kwargs ): @@ -37,6 +38,8 @@ def read_pdf( Lattice is used by default. suppress_stdout : bool, optional (default: True) Print all logs and warnings. + parallel : bool, optional (default: False) + Process pages in parallel using all available cpu cores. layout_kwargs : dict, optional (default: {}) A dict of `pdfminer.layout.LAParams `_ kwargs. @@ -122,6 +125,7 @@ def read_pdf( tables = p.parse( flavor=flavor, suppress_stdout=suppress_stdout, + parallel=parallel, layout_kwargs=layout_kwargs, **kwargs ) diff --git a/docs/user/quickstart.rst b/docs/user/quickstart.rst index aa9c9fa3..c3cff640 100644 --- a/docs/user/quickstart.rst +++ b/docs/user/quickstart.rst @@ -99,6 +99,26 @@ By default, Camelot only uses the first page of the PDF to extract tables. To sp The ``pages`` keyword argument accepts pages as comma-separated string of page numbers. You can also specify page ranges — for example, ``pages=1,4-10,20-30`` or ``pages=1,4-10,20-end``. +Extract tables in parallel +-------------------------- + +Camelot supports extracting tables in parallel using all the available CPU cores. + +:: + + >>> tables = camelot.read_pdf('foo.pdf', pages='all', parallel=True) + >>> tables + + +.. tip:: + Here's how you can do the same with the :ref:`command-line interface <cli>`. + :: + + $ camelot --pages all --parallel lattice foo.pdf + +.. note:: The reading of the PDF document is parallelized by processing pages on different CPU cores. 
+ Therefore, a document with a low page count could be slower to process in parallel. + Reading encrypted PDFs ---------------------- diff --git a/tests/files/diesel_engines.pdf b/tests/files/diesel_engines.pdf new file mode 100644 index 00000000..2f14f161 Binary files /dev/null and b/tests/files/diesel_engines.pdf differ diff --git a/tests/test_cli.py b/tests/test_cli.py index 27ac41c5..0903dfa3 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -62,6 +62,30 @@ def test_cli_stream(testdir): assert format_error in result.output +@skip_on_windows +def test_cli_parallel(testdir): + with TemporaryDirectory() as tempdir: + infile = os.path.join(testdir, "diesel_engines.pdf") + outfile = os.path.join(tempdir, "diesel_engines.csv") + runner = CliRunner() + result = runner.invoke( + cli, + [ + "--parallel", + "--pages", + "1,2,3", + "--format", + "csv", + "--output", + outfile, + "lattice", + infile, + ], + ) + assert result.exit_code == 0 + assert result.output == "Found 2 tables\n" + + def test_cli_password(testdir): with TemporaryDirectory() as tempdir: infile = os.path.join(testdir, "health_protected.pdf")