44 changes: 42 additions & 2 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -7,6 +7,7 @@
import sys
import tempfile
import threading
import traceback
from asyncio import CancelledError
from collections.abc import AsyncGenerator, Awaitable, Iterable, Sequence
from contextlib import AsyncExitStack, suppress
@@ -55,6 +56,10 @@
from crawlee.storages import Dataset, KeyValueStore, RequestQueue

from ._context_pipeline import ContextPipeline
from ._logging_utils import (
get_one_line_error_summary_if_possible,
reduce_asyncio_timeout_error_to_relevant_traceback_parts,
)

if TYPE_CHECKING:
import re
@@ -218,6 +223,7 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]):
"""

_CRAWLEE_STATE_KEY = 'CRAWLEE_STATE'
_request_handler_timeout_text = 'Request handler timed out after'

def __init__(
self,
@@ -921,6 +927,10 @@ async def _handle_request_retries(

if self._should_retry_request(context, error):
request.retry_count += 1
self.log.warning(
f'Retrying request to {context.request.url} due to: {error} \n'
f'{get_one_line_error_summary_if_possible(error)}'
)
await self._statistics.error_tracker.add(error=error, context=context)

if self._error_handler:
@@ -974,7 +984,10 @@ async def _handle_request_error(self, context: TCrawlingContext | BasicCrawlingC
context.session.mark_bad()

async def _handle_failed_request(self, context: TCrawlingContext | BasicCrawlingContext, error: Exception) -> None:
self._logger.exception('Request failed and reached maximum retries', exc_info=error)
self._logger.error(
f'Request to {context.request.url} failed and reached maximum retries\n '
f'{self._get_message_from_error(error)}'
)
await self._statistics.error_tracker.add(error=error, context=context)

if self._failed_request_handler:
@@ -983,6 +996,32 @@ async def _handle_failed_request(self, context: TCrawlingContext | BasicCrawling
except Exception as e:
raise UserDefinedErrorHandlerError('Exception thrown in user-defined failed request handler') from e

def _get_message_from_error(self, error: Exception) -> str:
"""Get error message summary from exception.

Custom processing to reduce the irrelevant traceback clutter in some cases.
"""
traceback_parts = traceback.format_exception(type(error), value=error, tb=error.__traceback__, chain=True)
used_traceback_parts = traceback_parts

if (
isinstance(error, asyncio.exceptions.TimeoutError)
and self._request_handler_timeout_text in traceback_parts[-1]
):
used_traceback_parts = reduce_asyncio_timeout_error_to_relevant_traceback_parts(error)
used_traceback_parts.append(traceback_parts[-1])

return ''.join(used_traceback_parts).strip('\n')

def _get_only_inner_most_exception(self, error: BaseException) -> BaseException:
"""Get innermost exception by following __cause__ and __context__ attributes of exception."""
if error.__cause__:
return self._get_only_inner_most_exception(error.__cause__)
if error.__context__:
return self._get_only_inner_most_exception(error.__context__)
# No __cause__ and no __context__, this is as deep as it can get.
return error

def _prepare_send_request_function(
self,
session: Session | None,
@@ -1252,7 +1291,8 @@ async def _run_request_handler(self, context: BasicCrawlingContext) -> None:
await wait_for(
lambda: self._context_pipeline(context, self.router),
timeout=self._request_handler_timeout,
timeout_message=f'Request handler timed out after {self._request_handler_timeout.total_seconds()} seconds',
timeout_message=f'{self._request_handler_timeout_text}'
f' {self._request_handler_timeout.total_seconds()} seconds',
logger=self._logger,
)

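The diff above is where the crawler builds its trimmed failure message: `_get_message_from_error` formats the full chained traceback and, when the error is an asyncio timeout whose last line contains the `Request handler timed out after` text, keeps only the innermost exception's relevant frames. A minimal, self-contained sketch of the underlying mechanism (not the PR's code; the `slow` coroutine and timings are illustrative) shows that `traceback.format_exception(..., chain=True)` returns a list of strings whose final entry names the exception and its message, which is presumably where the configured `timeout_message` ends up, given the `in traceback_parts[-1]` check above:

```python
import asyncio
import traceback


async def slow() -> None:
    await asyncio.sleep(10)


async def main() -> None:
    try:
        await asyncio.wait_for(slow(), timeout=0.01)
    except asyncio.TimeoutError as error:
        # Same call shape as in _get_message_from_error above.
        parts = traceback.format_exception(type(error), value=error, tb=error.__traceback__, chain=True)
        # The final entry is the exception type plus message, e.g. 'TimeoutError\n';
        # the crawler matches its timeout text against this entry.
        print(parts[-1])
        # The remaining entries are the chained frames that the helper filters down.
        print(len(parts))


asyncio.run(main())
```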
64 changes: 64 additions & 0 deletions src/crawlee/crawlers/_basic/_logging_utils.py
@@ -0,0 +1,64 @@
import asyncio
import re
import traceback


def _get_only_innermost_exception(error: BaseException) -> BaseException:
"""Get innermost exception by following __cause__ and __context__ attributes of exception."""
if error.__cause__:
return _get_only_innermost_exception(error.__cause__)
if error.__context__:
return _get_only_innermost_exception(error.__context__)
# No __cause__ and no __context__, this is as deep as it can get.
return error


def _get_filtered_traceback_parts_for_asyncio_timeout_error(traceback_parts: list[str]) -> list[str]:
"""Extract only the most relevant traceback parts from stack trace."""
ignore_pattern = (
r'([\\/]{1}asyncio[\\/]{1})|' # internal asyncio parts
r'(Traceback \(most recent call last\))|' # common part of the stack trace formatting
r'(asyncio\.exceptions\.CancelledError)' # internal asyncio exception
)
return [
_strip_pep657_highlighting(traceback_part)
for traceback_part in traceback_parts
if not re.findall(ignore_pattern, traceback_part)
]


def _strip_pep657_highlighting(traceback_part: str) -> str:
"""Remove PEP 657 highlighting from the traceback."""
highlight_pattern = r'(\n\s*~*\^+~*\n)$'
return re.sub(highlight_pattern, '\n', traceback_part)


def reduce_asyncio_timeout_error_to_relevant_traceback_parts(
timeout_error: asyncio.exceptions.TimeoutError,
) -> list[str]:
innermost_error_traceback_parts = _get_traceback_parts_for_innermost_exception(timeout_error)
return _get_filtered_traceback_parts_for_asyncio_timeout_error(innermost_error_traceback_parts)


def _get_traceback_parts_for_innermost_exception(error: Exception) -> list[str]:
innermost_error = _get_only_innermost_exception(error)
return traceback.format_exception(
type(innermost_error), value=innermost_error, tb=innermost_error.__traceback__, chain=True
)


def get_one_line_error_summary_if_possible(error: Exception) -> str:
if isinstance(error, asyncio.exceptions.TimeoutError):
most_relevant_part = reduce_asyncio_timeout_error_to_relevant_traceback_parts(error)[-1]
else:
traceback_parts = _get_traceback_parts_for_innermost_exception(error)
# Commonly last traceback part is type of the error, and the second last part is the relevant file.
# If there are not enough traceback parts, then we are not sure how to summarize the error.
relevant_traceback_part_index_from_end = 2
most_relevant_part = _strip_pep657_highlighting(
_get_traceback_parts_for_innermost_exception(error)[-relevant_traceback_part_index_from_end]
if len(traceback_parts) >= relevant_traceback_part_index_from_end
else ''
)

return most_relevant_part.strip('\n ').replace('\n', ', ')
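Taken together, these helpers walk a chained exception down to its innermost cause, drop asyncio-internal frames and PEP 657 caret highlighting, and collapse the most relevant frame into a single line. The following self-contained sketch mirrors that flow rather than importing the module above; `parse`, the error messages, and the nesting are made up for illustration:

```python
import re
import traceback


def innermost(error: BaseException) -> BaseException:
    """Follow __cause__ ('raise ... from ...') first, then implicit __context__."""
    if error.__cause__:
        return innermost(error.__cause__)
    if error.__context__:
        return innermost(error.__context__)
    return error


def one_line_summary(error: Exception) -> str:
    deepest = innermost(error)
    parts = traceback.format_exception(type(deepest), value=deepest, tb=deepest.__traceback__, chain=True)
    # The last part is the exception itself; the second-to-last usually points at the failing file and line.
    relevant = parts[-2] if len(parts) >= 2 else ''
    # Drop PEP 657 caret highlighting, as _strip_pep657_highlighting does above.
    relevant = re.sub(r'(\n\s*~*\^+~*\n)$', '\n', relevant)
    return relevant.strip('\n ').replace('\n', ', ')


def parse(raw: str) -> int:
    return int(raw)  # raises ValueError for non-numeric input


try:
    try:
        parse('not-a-number')
    except ValueError as inner:
        raise RuntimeError('request handler failed') from inner
except RuntimeError as outer:
    # Points at the int(raw) line inside parse() rather than at the re-raise site.
    print(one_line_summary(outer))
```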
22 changes: 22 additions & 0 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
@@ -1323,3 +1323,25 @@ async def test_lock_with_get_robots_txt_file_for_url(server_url: URL) -> None:

# Check that the lock was acquired only once
assert spy.call_count == 1


async def test_reduced_logs_from_timed_out_request_handler(
monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
caplog.set_level(logging.INFO)
crawler = BasicCrawler(configure_logging=False, request_handler_timeout=timedelta(seconds=1))

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
await asyncio.sleep(10) # INJECTED DELAY

await crawler.run([Request.from_url('http://a.com/')])

for record in caplog.records:
if record.funcName == '_handle_failed_request':
full_message = (record.message or '') + (record.exc_text or '')
assert Counter(full_message)['\n'] < 10
assert '# INJECTED DELAY' in full_message
break
else:
raise AssertionError('Expected log message about request handler error was not found.')
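The test concatenates `record.message` and `record.exc_text` because the two fields carry different things: `exc_text` is only filled in when a record logged with `exc_info` gets formatted, while the preformatted summary this PR switches to lives entirely in the message itself. A small standalone sketch of that distinction (logger name and messages are arbitrary):

```python
import logging

captured: list[logging.LogRecord] = []


class Collector(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        # Formatting sets record.message and, when exc_info is present, record.exc_text.
        logging.Formatter().format(record)
        captured.append(record)


logger = logging.getLogger('sketch')
logger.addHandler(Collector())
logger.setLevel(logging.INFO)
logger.propagate = False

try:
    raise ValueError('boom')
except ValueError as error:
    logger.exception('logged with exc_info')                 # traceback lands in record.exc_text
    logger.error(f'logged with inline summary: {error!r}')   # everything stays in record.message

assert captured[0].exc_text and 'ValueError' in captured[0].exc_text
assert captured[1].exc_text is None and 'ValueError' in captured[1].message
```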