v0.4.6 #62

Merged
merged 21 commits into from Oct 3, 2024
max_threads
erlichsefi committed Sep 28, 2024
commit c97ec69b6fd0490c52a2545f70ccdbaaac740358
8 changes: 6 additions & 2 deletions il_supermarket_scarper/engines/apsx.py
@@ -7,8 +7,12 @@
 class Aspx(WebBase, ABC):
     """class for aspx scapers"""

-    def __init__(self, chain, chain_id, url, aspx_page, folder_name=None):
-        super().__init__(chain, chain_id, url, folder_name=folder_name)
+    def __init__(
+        self, chain, chain_id, url, aspx_page, folder_name=None, max_threads=5
+    ):
+        super().__init__(
+            chain, chain_id, url, folder_name=folder_name, max_threads=max_threads
+        )
         self.aspx_page = aspx_page

     def extract_task_from_entry(self, all_trs):

5 changes: 3 additions & 2 deletions il_supermarket_scarper/engines/cerberus.py
@@ -26,8 +26,9 @@ def __init__(
         ftp_path="/",
         ftp_username="",
         ftp_password="",
+        max_threads=5,
     ):
-        super().__init__(chain, chain_id, folder_name)
+        super().__init__(chain, chain_id, folder_name, max_threads)
         self.ftp_host = ftp_host
         self.ftp_path = ftp_path
         self.ftp_username = ftp_username
@@ -63,7 +64,7 @@ def scrape(
         self.on_collected_details(files)

         results = execute_in_parallel(
-            self.persist_from_ftp, files, max_workers=self.max_workers
+            self.persist_from_ftp, files, max_threads=self.max_threads
         )
         self.on_download_completed(results=results)
         self.on_scrape_completed(self.get_storage_path())

4 changes: 2 additions & 2 deletions il_supermarket_scarper/engines/engine.py
@@ -21,15 +21,15 @@
 class Engine(ScraperStatus, ABC):
     """base engine for scraping"""

-    def __init__(self, chain, chain_id, folder_name=None):
+    def __init__(self, chain, chain_id, folder_name=None, max_threads=10):
         assert DumpFolderNames.is_valid_folder_name(
             chain
         ), "chain name can contain only abc and -"

         super().__init__(chain.value, "status", folder_name=folder_name)
         self.chain = chain
         self.chain_id = chain_id
-        self.max_workers = 20
+        self.max_threads = max_threads
         self.storage_path = get_output_folder(self.chain.value, folder_name=folder_name)
         Logger.info(f"Storage path: {self.storage_path}")

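This hunk is the heart of the change: the hardcoded max_workers = 20 on the base Engine becomes a max_threads constructor argument (default 10), which subclasses override with their own defaults. A minimal sketch of the resulting behavior, using simplified stand-ins rather than the library's real classes:

# Sketch only: simplified stand-ins for Engine/WebBase that show how the
# max_threads default resolves through the constructor chain.
class Engine:
    def __init__(self, chain, chain_id, folder_name=None, max_threads=10):
        self.chain = chain
        self.chain_id = chain_id
        self.max_threads = max_threads  # replaces the hardcoded max_workers = 20


class WebBase(Engine):
    def __init__(self, chain, chain_id, url, folder_name=None, max_threads=5):
        super().__init__(chain, chain_id, folder_name, max_threads=max_threads)
        self.url = url


print(WebBase("Chain", 1, "https://example.com").max_threads)  # 5 (subclass default)
print(WebBase("Chain", 1, "https://example.com", max_threads=8).max_threads)  # 8
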
7 changes: 5 additions & 2 deletions il_supermarket_scarper/engines/multipage_web.py
@@ -31,8 +31,11 @@ def __init__(
         total_page_xpath="""//*[@id="gridContainer"]/table/
         tfoot/tr/td/a[6]/@href""",
         total_pages_pattern=r"^\/\?page\=([0-9]{3})$",
+        max_threads=5,
     ):
-        super().__init__(chain, chain_id, url=url, folder_name=folder_name)
+        super().__init__(
+            chain, chain_id, url=url, folder_name=folder_name, max_threads=max_threads
+        )
         self.total_page_xpath = total_page_xpath
         self.total_pages_pattern = total_pages_pattern

@@ -91,7 +94,7 @@ def collect_files_details_from_site(
             self.process_links_before_download,
             pages_to_scrape,
             aggregtion_function=multiple_page_aggregtion,
-            max_workers=self.max_workers,
+            max_threads=self.max_threads,
         )
         file_names, download_urls = self.apply_limit_zip(
             file_names,
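
The second hunk passes a custom aggregtion_function to execute_in_parallel. Its body is not part of this diff, but the call site unpacks the return value into (file_names, download_urls), so a hook of the following shape would fit. This sketch is hypothetical, not the library's actual multiple_page_aggregtion:

# Hypothetical sketch of a page-aggregation hook: it receives one
# (names, urls) result per scraped page and flattens them into two
# parallel lists, matching how the call site above unpacks the result.
def multiple_page_aggregtion(pages_to_scrape):
    file_names, download_urls = [], []
    for names, urls in pages_to_scrape:
        file_names.extend(names)
        download_urls.extend(urls)
    return file_names, download_urls


print(multiple_page_aggregtion([(["a.xml"], ["u1"]), (["b.xml"], ["u2"])]))
# (['a.xml', 'b.xml'], ['u1', 'u2'])
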
11 changes: 10 additions & 1 deletion il_supermarket_scarper/engines/publishprice.py
@@ -17,12 +17,21 @@ class PublishPrice(WebBase):
     but this is not implemented.
     """

-    def __init__(self, chain, chain_id, site_infix, folder_name=None, domain="prices"):
+    def __init__(
+        self,
+        chain,
+        chain_id,
+        site_infix,
+        folder_name=None,
+        domain="prices",
+        max_threads=5,
+    ):
         super().__init__(
             chain,
             chain_id,
             url=f"https://{domain}.{site_infix}.co.il/",
             folder_name=folder_name,
+            max_threads=max_threads,
         )
         self.folder = None

6 changes: 3 additions & 3 deletions il_supermarket_scarper/engines/web.py
@@ -12,8 +12,8 @@
 class WebBase(Engine):
     """scrape the file of websites that the only why to download them is via web"""

-    def __init__(self, chain, chain_id, url, folder_name=None):
-        super().__init__(chain, chain_id, folder_name)
+    def __init__(self, chain, chain_id, url, folder_name=None, max_threads=5):
+        super().__init__(chain, chain_id, folder_name, max_threads=max_threads)
         self.url = url
         self.max_retry = 2

@@ -132,7 +132,7 @@ def scrape(
             results = execute_in_parallel(
                 self.save_and_extract,
                 zip(download_urls, file_names),
-                max_workers=self.max_workers,
+                max_threads=self.max_threads,
             )
         else:
             results = []
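
Since the iterable here is zip(download_urls, file_names), each parallel task receives a single (url, file_name) tuple that the worker unpacks itself (run_tasks submits each element as one argument). An illustrative stand-in, since save_and_extract's real body is not part of this diff:

# Illustrative only: mimics the calling convention implied by
# executor.submit(function_to_execute, arg), i.e. one zipped pair per call.
download_urls = ["https://example.com/a.gz", "https://example.com/b.gz"]
file_names = ["a.gz", "b.gz"]


def save_and_extract(arg):  # hypothetical stand-in for the real method
    url, file_name = arg  # unpack the (url, file_name) pair
    print(f"downloading {url} -> {file_name}")


for pair in zip(download_urls, file_names):
    save_and_extract(pair)
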
2 changes: 0 additions & 2 deletions il_supermarket_scarper/utils/logger.py
@@ -42,14 +42,12 @@ def info(cls, msg, *args, **kwargs):
         if cls.enabled:
             cls.logger.info(msg, *args, **kwargs)

-
     @classmethod
     def debug(cls, msg, *args, **kwargs):
         """log info"""
         if cls.enabled:
             cls.logger.debug(msg, *args, **kwargs)

-
     @classmethod
     def error(cls, msg, *args, **kwargs):
         """log error"""

22 changes: 14 additions & 8 deletions il_supermarket_scarper/utils/loop.py
@@ -1,6 +1,7 @@
 import concurrent.futures
 from il_supermarket_scarper.utils import Logger

+
 def defualt_aggregtion_function(all_done):
     """format the scraping result to the final input"""
     result = []
@@ -26,7 +27,7 @@ def multiple_page_aggregtion(pages_to_scrape):
 def execute_in_parallel(
     function_to_execute,
     iterable,
-    max_workers=None,
+    max_threads=None,
     aggregtion_function=defualt_aggregtion_function,
 ):
     """execute a job in the event loop"""
@@ -35,24 +36,29 @@ def execute_in_parallel(
     results = run_tasks(
         function_to_execute,
         iterable,
-        max_workers=max_workers,
+        max_threads=max_threads,
     )

     all_done = aggregtion_function(results)
     print(f"Done with {len(all_done)} tasks in parallel")
     return all_done

+
 def run_tasks(
     function_to_execute,
     iterable,
-    max_workers: int = None,
+    max_threads: int = None,
 ):
     """Run tasks in multi-thread or sequentially"""
-    if max_workers:
+    if max_threads:
         # Use multi-thread
-        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers,thread_name_prefix="PullingThread") as executor:
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=max_threads, thread_name_prefix="PullingThread"
+        ) as executor:
             futures = [executor.submit(function_to_execute, arg) for arg in iterable]
-            return [future.result() for future in concurrent.futures.as_completed(futures)]
+            return [
+                future.result() for future in concurrent.futures.as_completed(futures)
+            ]
     else:
         # Or just iterate over all
-        return [function_to_execute(arg) for arg in iterable]
+        return [function_to_execute(arg) for arg in iterable]
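
One caveat the rename cannot reach: in the standard library, ThreadPoolExecutor's keyword argument is max_workers, not max_threads, so the executor call above must keep max_workers=max_threads even though the wrapper's own parameter was renamed. A self-contained check of the threaded branch, with double() as a stand-in workload:

import concurrent.futures


def double(x):
    """Stand-in workload for function_to_execute."""
    return 2 * x


# Mirrors the threaded branch of run_tasks: submit one task per element,
# then collect results as they complete (completion order is not guaranteed).
with concurrent.futures.ThreadPoolExecutor(
    max_workers=4, thread_name_prefix="PullingThread"
) as executor:
    futures = [executor.submit(double, n) for n in range(5)]
    results = [f.result() for f in concurrent.futures.as_completed(futures)]

print(sorted(results))  # [0, 2, 4, 6, 8]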