feat(scraper): add presets #105

Merged · 4 commits · Dec 20, 2024
2 changes: 1 addition & 1 deletion npiai/core/browser/_playwright.py
@@ -56,7 +56,7 @@ async def start(self):
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(
headless=self.headless,
args=["--disable-gpu", "--single-process"],
# args=["--disable-gpu", "--single-process"],
)

self.context = await self.browser.new_context(
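Commenting out `--single-process` reverts to Chromium's default multi-process model; the flag is intended for debugging and is known to destabilize some pages, and `--disable-gpu` is rarely needed for headless runs anymore. If a deployment still needs custom flags, a hedged sketch of an opt-in launcher (the helper and its parameters are illustrative, not part of this PR):

```python
from playwright.async_api import async_playwright

async def launch_chromium(headless: bool = True, extra_args: list[str] | None = None):
    """Launch Chromium with default args unless a caller opts into extras."""
    playwright = await async_playwright().start()
    browser = await playwright.chromium.launch(
        headless=headless,
        # Chromium defaults are the safest baseline; flags such as
        # "--single-process" can crash or hang some pages.
        args=extra_args or [],
    )
    return playwright, browser
```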
8 changes: 5 additions & 3 deletions npiai/tools/web/page_analyzer/app.py
@@ -243,11 +243,12 @@ async def support_infinite_scroll(
(items_selector) => {
let mutateElementsCount = 0;
const threshold = items_selector === '*' ? 10 : 3;
const targetSelector = `${items_selector}, ${items_selector} *`;

const npiScrollObserver = new MutationObserver((records) => {
for (const record of records) {
for (const node of record.addedNodes) {
if (node.nodeType === Node.ELEMENT_NODE && node.matches(items_selector)) {
if (node.nodeType === Node.ELEMENT_NODE && node.matches(targetSelector)) {
mutateElementsCount++;
}
}
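The new `targetSelector` widens the observer so that nodes inserted *inside* an item, not just whole items, count toward the infinite-scroll threshold. A standalone sketch of the same pattern installed through Playwright (the selector, scroll distance, and wait are illustrative):

```python
from playwright.async_api import Page

OBSERVE_MUTATIONS_JS = """
(itemsSelector) => {
    // Match the items themselves or any descendant of an item.
    const targetSelector = `${itemsSelector}, ${itemsSelector} *`;
    window.__addedCount = 0;
    const observer = new MutationObserver((records) => {
        for (const record of records) {
            for (const node of record.addedNodes) {
                if (node.nodeType === Node.ELEMENT_NODE && node.matches(targetSelector)) {
                    window.__addedCount++;
                }
            }
        }
    });
    observer.observe(document.body, { childList: true, subtree: true });
}
"""

async def count_scroll_mutations(page: Page, items_selector: str) -> int:
    """Install the observer, scroll once, and report how many item nodes appeared."""
    await page.evaluate(OBSERVE_MUTATIONS_JS, items_selector)
    await page.mouse.wheel(0, 5000)
    await page.wait_for_timeout(1000)
    return await page.evaluate("() => window.__addedCount")
```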
@@ -350,8 +351,9 @@ async def get_pagination_button(self, ctx: Context, url: str) -> str | None:
Follow the instructions to determine whether there is a pagination button on the current page for navigating to the next page:
1. Examine the screenshots, the URL, and the title of the page to understand the context, and then think about what the current page is.
2. Go through the elements array, pay attention to the `role`, `accessibleName`, and `accessibleDescription` properties to grab semantic information of the elements.
3. Check if there is a pagination button on the page. Typically, a pagination button is a button or a link that allows users to navigate to the next page. It usually contains text like "Next", "More", or "Load More".
4. If and only if you are confident that you have found a pagination button, call the tool with the ID of the element to retrieve the CSS selector. If you are not sure, or there is no pagination button, call the tool with -1. **Do not make any assumptions**.
3. Check if there is a pagination button on the page. Typically, a pagination button is a button or a link that allows users to navigate to the next page. It usually contains text like "Next" or "Load More".
4. Buttons that expand content on the same page are not considered pagination buttons. Only consider buttons that navigate to the next page.
5. If and only if you are confident that you have found a pagination button, call the tool with the ID of the element to retrieve the CSS selector. If you are not sure, or there is no pagination button, call the tool with -1. **Do not make any assumptions**.
"""
),
),
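With the stricter rule in step 4, same-page expanders such as "Show more replies" should now yield -1 instead of a selector. A hedged usage sketch; the `PageAnalyzer` class name and constructor are assumed from the module path, not shown in this diff:

```python
from npiai.tools.web.page_analyzer import PageAnalyzer  # assumed import path
from npiai.utils.test_utils import DebugContext

async def find_next_page_button(url: str) -> str | None:
    async with PageAnalyzer(headless=True) as analyzer:
        # Returns a CSS selector for the pagination control,
        # or None when the model answers with -1.
        return await analyzer.get_pagination_button(DebugContext(), url)
```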
166 changes: 83 additions & 83 deletions npiai/tools/web/scraper/app.py
@@ -1,61 +1,40 @@
import asyncio
import csv
import hashlib
import json
import re
import os
import asyncio
import hashlib
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, AsyncGenerator, Literal, Any, Iterable, Set
from typing_extensions import TypedDict, Annotated
from textwrap import dedent
from playwright.async_api import TimeoutError
from typing import List, AsyncGenerator, Any, Iterable, Set
from markdownify import MarkdownConverter

from litellm.types.completion import (
ChatCompletionSystemMessageParam,
ChatCompletionUserMessageParam,
)
from playwright.async_api import TimeoutError

from npiai import function, BrowserTool, Context
from npiai.core import NavigatorAgent
from npiai.utils import is_cloud_env, llm_tool_call, html_to_markdown

from npiai.utils import (
is_cloud_env,
llm_tool_call,
CompactMarkdownConverter,
)
from .prompts import (
MULTI_COLUMN_INFERENCE_PROMPT,
MULTI_COLUMN_SCRAPING_PROMPT,
SINGLE_COLUMN_INFERENCE_PROMPT,
SINGLE_COLUMN_SCRAPING_PROMPT,
)

ScrapingType = Literal["single", "list-like"]


class Column(TypedDict):
name: Annotated[str, "Name of the column"]
type: Annotated[Literal["text", "link", "image"], "Type of the column"]
description: Annotated[str, "A brief description of the column"]
prompt: Annotated[
str | None, "A step-by-step prompt on how to extract the column data"
]


class SummaryItem(TypedDict):
hash: str
values: Dict[str, str]


class SummaryChunk(TypedDict):
batch_id: int
matched_hashes: List[str]
items: List[SummaryItem]


@dataclass
class ParsedResult:
markdown: str
hashes: List[str]
matched_hashes: List[str]

from .types import (
Column,
ScrapingType,
SummaryItem,
SummaryChunk,
ConversionResult,
)

__ID_COLUMN__ = Column(
name="[[@item_id]]",
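The deleted `TypedDict` and dataclass definitions now come from `npiai/tools/web/scraper/types.py`, with `ParsedResult` renamed to `ConversionResult`. The module itself is not shown in this diff; a reconstruction from the removed code would look like:

```python
# npiai/tools/web/scraper/types.py (reconstructed; the real file may differ)
from dataclasses import dataclass
from typing import Dict, List, Literal
from typing_extensions import TypedDict, Annotated

ScrapingType = Literal["single", "list-like"]

class Column(TypedDict):
    name: Annotated[str, "Name of the column"]
    type: Annotated[Literal["text", "link", "image"], "Type of the column"]
    description: Annotated[str, "A brief description of the column"]
    prompt: Annotated[
        str | None, "A step-by-step prompt on how to extract the column data"
    ]

class SummaryItem(TypedDict):
    hash: str
    values: Dict[str, str]

class SummaryChunk(TypedDict):
    batch_id: int
    matched_hashes: List[str]
    items: List[SummaryItem]

@dataclass
class ConversionResult:  # formerly ParsedResult
    markdown: str
    hashes: List[str]
    matched_hashes: List[str]
```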
@@ -76,6 +55,8 @@ class Scraper(BrowserTool):
"""
)

markdown_converter: MarkdownConverter = CompactMarkdownConverter()

# The maximum number of items to summarize in a single batch
_batch_size: int

@@ -152,6 +133,7 @@ async def summarize_stream(
batch_index = 0

results_queue: asyncio.Queue[SummaryChunk] = asyncio.Queue()
lock = asyncio.Lock()

skip_item_hashes_set = set(skip_item_hashes) if skip_item_hashes else None

@@ -161,16 +143,19 @@ async def run_batch():
if limit != -1 and remaining <= 0:
return

current_index = batch_index
batch_index += 1
async with lock:
current_index = batch_index
batch_index += 1

# calculate the number of items to summarize in the current batch
requested_count = min(self._batch_size, remaining) if limit != -1 else -1
# reduce the remaining count by the number of items in the current batch
# so that the other tasks will not exceed the limit
remaining -= requested_count
# calculate the number of items to summarize in the current batch
requested_count = (
min(self._batch_size, remaining) if limit != -1 else -1
)
# reduce the remaining count by the number of items in the current batch
# so that the other tasks will not exceed the limit
remaining -= requested_count

parsed_result = await self._parse(
parsed_result = await self._convert(
ancestor_selector=ancestor_selector,
items_selector=items_selector,
limit=requested_count,
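The new `asyncio.Lock` closes a race in the concurrent batches: without it, two `run_batch` tasks could read the same `batch_index` or both decrement `remaining` past the limit. The pattern in isolation, as a runnable sketch with a dummy workload:

```python
import asyncio

async def demo(limit: int = 10, batch_size: int = 3, workers: int = 4):
    batch_index = 0
    remaining = limit
    lock = asyncio.Lock()

    async def run_batch():
        nonlocal batch_index, remaining
        while True:
            async with lock:
                if remaining <= 0:
                    return
                current_index = batch_index
                batch_index += 1
                # Reserve items for this batch so sibling tasks
                # cannot exceed the overall limit.
                requested = min(batch_size, remaining)
                remaining -= requested
            print(f"batch {current_index}: scraping {requested} items")
            await asyncio.sleep(0)  # stand-in for the real scraping work

    await asyncio.gather(*(run_batch() for _ in range(workers)))

asyncio.run(demo())
```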
@@ -198,12 +183,13 @@ async def run_batch():
await ctx.send_debug_message(f"[{self.name}] No items summarized")
return

items_slice = items[:requested_count] if limit != -1 else items
summarized_count = len(items_slice)
count += summarized_count
# correct the remaining count in case summary returned fewer items than requested
if summarized_count < requested_count:
remaining += requested_count - summarized_count
async with lock:
items_slice = items[:requested_count] if limit != -1 else items
summarized_count = len(items_slice)
count += summarized_count
# recalculate the remaining count in case summary returned fewer items than requested
if summarized_count < requested_count:
remaining += requested_count - summarized_count

await results_queue.put(
{
@@ -343,7 +329,7 @@ async def infer_columns(
if not ancestor_selector:
ancestor_selector = "body"

parsed_result = await self._parse(
parsed_result = await self._convert(
ancestor_selector=ancestor_selector,
items_selector=items_selector,
limit=10,
@@ -390,28 +376,30 @@ def callback(columns: List[Column]):

return callback(**res.model_dump())

async def _parse(
async def _convert(
self,
ancestor_selector: str,
items_selector: str | None,
limit: int = -1,
skip_item_hashes: Set[str] | None = None,
) -> ParsedResult | None:
) -> ConversionResult | None:
async with self._webpage_access_lock:
# convert relative links to absolute links
await self._process_relative_links()

if items_selector is None:
return await self._parse_ancestor(ancestor_selector, skip_item_hashes)
return await self._convert_ancestor(ancestor_selector, skip_item_hashes)
else:
return await self._parse_items(items_selector, limit, skip_item_hashes)
return await self._convert_items(
items_selector, limit, skip_item_hashes
)
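Before converting, `_process_relative_links` rewrites relative `href`s so the extracted markdown keeps working links. Its implementation is not part of this diff; a minimal sketch of the idea (helper name and JS body are assumptions):

```python
from playwright.async_api import Page

ABSOLUTIZE_LINKS_JS = """
() => {
    for (const a of document.querySelectorAll('a[href]')) {
        // The `href` property resolves against the document base URL,
        // unlike getAttribute('href'), which returns the raw relative value.
        a.setAttribute('href', a.href);
    }
}
"""

async def process_relative_links(page: Page) -> None:
    """Rewrite relative links in place so scraped markdown stays navigable."""
    await page.evaluate(ABSOLUTIZE_LINKS_JS)
```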

async def _parse_items(
async def _convert_items(
self,
items_selector: str,
limit: int = -1,
skip_item_hashes: List[str] | None = None,
) -> ParsedResult | None:
) -> ConversionResult | None:
"""
Get the markdown content of the items to summarize

@@ -434,7 +422,7 @@ async def _parse_items(
try:
await locator.first.wait_for(
state="attached",
timeout=30_000,
timeout=3_000,
)
except TimeoutError:
return None
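Cutting the `wait_for` timeout from 30 s to 3 s makes a missing selector fail fast and return `None` instead of stalling a whole batch. Note that the `TimeoutError` caught here is Playwright's, imported above, not the builtin. The guard in isolation:

```python
from playwright.async_api import Locator, TimeoutError

async def first_item_attached(locator: Locator, timeout_ms: int = 3_000) -> bool:
    """Return False quickly if the items selector never attaches to the DOM."""
    try:
        await locator.first.wait_for(state="attached", timeout=timeout_ms)
        return True
    except TimeoutError:
        return False
```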
@@ -444,22 +432,37 @@ async def _parse_items(
matched_hashes = []
count = 0

marking_tasks = []

# use element handles here to snapshot the items
for elem in await locator.element_handles():
html = await elem.evaluate("elem => elem.outerHTML")
markdown, md5 = self._html_to_md_and_hash(html)

# mark the item as visited
marking_tasks.append(
asyncio.create_task(
elem.evaluate(
"elem => elem.setAttribute('data-npi-visited', 'true')"
)
)
html = await elem.evaluate(
"""
async (elem) => {
elem.scrollIntoView();
elem.setAttribute('data-npi-visited', 'true');

const contentLength = elem.textContent?.replace(/\\s/g, '').length || 0;

if (contentLength > 10) {
return elem.outerHTML;
}

// in case the page uses lazy loading,
// wait for the content to be loaded

return new Promise((resolve) => {
setTimeout(() => {
resolve(elem.outerHTML);
}, 300);
});
}
"""
)

if not html:
continue

markdown, md5 = self._html_to_md_and_hash(html)

if skip_item_hashes and md5 in skip_item_hashes:
matched_hashes.append(md5)
continue
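The snapshot step now runs one `evaluate` per element that scrolls it into view, marks it visited, and, when the element holds fewer than ~10 non-whitespace characters, waits 300 ms before reading `outerHTML` so lazy-loaded content can arrive. The same idea as a standalone helper (the page and selector handling are illustrative):

```python
from playwright.async_api import Page

SNAPSHOT_ITEM_JS = """
async (elem) => {
    elem.scrollIntoView();
    const contentLength = elem.textContent?.replace(/\\s/g, '').length || 0;
    if (contentLength > 10) {
        return elem.outerHTML;
    }
    // Nearly empty: assume lazy loading and give the page 300 ms to fill it in.
    return new Promise((resolve) => {
        setTimeout(() => resolve(elem.outerHTML), 300);
    });
}
"""

async def snapshot_items(page: Page, selector: str) -> list[str]:
    """Snapshot each matched element's HTML, tolerating lazy-loaded content."""
    snapshots = []
    for handle in await page.locator(selector).element_handles():
        html = await handle.evaluate(SNAPSHOT_ITEM_JS)
        if html:
            snapshots.append(html)
    return snapshots
```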
@@ -474,19 +477,17 @@ async def _parse_items(
if not count:
return None

await asyncio.gather(*marking_tasks)

return ParsedResult(
return ConversionResult(
markdown="\n".join(sections),
hashes=hashes,
matched_hashes=matched_hashes,
)

async def _parse_ancestor(
async def _convert_ancestor(
self,
ancestor_selector: str,
skip_item_hashes: Set[str] | None = None,
) -> ParsedResult | None:
) -> ConversionResult | None:
"""
Get the markdown content of the ancestor element

@@ -551,22 +552,21 @@ async def _parse_ancestor(
if not count:
return None

return ParsedResult(
return ConversionResult(
markdown="\n".join(sections),
hashes=hashes,
matched_hashes=matched_hashes,
)

@staticmethod
def _html_to_md_and_hash(html):
markdown = html_to_markdown(html)
def _html_to_md_and_hash(self, html: str):
markdown = re.sub(r"\n+", "\n", self.markdown_converter.convert(html)).strip()
md5 = hashlib.md5(markdown.encode()).hexdigest()
return markdown, md5

async def _llm_summarize(
self,
ctx: Context,
parsed_result: ParsedResult,
parsed_result: ConversionResult,
output_columns: List[Column],
scraping_type: ScrapingType,
) -> List[SummaryItem]:
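`CompactMarkdownConverter` replaces the old `html_to_markdown` helper, and the MD5 of the compacted markdown doubles as the dedup key for `skip_item_hashes`. Because the converter is a class attribute on `Scraper`, presets can swap in their own `MarkdownConverter` subclass without touching the conversion code. The converter's implementation is not in this diff; a minimal sketch of the hashing path, using a stock `markdownify` converter as a stand-in:

```python
import hashlib
import re

from markdownify import MarkdownConverter

converter = MarkdownConverter()  # stand-in for CompactMarkdownConverter

def html_to_md_and_hash(html: str) -> tuple[str, str]:
    # Collapse blank-line runs so the hash stays stable across
    # layout-only changes in the page markup.
    markdown = re.sub(r"\n+", "\n", converter.convert(html)).strip()
    return markdown, hashlib.md5(markdown.encode()).hexdigest()
```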
Empty file.
1 change: 1 addition & 0 deletions npiai/tools/web/scraper/presets/linkedin/__init__.py
@@ -0,0 +1 @@
from .app import LinkedinScraper
@@ -0,0 +1,31 @@
import asyncio
import json
import time

from npiai.tools.web.scraper.presets.linkedin import LinkedinScraper
from npiai.utils.test_utils import DebugContext
from npiai import Context


async def main():
async with LinkedinScraper(headless=False, batch_size=5) as scraper:
stream = scraper.scrape_posts_stream(
ctx=DebugContext(),
url="https://www.linkedin.com/in/jerry-liu-64390071/recent-activity/all/",
limit=100,
concurrency=10,
)

start = time.monotonic()
count = 0

async for chunk in stream:
count += len(chunk["items"])
print("Chunk:", json.dumps(chunk, indent=2))

end = time.monotonic()
print(f"Summarized {count} items in {end - start:.2f} seconds")


if __name__ == "__main__":
asyncio.run(main())
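Each chunk printed above follows the `SummaryChunk` shape from the new types module, which makes incremental runs straightforward: collect the item hashes and feed them back as skip hashes on the next scrape. Whether the LinkedIn preset forwards a skip-hash parameter is not shown in this diff; the base scraper's streaming path accepts `skip_item_hashes`. A small helper, under those assumptions:

```python
async def collect_hashes(stream) -> set[str]:
    """Accumulate item hashes from a scrape stream for a later incremental run."""
    seen: set[str] = set()
    async for chunk in stream:
        for item in chunk["items"]:  # SummaryItem: {"hash": str, "values": {...}}
            seen.add(item["hash"])
        # Hashes the scraper skipped because we already knew them.
        seen.update(chunk["matched_hashes"])
    return seen
```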