Skip to content

Commit

Permalink
Adding -u/--unordered flag to extract & scrape commands
Browse files Browse the repository at this point in the history
Fix #901
  • Loading branch information
Yomguithereal committed Oct 27, 2023
1 parent 53b285e commit afebd4c
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 13 deletions.
14 changes: 10 additions & 4 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ Usage: minet extract [-h] [-g] [--silent]
[--status-column STATUS_COLUMN]
[--encoding-column ENCODING_COLUMN]
[--mimetype-column MIMETYPE_COLUMN] [--encoding ENCODING]
[-i INPUT] [--explode EXPLODE] [-s SELECT] [--total TOTAL]
[--resume] [-o OUTPUT]
[-u] [-i INPUT] [--explode EXPLODE] [-s SELECT]
[--total TOTAL] [--resume] [-o OUTPUT]
[path_or_path_column]

# Minet Extract Command
Expand Down Expand Up @@ -868,6 +868,9 @@ Optional Arguments:
--status-column STATUS_COLUMN
Name of the CSV column containing HTTP status.
Defaults to `http_status`.
-u, --unordered Whether to allow the result to be written in an
arbitrary order dependent on the multiprocessing
scheduling. Can improve performance.
-s, --select SELECT Columns of -i/--input CSV file to include in the
output (separated by `,`). Use an empty string
if you don't want to keep anything: --select ''.
Expand Down Expand Up @@ -1119,7 +1122,7 @@ Usage: minet scrape [-h] [--silent] [--refresh-per-second REFRESH_PER_SECOND]
[--encoding-column ENCODING_COLUMN]
[--mimetype-column MIMETYPE_COLUMN] [--encoding ENCODING]
[--base-url BASE_URL] [-f {csv,jsonl,ndjson}]
[--plural-separator PLURAL_SEPARATOR] [--strain STRAIN]
[--plural-separator PLURAL_SEPARATOR] [--strain STRAIN] [-u]
[-i INPUT] [--explode EXPLODE] [-s SELECT] [--total TOTAL]
[-o OUTPUT]
scraper [path_or_path_column]
Expand Down Expand Up @@ -1184,6 +1187,9 @@ Optional Arguments:
--strain STRAIN Optional CSS selector used to strain, i.e. only
parse matched tags in the parsed html files in
order to optimize performance.
-u, --unordered Whether to allow the result to be written in an
arbitrary order dependent on the multiprocessing
scheduling. Can improve performance.
--url-column URL_COLUMN Name of the CSV column containing the url.
Defaults to `resolved_url`.
-s, --select SELECT Columns of -i/--input CSV file to include in the
Expand Down Expand Up @@ -1260,7 +1266,7 @@ Examples:
. Using a strainer to optimize performance:
$ minet scrape links-scraper.yml --strain "a" -i report.csv > links.csv

. Keeping some columns from input CSV file:
. Keeping only some columns from input CSV file:
$ minet scrape scraper.yml -i report.csv -s name,url > scraped.csv

. Using a builtin scraper:
Expand Down
5 changes: 5 additions & 0 deletions minet/cli/extract/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,10 @@ def resolve_arguments(cli_args):
"flag": "--encoding",
"help": "Name of the default encoding to use. If not given the command will infer it for you.",
},
{
"flags": ["-u", "--unordered"],
"help": "Whether to allow the result to be written in an arbitrary order dependent on the multiprocessing scheduling. Can improve performance.",
"action": "store_true",
},
],
)
7 changes: 5 additions & 2 deletions minet/cli/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ def payloads() -> Iterator[ExtractWorkerPayload]:
warned_about_input_dir = False

with pool:
for result in pool.imap_unordered(
worker, payloads(), chunksize=cli_args.chunk_size
for result in pool.imap(
worker,
payloads(),
chunksize=cli_args.chunk_size,
unordered=cli_args.unordered,
):
with loading_bar.step():
assert isinstance(result, ExtractResult)
Expand Down
5 changes: 5 additions & 0 deletions minet/cli/scrape/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,10 @@ def resolve_arguments(cli_args):
"flag": "--strain",
"help": "Optional CSS selector used to strain, i.e. only parse matched tags in the parsed html files in order to optimize performance.",
},
{
"flags": ["-u", "--unordered"],
"help": "Whether to allow the result to be written in an arbitrary order dependent on the multiprocessing scheduling. Can improve performance.",
"action": "store_true",
},
],
)
7 changes: 5 additions & 2 deletions minet/cli/scrape/scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,11 @@ def payloads() -> Iterator[ScrapeWorkerPayload]:
warned_about_input_dir = False

with pool:
for result in pool.imap_unordered(
worker, payloads(), chunksize=cli_args.chunk_size
for result in pool.imap(
worker,
payloads(),
chunksize=cli_args.chunk_size,
unordered=cli_args.unordered,
):
with loading_bar.step():
original_item = worked_on.pop(result.id)
Expand Down
19 changes: 14 additions & 5 deletions minet/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,36 @@ def __init__(self, processes=None, initializer=None, initargs=None):

if self.actually_multiprocessed:
self.inner_pool = multiprocessing.Pool(
processes, initializer=initializer, initargs=initargs
processes, initializer=initializer, initargs=initargs or tuple()
)
else:
if initializer is not None:
initializer(*initargs)

def imap_unordered(self, worker, tasks, chunksize: int = 1):
def imap(self, worker, tasks, chunksize: int = 1, unordered: bool = False):
if self.actually_multiprocessed:
yield from self.inner_pool.imap_unordered(
WorkerWrapper(worker), tasks, chunksize=chunksize
)
assert self.inner_pool is not None

fn = self.inner_pool.imap_unordered if unordered else self.inner_pool.imap

yield from fn(WorkerWrapper(worker), tasks, chunksize=chunksize)
else:
for task in tasks:
yield worker(task)

def imap_unordered(self, worker, tasks, chunksize: int = 1):
    """Backward-compatible alias for `imap(..., unordered=True)`.

    Kept so existing callers of the pre-refactor API keep working.

    Args:
        worker: callable applied to each task.
        tasks: iterable of task payloads.
        chunksize: number of tasks dispatched to a process at once.

    Yields:
        Worker results, in arbitrary completion order (that is the whole
        point of this method).
    """
    # BUG FIX: this shim previously passed unordered=False, which silently
    # turned imap_unordered into an ORDERED imap — contradicting both its
    # name and the original implementation it replaced (which called
    # inner_pool.imap_unordered directly).
    return self.imap(worker, tasks, chunksize=chunksize, unordered=True)

def __enter__(self):
    # Context-manager entry. Only delegates to the wrapped
    # multiprocessing.Pool when one was actually created; in
    # single-process mode there is nothing to set up.
    if self.actually_multiprocessed:
        # inner_pool is created in __init__ iff actually_multiprocessed
        # is set, so this narrows the Optional for type checkers.
        assert self.inner_pool is not None

        self.inner_pool.__enter__()

    return self

def __exit__(self, *args):
    # Context-manager exit. Forwards the (exc_type, exc_value, traceback)
    # triple to the wrapped pool when one exists; a no-op in
    # single-process mode. Returns None, so exceptions propagate.
    if self.actually_multiprocessed:
        # Same Optional narrowing as in __enter__.
        assert self.inner_pool is not None

        self.inner_pool.__exit__(*args)

0 comments on commit afebd4c

Please sign in to comment.