Merged
52 commits
e4d3f80
try threads
honzajavorek Jan 29, 2025
f6c197f
use thread pool
honzajavorek Jan 30, 2025
b771eba
threads cannot be reused, the asyncio loop would be closed
honzajavorek Jan 30, 2025
b5e28b6
make sure to have a new loop
honzajavorek Jan 30, 2025
9120952
debug
honzajavorek Jan 30, 2025
3cd8a49
cast magic
honzajavorek Jan 30, 2025
1739af2
cast more magic
honzajavorek Jan 30, 2025
0e8d848
cast async magic
honzajavorek Jan 30, 2025
e87a344
debug
honzajavorek Jan 30, 2025
63c5814
re-raise
honzajavorek Jan 30, 2025
0ff21d4
perform sorcery
honzajavorek Jan 30, 2025
fa0a152
expire
honzajavorek Jan 30, 2025
07beb2c
debug
honzajavorek Jan 30, 2025
905d429
aggressively close connections before doing anything
honzajavorek Jan 30, 2025
92ab6d9
use Twisted reactor to run asyncio
honzajavorek Feb 5, 2025
ca39323
nicer
honzajavorek Feb 5, 2025
d51ace5
make it work, somehow
honzajavorek Feb 5, 2025
ea69f52
use apify from branch
honzajavorek Feb 5, 2025
21120c8
build Plucker on top of current Apify WIP
honzajavorek Feb 5, 2025
278d0bd
try stuff
honzajavorek Feb 5, 2025
f51e92b
update apify code
honzajavorek Feb 5, 2025
016e738
debug
honzajavorek Feb 5, 2025
e361699
this doesn't help
honzajavorek Feb 5, 2025
c225198
read input
honzajavorek Feb 5, 2025
c8f0ef5
implement debug and cache storage
honzajavorek Feb 5, 2025
21974b7
implement cache expiration
honzajavorek Feb 5, 2025
0818a3c
cleanup
honzajavorek Feb 5, 2025
4fa331e
try this
honzajavorek Feb 5, 2025
aa89996
give up on clearing the storages for now
honzajavorek Feb 5, 2025
e35c9f0
remove any artificial limitations
honzajavorek Feb 6, 2025
b723244
longer expiration
honzajavorek Feb 6, 2025
adadd29
remove custom settings
honzajavorek Feb 6, 2025
5c0dbce
remove keys from cache when expired
honzajavorek Feb 6, 2025
1b452f5
implement running spiders just under scrapy
honzajavorek Feb 7, 2025
bee8728
debug never ending looping of the request queue
honzajavorek Feb 7, 2025
6f17aa8
this is better
honzajavorek Feb 7, 2025
e2a74ba
fix parsing error
honzajavorek Feb 7, 2025
90891f0
Merge branch 'main' into honzajavorek/threads
honzajavorek Feb 13, 2025
bbcb50a
test specimen
honzajavorek Feb 7, 2025
81d272a
logging
honzajavorek Feb 7, 2025
2865ed8
update deps
honzajavorek Feb 7, 2025
6380e08
rename track_id to trk
honzajavorek Feb 7, 2025
999d3e7
logging
honzajavorek Feb 7, 2025
ab4939e
imports
honzajavorek Feb 7, 2025
a1354b1
fix types
honzajavorek Feb 7, 2025
94b94b2
format
honzajavorek Feb 7, 2025
f4fa0bd
change how pagination works, fix changes after SDK PR got merged
honzajavorek Feb 13, 2025
7490418
update deps
honzajavorek Feb 13, 2025
36b3912
format code
honzajavorek Feb 13, 2025
77ead8a
clean up logging
honzajavorek Feb 13, 2025
d493099
improve cache
honzajavorek Feb 13, 2025
bc45196
final fixes
honzajavorek Feb 13, 2025
156 changes: 156 additions & 0 deletions jg/plucker/cache.py
@@ -0,0 +1,156 @@
import gzip
import io
import logging
import pickle
import struct
from time import time

from apify import Configuration
from apify.apify_storage_client import ApifyStorageClient
from apify.scrapy._async_thread import AsyncThread
from apify.storages import KeyValueStore
from scrapy import Request, Spider
from scrapy.http.headers import Headers
from scrapy.http.response import Response
from scrapy.responsetypes import responsetypes
from scrapy.settings import BaseSettings
from scrapy.utils.request import RequestFingerprinterProtocol


logger = logging.getLogger("jg.plucker.cache")


class CacheStorage:
    def __init__(self, settings: BaseSettings):
        self.expiration_max_items = 100
        self.expiration_secs: int = settings.getint("HTTPCACHE_EXPIRATION_SECS")
        self.spider: Spider | None = None
        self._kv: KeyValueStore | None = None
        self._fingerprinter: RequestFingerprinterProtocol | None = None
        self._async_thread: AsyncThread | None = None

    def open_spider(self, spider: Spider) -> None:
        logger.debug("Using Apify key value cache storage", extra={"spider": spider})
        self.spider = spider
        self._fingerprinter = spider.crawler.request_fingerprinter
        kv_name = f"httpcache-{spider.name}"

        async def open_kv() -> KeyValueStore:
            config = Configuration.get_global_configuration()
            if config.is_at_home:
                storage_client = ApifyStorageClient.from_config(config)
                return await KeyValueStore.open(
                    name=kv_name, storage_client=storage_client
                )
            return await KeyValueStore.open(name=kv_name)

        logger.debug("Starting background thread for cache storage's event loop")
        self._async_thread = AsyncThread()
        logger.debug(f"Opening cache storage's {kv_name!r} key value store")
        self._kv = self._async_thread.run_coro(open_kv())

    def close_spider(self, spider: Spider, current_time: int | None = None) -> None:
        assert self._async_thread is not None, "Async thread not initialized"

        logger.info(f"Cleaning up cache items (max {self.expiration_max_items})")
        if 0 < self.expiration_secs:
            if current_time is None:
                current_time = int(time())

            async def expire_kv() -> None:
                assert self._kv is not None, "Key value store not initialized"
                i = 0
                async for item in self._kv.iterate_keys():
                    value = await self._kv.get_value(item.key)
                    try:
                        gzip_time = read_gzip_time(value)
                    except Exception as e:
                        logger.warning(f"Malformed cache item {item.key}: {e}")
                        await self._kv.set_value(item.key, None)
                    else:
                        if self.expiration_secs < current_time - gzip_time:
                            logger.debug(f"Expired cache item {item.key}")
                            await self._kv.set_value(item.key, None)
                        else:
                            logger.debug(f"Valid cache item {item.key}")
                    if i == self.expiration_max_items:
                        break
                    i += 1

            self._async_thread.run_coro(expire_kv())

        logger.debug("Closing cache storage")
        try:
            self._async_thread.close()
        except KeyboardInterrupt:
            logger.warning("Shutdown interrupted by KeyboardInterrupt!")
        except Exception:
            logger.exception("Exception occurred while shutting down cache storage")
        finally:
            logger.debug("Cache storage closed")

    def retrieve_response(
        self, spider: Spider, request: Request, current_time: int | None = None
    ) -> Response | None:
        assert self._async_thread is not None, "Async thread not initialized"
        assert self._kv is not None, "Key value store not initialized"
        assert self._fingerprinter is not None, "Request fingerprinter not initialized"

        key = self._fingerprinter.fingerprint(request).hex()
        value = self._async_thread.run_coro(self._kv.get_value(key))

        if value is None:
            logger.debug("Cache miss", extra={"request": request})
            return None

        if current_time is None:
            current_time = int(time())
        if 0 < self.expiration_secs < current_time - read_gzip_time(value):
            logger.debug("Cache expired", extra={"request": request})
            return None

        data = from_gzip(value)
        url = data["url"]
        status = data["status"]
        headers = Headers(data["headers"])
        body = data["body"]
        respcls = responsetypes.from_args(headers=headers, url=url, body=body)

        logger.debug("Cache hit", extra={"request": request})
        return respcls(url=url, headers=headers, status=status, body=body)

    def store_response(
        self, spider: Spider, request: Request, response: Response
    ) -> None:
        assert self._async_thread is not None, "Async thread not initialized"
        assert self._kv is not None, "Key value store not initialized"
        assert self._fingerprinter is not None, "Request fingerprinter not initialized"

        key = self._fingerprinter.fingerprint(request).hex()
        data = {
            "status": response.status,
            "url": response.url,
            "headers": dict(response.headers),
            "body": response.body,
        }
        value = to_gzip(data)
        self._async_thread.run_coro(self._kv.set_value(key, value))


def to_gzip(data: dict, mtime: int | None = None) -> bytes:
    with io.BytesIO() as byte_stream:
        with gzip.GzipFile(fileobj=byte_stream, mode="wb", mtime=mtime) as gzip_file:
            pickle.dump(data, gzip_file, protocol=4)
        return byte_stream.getvalue()


def from_gzip(gzip_bytes: bytes) -> dict:
    with io.BytesIO(gzip_bytes) as byte_stream:
        with gzip.GzipFile(fileobj=byte_stream, mode="rb") as gzip_file:
            return pickle.load(gzip_file)


def read_gzip_time(gzip_bytes: bytes) -> int:
    header = gzip_bytes[:10]
    header_components = struct.unpack("<HBBI2B", header)
    return header_components[3]
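
The new backend implements the four methods Scrapy's built-in HttpCacheMiddleware expects from an HTTPCACHE_STORAGE class: open_spider, close_spider, retrieve_response and store_response. The project's Scrapy settings are not part of this diff, so the following wiring is only a sketch with illustrative values, assuming the default cache policy is kept:

HTTPCACHE_ENABLED = True  # enables Scrapy's HttpCacheMiddleware
HTTPCACHE_STORAGE = "jg.plucker.cache.CacheStorage"  # backend added in this PR
HTTPCACHE_EXPIRATION_SECS = 60 * 60 * 24  # read by CacheStorage.__init__()
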
33 changes: 14 additions & 19 deletions jg/plucker/cli.py
@@ -1,4 +1,3 @@
import asyncio
import importlib
import json
import logging
@@ -8,30 +7,21 @@
from pathlib import Path
from typing import IO, Callable, Generator, Type

import click
from apify.scrapy.logging_config import setup_logging
from apify_client import ApifyClient
from apify_shared.consts import ActorJobStatus, ActorSourceType
from pydantic import BaseModel
from scrapy.utils.project import get_project_settings

from jg.plucker.loggers import configure_logging


settings = get_project_settings()
configure_logging(settings, sys.argv)


# ruff: noqa: E402
import click
from scrapy import Item

from jg.plucker.scrapers import (
    StatsError,
    configure_async,
    generate_schema,
    get_spider_module_name,
    iter_actor_paths,
    run_actor,
    run_spider,
    run_as_actor,
    run_as_spider,
    start_reactor,
)


@@ -50,7 +40,12 @@ def __str__(self) -> str:
@click.group()
@click.option("-d", "--debug", default=False, is_flag=True)
def main(debug: bool = False):
    pass # --debug is processed in configure_logging()
    setup_logging()
    level = logging.DEBUG if debug else logging.INFO
    logging.getLogger().setLevel(level)
    logger.setLevel(level)
    for name in ["asyncio", "filelock", "crawlee"]:
        logging.getLogger(name).setLevel(logging.WARNING)


@main.command(context_settings={"ignore_unknown_options": True})
@@ -85,7 +80,6 @@ def crawl(
logger.info("Reading spider params from stdin")
spider_params = json.load(spider_params_f)

configure_async()
try:
if apify:
logger.info(f"Crawling as Apify actor {actor_path}")
@@ -94,10 +88,11 @@
                raise click.BadParameter(
                    f"Actor {actor_path} not found! Valid actors: {actors}"
                )
            asyncio.run(run_actor(settings, spider_class, spider_params))
            run = run_as_actor(spider_class, spider_params)
        else:
            logger.info(f"Crawling as Scrapy spider {spider_name!r}")
            run_spider(settings, spider_class, spider_params)
            run = run_as_spider(spider_class, spider_params)
        start_reactor(run)
    except StatsError as e:
        logger.error(e)
        raise click.Abort()
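
A note on the expiration scheme in cache.py: to_gzip() passes mtime straight to gzip.GzipFile, and with the default mtime=None gzip stamps the current time into the member header (the 4-byte MTIME field from RFC 1952), which read_gzip_time() later unpacks from bytes 4-7. A small illustration, not part of the PR, assuming jg.plucker.cache is importable:

from time import time

from jg.plucker.cache import from_gzip, read_gzip_time, to_gzip

payload = {"status": 200, "url": "https://example.com", "headers": {}, "body": b"ok"}
blob = to_gzip(payload)                   # gzip records the current time in the header
assert from_gzip(blob) == payload         # the pickle round trip is lossless
age = int(time()) - read_gzip_time(blob)  # seconds since the entry was written
assert age >= 0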
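
Because Scrapy calls the cache storage from synchronous code while the Apify key-value store API is async, cache.py funnels every coroutine through AsyncThread.run_coro(). The sketch below shows the general pattern such a helper relies on (an assumption about apify.scrapy._async_thread, not its actual implementation): a dedicated thread owns an asyncio event loop and synchronous callers block on futures submitted to it.

import asyncio
import threading


class BackgroundLoop:
    """Owns an event loop on a worker thread so sync code can run coroutines on it."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run_coro(self, coro, timeout: float | None = None):
        # Schedule the coroutine on the loop thread and block until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result(timeout)

    def close(self) -> None:
        # Stop the loop from its own thread, then join the worker and dispose of the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()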