Merged
15 changes: 7 additions & 8 deletions python/src/server/services/crawling/strategies/batch.py
@@ -4,7 +4,6 @@
Handles batch crawling of multiple URLs in parallel.
"""

-import asyncio
from typing import List, Dict, Any, Optional, Callable

from crawl4ai import CrawlerRunConfig, CacheMode, MemoryAdaptiveDispatcher
@@ -70,10 +69,12 @@ async def crawl_batch_with_progress(
except (ValueError, KeyError, TypeError) as e:
# Critical configuration errors should fail fast in alpha
logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
-raise ValueError(f"Failed to load crawler configuration: {e}")
+raise ValueError(f"Failed to load crawler configuration: {e}") from e
except Exception as e:
# For non-critical errors (e.g., network issues), use defaults but log prominently
logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
logger.error(
f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
)
batch_size = 50
if max_concurrent is None:
max_concurrent = 10 # Safe default to prevent memory issues
@@ -91,7 +92,6 @@ async def crawl_batch_with_progress(
cache_mode=CacheMode.BYPASS,
stream=True, # Enable streaming for faster parallel processing
markdown_generator=self.markdown_generator,
wait_for="body", # Simple selector for batch
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "1.0")),
@@ -119,10 +119,11 @@ async def crawl_batch_with_progress(
max_session_permit=max_concurrent,
)

-async def report_progress(percentage: int, message: str):
+async def report_progress(percentage: int, message: str, **kwargs):
"""Helper to report progress if callback is available"""
if progress_callback:
-await progress_callback("crawling", percentage, message)
+step_info = {"currentStep": message, "stepMessage": message, **kwargs}
+await progress_callback("crawling", percentage, message, step_info=step_info)

Comment on lines +122 to 127
⚠️ Potential issue

Progress callback contract differs from the recursive strategy. Align on the step_info keyword parameter.

Here you pass step_info as a single kwarg. In recursive.py, step_info's contents are expanded as top-level kwargs, which can cause a runtime TypeError if the callback doesn't accept those names. Let's standardize on step_info=... in both places.

I’ve proposed the corresponding fix in recursive.py. No change needed here; this comment flags the cross-file inconsistency.

🤖 Prompt for AI Agents
In python/src/server/services/crawling/strategies/batch.py around lines 122 to 127, ensure the progress callback is called with a single step_info kwarg (as currently implemented: step_info = {...} and await progress_callback("crawling", percentage, message, step_info=step_info)); leave this implementation unchanged and confirm the recursive strategy is updated to pass step_info as a single kwarg as well so both strategies match the same callback contract.
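
For reference, a minimal sketch of the contract being standardized on; the callback below is hypothetical, not part of this PR:

```python
# Hypothetical callback illustrating the step_info contract: every
# strategy-specific key travels inside a single dict, so the callback
# signature stays stable no matter which keys a strategy attaches.
from typing import Any, Optional


async def example_progress_callback(
    status: str,
    percentage: int,
    message: str,
    step_info: Optional[dict[str, Any]] = None,
) -> None:
    # currentStep, stepMessage, totalPages, etc. all arrive inside step_info;
    # expanding them as top-level kwargs would force the callback to declare
    # every key by name or fail with a TypeError.
    print(f"[{status}] {percentage}% {message} {step_info or {}}")
```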

total_urls = len(urls)
await report_progress(start_progress, f"Starting to crawl {total_urls} URLs...")
@@ -162,7 +163,6 @@ async def report_progress(percentage: int, message: str):
)

# Handle streaming results
-j = 0
async for result in batch_results:
processed += 1
if result.success and result.markdown:
Expand Down Expand Up @@ -190,7 +190,6 @@ async def report_progress(percentage: int, message: str):
progress_percentage,
f"Crawled {processed}/{total_urls} pages ({len(successful_results)} successful)",
)
-j += 1

await report_progress(
end_progress,
185 changes: 108 additions & 77 deletions python/src/server/services/crawling/strategies/recursive.py
@@ -3,7 +3,7 @@

Handles recursive crawling of websites by following internal links.
"""
-import asyncio
+
from typing import List, Dict, Any, Optional, Callable
from urllib.parse import urldefrag

@@ -17,19 +17,19 @@

class RecursiveCrawlStrategy:
"""Strategy for recursive crawling of websites."""

def __init__(self, crawler, markdown_generator):
"""
Initialize recursive crawl strategy.

Args:
crawler (AsyncWebCrawler): The Crawl4AI crawler instance for web crawling operations
markdown_generator (DefaultMarkdownGenerator): The markdown generator instance for converting HTML to markdown
"""
self.crawler = crawler
self.markdown_generator = markdown_generator
self.url_handler = URLHandler()

async def crawl_recursive_with_progress(
self,
start_urls: List[str],
@@ -39,11 +39,11 @@ async def crawl_recursive_with_progress(
max_concurrent: int = None,
progress_callback: Optional[Callable] = None,
start_progress: int = 10,
-end_progress: int = 60
+end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""
Recursively crawl internal links from start URLs up to a maximum depth with progress reporting.

Args:
start_urls: List of starting URLs
transform_url_func: Function to transform URLs (e.g., GitHub URLs)
@@ -53,16 +53,16 @@ async def crawl_recursive_with_progress(
progress_callback: Optional callback for progress updates
start_progress: Starting progress percentage
end_progress: Ending progress percentage

Returns:
List of crawl results
"""
if not self.crawler:
logger.error("No crawler instance available for recursive crawling")
if progress_callback:
-await progress_callback('error', 0, 'Crawler not available')
+await progress_callback("error", 0, "Crawler not available")
return []

Comment on lines 60 to +65
🛠️ Refactor suggestion

Fail fast when crawler is unavailable.

Mirror the batch strategy: treat this as a hard error instead of returning []. Keep the progress notification, then raise.

         if not self.crawler:
             logger.error("No crawler instance available for recursive crawling")
             if progress_callback:
                 await progress_callback("error", 0, "Crawler not available")
-            return []
+            raise RuntimeError("RecursiveCrawlStrategy: crawler instance is not initialized")
🤖 Prompt for AI Agents
In python/src/server/services/crawling/strategies/recursive.py around lines 60 to 65, the code currently logs and returns an empty list when self.crawler is missing; instead, mirror the batch strategy by treating this as a hard error: keep the existing logger.error and await the progress_callback("error", 0, "Crawler not available") if provided, then raise an appropriate exception (e.g., RuntimeError or a CrawlerUnavailableError) to fail fast rather than returning [].
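
One upside of the raise, sketched below with assumed names: a caller can distinguish an operational failure from a site that genuinely yielded no pages, which an empty list cannot convey.

```python
# Hypothetical caller (names and keyword parameters assumed): with fail-fast in
# place, a missing crawler surfaces as an explicit RuntimeError instead of
# silently returning [] and looking like a crawl that found nothing.
async def run_recursive_crawl(strategy, start_urls: list[str]) -> list[dict]:
    try:
        return await strategy.crawl_recursive_with_progress(
            start_urls,
            transform_url_func=lambda u: u,  # no-op transform for this sketch
            is_documentation_site_func=lambda u: False,
        )
    except RuntimeError:
        # Operational failure: propagate rather than treating it as an empty crawl.
        raise
```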

# Load settings from database - fail fast on configuration errors
try:
settings = await credential_service.get_credentials_by_category("rag_strategy")
@@ -74,35 +74,38 @@ async def crawl_recursive_with_progress(
except (ValueError, KeyError, TypeError) as e:
# Critical configuration errors should fail fast in alpha
logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
-raise ValueError(f"Failed to load crawler configuration: {e}")
+raise ValueError(f"Failed to load crawler configuration: {e}") from e
except Exception as e:
# For non-critical errors (e.g., network issues), use defaults but log prominently
logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
logger.error(
f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True
)
batch_size = 50
if max_concurrent is None:
max_concurrent = 10 # Safe default to prevent memory issues
memory_threshold = 80.0
check_interval = 0.5
settings = {} # Empty dict for defaults

# Check if start URLs include documentation sites
has_doc_sites = any(is_documentation_site_func(url) for url in start_urls)

if has_doc_sites:
logger.info("Detected documentation sites for recursive crawl, using enhanced configuration")
logger.info(
"Detected documentation sites for recursive crawl, using enhanced configuration"
)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
stream=True, # Enable streaming for faster parallel processing
markdown_generator=self.markdown_generator,
-wait_for='body',
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "1.0")),
wait_for_images=False, # Skip images for faster crawling
scan_full_page=True, # Trigger lazy loading
exclude_all_images=False,
remove_overlay_elements=True,
-process_iframes=True
+process_iframes=True,
)
else:
# Configuration for regular recursive crawling
Expand All @@ -113,113 +116,141 @@ async def crawl_recursive_with_progress(
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")),
-scan_full_page=True
+scan_full_page=True,
)

dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=memory_threshold,
check_interval=check_interval,
-max_session_permit=max_concurrent
+max_session_permit=max_concurrent,
)

async def report_progress(percentage: int, message: str, **kwargs):
"""Helper to report progress if callback is available"""
if progress_callback:
-# Add step information for multi-progress tracking
-step_info = {
-    'currentStep': message,
-    'stepMessage': message,
-    **kwargs
-}
-await progress_callback('crawling', percentage, message, **step_info)
+step_info = {"currentStep": message, "stepMessage": message, **kwargs}
+await progress_callback("crawling", percentage, message, **step_info)

Comment on lines +132 to +134
⚠️ Potential issue

Bug: progress_callback invoked with expanded kwargs; use step_info=... instead.

Expanding step_info as top-level kwargs risks a TypeError in callbacks that don't expect those names, and is inconsistent with batch.py.

Apply this diff:

-                step_info = {"currentStep": message, "stepMessage": message, **kwargs}
-                await progress_callback("crawling", percentage, message, **step_info)
+                step_info = {"currentStep": message, "stepMessage": message, **kwargs}
+                await progress_callback("crawling", percentage, message, step_info=step_info)
🤖 Prompt for AI Agents
In python/src/server/services/crawling/strategies/recursive.py around lines 132 to 134, the code expands step_info into top-level kwargs when calling progress_callback, which can cause TypeError and is inconsistent with batch.py; instead remove the **step_info/**kwargs expansion and pass the dict as a single named argument (e.g., step_info=step_info) so the callback receives the info consistently; update the await call to await progress_callback("crawling", percentage, message, step_info=step_info) and ensure any leftover **kwargs are not expanded into the call.
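
A tiny repro of the failure mode, assuming a callback that declares only the documented parameters (illustrative, not from this codebase):

```python
import asyncio


async def strict_callback(status: str, percentage: int, message: str) -> None:
    # Declares only the documented parameters, as a real callback reasonably might.
    print(status, percentage, message)


async def tolerant_callback(status, percentage, message, step_info=None) -> None:
    print(status, percentage, message, step_info)


step_info = {"currentStep": "crawling", "stepMessage": "crawling"}

# Expanding the dict raises:
# TypeError: strict_callback() got an unexpected keyword argument 'currentStep'
# asyncio.run(strict_callback("crawling", 10, "msg", **step_info))

# Passing the dict as one keyword argument works for any callback that opts in:
asyncio.run(tolerant_callback("crawling", 10, "msg", step_info=step_info))
```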

visited = set()

def normalize_url(url):
return urldefrag(url)[0]

current_urls = set([normalize_url(u) for u in start_urls])
results_all = []
total_processed = 0

for depth in range(max_depth):
-urls_to_crawl = [normalize_url(url) for url in current_urls if normalize_url(url) not in visited]
+urls_to_crawl = [
+    normalize_url(url) for url in current_urls if normalize_url(url) not in visited
+]
if not urls_to_crawl:
break

# Calculate progress for this depth level
-depth_start = start_progress + int((depth / max_depth) * (end_progress - start_progress) * 0.8)
-depth_end = start_progress + int(((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8)
-await report_progress(depth_start, f'Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process')
+depth_start = start_progress + int(
+    (depth / max_depth) * (end_progress - start_progress) * 0.8
+)
+depth_end = start_progress + int(
+    ((depth + 1) / max_depth) * (end_progress - start_progress) * 0.8
+)
+await report_progress(
+    depth_start,
+    f"Crawling depth {depth + 1}/{max_depth}: {len(urls_to_crawl)} URLs to process",
+)

# Use configured batch size for recursive crawling
next_level_urls = set()
depth_successful = 0

for batch_idx in range(0, len(urls_to_crawl), batch_size):
-batch_urls = urls_to_crawl[batch_idx:batch_idx + batch_size]
+batch_urls = urls_to_crawl[batch_idx : batch_idx + batch_size]
batch_end_idx = min(batch_idx + batch_size, len(urls_to_crawl))
coderabbitai[bot] marked this conversation as resolved.


# Transform URLs and create mapping for this batch
url_mapping = {}
transformed_batch_urls = []
for url in batch_urls:
transformed = transform_url_func(url)
transformed_batch_urls.append(transformed)
url_mapping[transformed] = url

# Calculate progress for this batch within the depth
-batch_progress = depth_start + int((batch_idx / len(urls_to_crawl)) * (depth_end - depth_start))
-await report_progress(batch_progress,
-    f'Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}',
-    totalPages=total_processed + batch_idx,
-    processedPages=len(results_all))
+batch_progress = depth_start + int(
+    (batch_idx / len(urls_to_crawl)) * (depth_end - depth_start)
+)
+await report_progress(
+    batch_progress,
+    f"Depth {depth + 1}: crawling URLs {batch_idx + 1}-{batch_end_idx} of {len(urls_to_crawl)}",
+    totalPages=total_processed + batch_idx,
+    processedPages=len(results_all),
+)

# Use arun_many for native parallel crawling with streaming
logger.info(f"Starting parallel crawl of {len(batch_urls)} URLs with arun_many")
-batch_results = await self.crawler.arun_many(urls=batch_urls, config=run_config, dispatcher=dispatcher)
+batch_results = await self.crawler.arun_many(
+    urls=transformed_batch_urls, config=run_config, dispatcher=dispatcher
+)

# Handle streaming results from arun_many
i = 0
async for result in batch_results:
-# Map back to original URL if transformed
-original_url = result.url
-for orig_url in batch_urls:
-    if transform_url_func(orig_url) == result.url:
-        original_url = orig_url
-        break
+# Map back to original URL using the mapping dict
+original_url = url_mapping.get(result.url, result.url)

norm_url = normalize_url(original_url)
visited.add(norm_url)
total_processed += 1

if result.success and result.markdown:
results_all.append({
-'url': original_url,
-'markdown': result.markdown,
-'html': result.html # Always use raw HTML for code extraction
+"url": original_url,
+"markdown": result.markdown,
+"html": result.html,  # Always use raw HTML for code extraction
})
depth_successful += 1

# Find internal links for next depth
-for link in result.links.get("internal", []):
+links = getattr(result, "links", {}) or {}
+for link in links.get("internal", []):
next_url = normalize_url(link["href"])
# Skip binary files and already visited URLs
-if next_url not in visited and not self.url_handler.is_binary_file(next_url):
+is_binary = self.url_handler.is_binary_file(next_url)
+if next_url not in visited and not is_binary:
next_level_urls.add(next_url)
-elif self.url_handler.is_binary_file(next_url):
+elif is_binary:
logger.debug(f"Skipping binary file from crawl queue: {next_url}")
else:
logger.warning(f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}")

logger.warning(
f"Failed to crawl {original_url}: {getattr(result, 'error_message', 'Unknown error')}"
)

# Report progress every few URLs
current_idx = batch_idx + i + 1
if current_idx % 5 == 0 or current_idx == len(urls_to_crawl):
-current_progress = depth_start + int((current_idx / len(urls_to_crawl)) * (depth_end - depth_start))
-await report_progress(current_progress,
-    f'Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)',
-    totalPages=total_processed,
-    processedPages=len(results_all))
+current_progress = depth_start + int(
+    (current_idx / len(urls_to_crawl)) * (depth_end - depth_start)
+)
+await report_progress(
+    current_progress,
+    f"Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)",
+    totalPages=total_processed,
+    processedPages=len(results_all),
+)
Comment on lines +233 to +241

⚠️ Potential issue

Same progress callback bug here; align to step_info=...

Apply the same fix to avoid expanding unknown kwargs.

-                        await report_progress(
-                            current_progress,
-                            f"Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)",
-                            totalPages=total_processed,
-                            processedPages=len(results_all),
-                        )
+                        await report_progress(
+                            current_progress,
+                            f"Depth {depth + 1}: processed {current_idx}/{len(urls_to_crawl)} URLs ({depth_successful} successful)",
+                            totalPages=total_processed,
+                            processedPages=len(results_all),
+                        )

Note: The call site stays the same; the earlier change in report_progress will ensure these land under step_info.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In python/src/server/services/crawling/strategies/recursive.py around lines 233 to 241, the report_progress call is passing unknown kwargs (totalPages and processedPages) which will be dropped; update the call to pass those values inside the step_info dict (e.g., step_info={"totalPages": total_processed, "processedPages": len(results_all)}) and keep the same message and current_progress argument so the progress payload lands under step_info as the other fixes did.
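
For illustration, with hypothetical values: once report_progress forwards a single step_info kwarg, this unchanged call site delivers its extra keys under step_info rather than as top-level keyword arguments.

```python
# Sketch only (values assumed). The callback ultimately receives:
#   ("crawling", 42, "Depth 1: processed 5/20 URLs (4 successful)",
#    step_info={"currentStep": "...", "stepMessage": "...",
#               "totalPages": 25, "processedPages": 4})
async def example_call_site(report_progress) -> None:
    await report_progress(
        42,
        "Depth 1: processed 5/20 URLs (4 successful)",
        totalPages=25,
        processedPages=4,
    )
```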

i += 1

current_urls = next_level_urls

# Report completion of this depth
-await report_progress(depth_end,
-    f'Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth')
+await report_progress(
+    depth_end,
+    f"Depth {depth + 1} completed: {depth_successful} pages crawled, {len(next_level_urls)} URLs found for next depth",
+)

-await report_progress(end_progress, f'Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels')
-return results_all
+await report_progress(
+    end_progress,
+    f"Recursive crawling completed: {len(results_all)} total pages crawled across {max_depth} depth levels",
+)
+return results_all