diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6d5d27927139..1eaae1e15160 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,6 +40,7 @@ jobs: run: | python -m pip install --upgrade pip wheel pip install -e . + pip install -e .[test,websurfer] python -c "import autogen" pip install pytest mock - name: Test with pytest skipping openai tests diff --git a/autogen/agentchat/contrib/web_surfer.py b/autogen/agentchat/contrib/web_surfer.py index 4877a4d0949d..11ccd800647e 100644 --- a/autogen/agentchat/contrib/web_surfer.py +++ b/autogen/agentchat/contrib/web_surfer.py @@ -1,21 +1,23 @@ -import json +import copy import copy import logging import re -from dataclasses import dataclass -from typing import Dict, List, Optional, Union, Callable, Literal, Tuple -from autogen import Agent, ConversableAgent, AssistantAgent, UserProxyAgent, GroupChatManager, GroupChat, OpenAIWrapper -from autogen.browser_utils import SimpleTextBrowser -from autogen.code_utils import content_str from datetime import datetime -from autogen.token_count_utils import count_token, get_max_token_limit +from typing import Dict, List, Optional, Union, Callable, Literal, Tuple + +from autogen import Agent, ConversableAgent, AssistantAgent, UserProxyAgent, OpenAIWrapper +from autogen.browser_utils import SimpleTextBrowser, HeadlessChromeBrowser from autogen.oai.openai_utils import filter_config +from autogen.token_count_utils import count_token, get_max_token_limit logger = logging.getLogger(__name__) class WebSurferAgent(ConversableAgent): - """(In preview) An agent that acts as a basic web surfer that can search the web and visit web pages.""" + """(In preview) An agent that acts as a basic web surfer that can search the web and visit web pages. + Defaults to a simple text-based browser. + Can be configured to use a headless Chrome browser by providing a browser_config dictionary with the key "headless" set to True. + """ DEFAULT_PROMPT = ( "You are a helpful AI assistant with access to a web browser (via the provided functions). In fact, YOU ARE THE ONLY MEMBER OF YOUR PARTY WITH ACCESS TO A WEB BROWSER, so please help out where you can by performing web searches, navigating pages, and reporting what you find. 
Today's date is " @@ -84,7 +86,11 @@ def __init__( if browser_config is None: self.browser = SimpleTextBrowser() else: - self.browser = SimpleTextBrowser(**browser_config) + headless = browser_config.pop("headless", False) + if headless: + self.browser = HeadlessChromeBrowser(**browser_config) + else: + self.browser = SimpleTextBrowser(**browser_config) # Create a copy of the llm_config for the inner monologue agents to use, and set them up with function calling if llm_config is None: # Nothing to copy @@ -214,7 +220,7 @@ def _browser_state(): current_page = self.browser.viewport_current_page total_pages = len(self.browser.viewport_pages) - header += f"Viewport position: Showing page {current_page+1} of {total_pages}.\n" + header += f"Viewport position: Showing page {current_page + 1} of {total_pages}.\n" return (header, self.browser.viewport) def _informational_search(query): @@ -225,7 +231,7 @@ def _informational_search(query): def _navigational_search(query): self.browser.visit_page(f"bing: {query}") - # Extract the first linl + # Extract the first link m = re.search(r"\[.*?\]\((http.*?)\)", self.browser.page_content) if m: self.browser.visit_page(m.group(1)) diff --git a/autogen/browser_utils/__init__.py b/autogen/browser_utils/__init__.py new file mode 100644 index 000000000000..cc89c8e7ec25 --- /dev/null +++ b/autogen/browser_utils/__init__.py @@ -0,0 +1,7 @@ +from .simple_text_browser import SimpleTextBrowser +from .headless_chrome_browser import HeadlessChromeBrowser + +__all__ = ( + "SimpleTextBrowser", + "HeadlessChromeBrowser", +) diff --git a/autogen/browser_utils/abstract_browser.py b/autogen/browser_utils/abstract_browser.py new file mode 100644 index 000000000000..123f5fb9863e --- /dev/null +++ b/autogen/browser_utils/abstract_browser.py @@ -0,0 +1,48 @@ +from abc import ABC, abstractmethod +from typing import Optional, Union, Dict + + +class AbstractBrowser(ABC): + """An abstract class for a web browser.""" + + @abstractmethod + def __init__( + self, + start_page: Optional[str] = "about:blank", + viewport_size: Optional[int] = 1024 * 8, + downloads_folder: Optional[Union[str, None]] = None, + bing_api_key: Optional[Union[str, None]] = None, + request_kwargs: Optional[Union[Dict, None]] = None, + ): + pass + + @property + @abstractmethod + def address(self) -> str: + pass + + @abstractmethod + def set_address(self, uri_or_path): + pass + + @property + @abstractmethod + def viewport(self) -> str: + pass + + @property + @abstractmethod + def page_content(self) -> str: + pass + + @abstractmethod + def page_down(self): + pass + + @abstractmethod + def page_up(self): + pass + + @abstractmethod + def visit_page(self, path_or_uri): + pass diff --git a/autogen/browser_utils/headless_chrome_browser.py b/autogen/browser_utils/headless_chrome_browser.py new file mode 100644 index 000000000000..de59cbd07706 --- /dev/null +++ b/autogen/browser_utils/headless_chrome_browser.py @@ -0,0 +1,147 @@ +import re + +from bs4 import BeautifulSoup +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.common.by import By + +from autogen.browser_utils.abstract_browser import AbstractBrowser + +# Optional PDF support +IS_PDF_CAPABLE = False +try: + import pdfminer + import pdfminer.high_level + + IS_PDF_CAPABLE = True +except ModuleNotFoundError: + pass + +# Other optional dependencies +try: + import pathvalidate +except ModuleNotFoundError: + pass + +from typing import Optional, Union, Dict + + +class HeadlessChromeBrowser(AbstractBrowser): + 
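For context, the `browser_config` handling above is the user-facing switch between the two backends: `"headless": True` selects the Selenium-backed browser defined next, and every remaining key is forwarded unchanged to the chosen class's constructor. A minimal usage sketch, assuming the `websurfer` extras (which now pull in `selenium`) and a local Chrome/chromedriver install; `llm_config=False` disables the inner LLM agents, as the tests in this patch do:

from autogen.agentchat.contrib.web_surfer import WebSurferAgent

web_surfer = WebSurferAgent(
    "web_surfer",
    llm_config=False,  # no model needed just to drive the browser
    browser_config={"viewport_size": 4096, "headless": True},
)
web_surfer.browser.visit_page("https://en.wikipedia.org/wiki/Microsoft")
print(web_surfer.browser.viewport)  # first viewport-sized chunk of page text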
"""(In preview) A Selenium powered headless Chrome browser. Suitable for Agentic use.""" + + def __init__( + self, + start_page: Optional[str] = "about:blank", + viewport_size: Optional[int] = 1024 * 8, + downloads_folder: Optional[Union[str, None]] = None, + bing_api_key: Optional[Union[str, None]] = None, + request_kwargs: Optional[Union[Dict, None]] = None, + ): + self.start_page = start_page + self.driver = None + self.viewport_size = viewport_size # Applies only to the standard uri types + self.downloads_folder = downloads_folder + self.history = list() + self.page_title = None + self.viewport_current_page = 0 + self.viewport_pages = list() + self.bing_api_key = bing_api_key + self.request_kwargs = request_kwargs + self._page_content = "" + + self._start_browser() + + def _start_browser(self): + chrome_options = Options() + chrome_options.add_argument("--headless") + self.driver = webdriver.Chrome(options=chrome_options) + self.driver.get(self.start_page) + + @property + def address(self) -> str: + return self.driver.current_url + + def set_address(self, uri_or_path): + if uri_or_path.startswith("bing:"): + self._bing_search(uri_or_path[len("bing:") :].strip()) + else: + self.driver.get(uri_or_path) + + @property + def viewport(self) -> str: + """Return the content of the current viewport.""" + if not self.viewport_pages: + return "" + bounds = self.viewport_pages[self.viewport_current_page] + return self._page_content[bounds[0] : bounds[1]] + + @property + def page_content(self) -> str: + """Return the full contents of the current page.""" + return self._page_content + + def _set_page_content(self, content) -> str: + """Sets the text content of the current page.""" + self._page_content = content + self._split_pages() + if self.viewport_current_page >= len(self.viewport_pages): + self.viewport_current_page = len(self.viewport_pages) - 1 + + def _split_pages(self): + # Split only regular pages + if not self.address.startswith("http:") and not self.address.startswith("https:"): + return + + # Handle empty pages + if len(self._page_content) == 0: + self.viewport_pages = [(0, 0)] + return + + # Break the viewport into pages + self.viewport_pages = [] + start_idx = 0 + while start_idx < len(self._page_content): + end_idx = min(start_idx + self.viewport_size, len(self._page_content)) + self.viewport_pages.append((start_idx, end_idx)) + start_idx = end_idx + + def _process_html(self, html: str) -> str: + """Process the raw HTML content and return the processed text.""" + soup = BeautifulSoup(html, "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Convert to text + text = soup.get_text() + + # Remove excessive blank lines + text = re.sub(r"\n{2,}", "\n\n", text).strip() + + return text + + def _bing_search(self, query): + self.driver.get("https://www.bing.com") + + search_bar = self.driver.find_element(By.NAME, "q") + search_bar.clear() + search_bar.send_keys(query) + search_bar.submit() + + def page_down(self): + """Move the viewport one page down.""" + if self.viewport_current_page < len(self.viewport_pages) - 1: + self.viewport_current_page += 1 + + def page_up(self): + """Move the viewport one page up.""" + if self.viewport_current_page > 0: + self.viewport_current_page -= 1 + + def visit_page(self, path_or_uri): + """Update the address, visit the page, and return the content of the viewport.""" + self.set_address(path_or_uri) + html = self.driver.execute_script("return document.body.innerHTML;") + 
self._set_page_content(self._process_html(html)) + return self.viewport diff --git a/autogen/browser_utils.py b/autogen/browser_utils/simple_text_browser.py similarity index 98% rename from autogen/browser_utils.py rename to autogen/browser_utils/simple_text_browser.py index 68e39e4ac8e6..db2fb92eb209 100644 --- a/autogen/browser_utils.py +++ b/autogen/browser_utils/simple_text_browser.py @@ -1,15 +1,15 @@ -import json +import io +import mimetypes import os -import requests import re -import markdownify -import io import uuid -import mimetypes from urllib.parse import urljoin, urlparse + +import markdownify +import requests from bs4 import BeautifulSoup -from dataclasses import dataclass -from typing import Dict, List, Optional, Union, Callable, Literal, Tuple + +from autogen.browser_utils.abstract_browser import AbstractBrowser # Optional PDF support IS_PDF_CAPABLE = False @@ -27,8 +27,10 @@ except ModuleNotFoundError: pass +from typing import Optional, Union, Dict + -class SimpleTextBrowser: +class SimpleTextBrowser(AbstractBrowser): """(In preview) An extremely simple text-based web browser comparable to Lynx. Suitable for Agentic use.""" def __init__( diff --git a/setup.py b/setup.py index 3dbdfad05159..fef74a632dfb 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ "teachable": ["chromadb"], "lmm": ["replicate", "pillow"], "graphs": ["networkx~=3.2.1", "matplotlib~=3.8.1"], - "websurfer": ["beautifulsoup4", "markdownify", "pdfminer.six", "pathvalidate"], + "websurfer": ["beautifulsoup4", "markdownify", "pdfminer.six", "pathvalidate", "selenium"], "redis": ["redis"], }, classifiers=[ diff --git a/test/agentchat/contrib/test_web_surfer.py b/test/agentchat/contrib/test_web_surfer.py index 307abefecbca..e1529c2f9db5 100644 --- a/test/agentchat/contrib/test_web_surfer.py +++ b/test/agentchat/contrib/test_web_surfer.py @@ -92,6 +92,53 @@ def test_web_surfer(): response = function_map["summarize_page"]() +@pytest.mark.skipif( + skip_all, + reason="do not run if dependency is not installed", +) +def test_web_surfer_headless(): + page_size = 4096 + web_surfer = WebSurferAgent( + "web_surfer", llm_config=False, browser_config={"viewport_size": page_size, "headless": True} + ) + + # Sneak a peak at the function map, allowing us to call the functions for testing here + function_map = web_surfer._user_proxy._function_map + + # Test some basic navigations + response = function_map["visit_page"](BLOG_POST_URL) + assert f"Address: {BLOG_POST_URL}".strip() in response + # assert f"Title: {BLOG_POST_TITLE}".strip() in response + + # Test scrolling + m = re.search(r"\bViewport position: Showing page 1 of (\d+).", response) + total_pages = int(m.group(1)) + + response = function_map["page_down"]() + assert ( + f"Viewport position: Showing page 2 of {total_pages}." in response + ) # Assumes the content is longer than one screen + + response = function_map["page_up"]() + assert f"Viewport position: Showing page 1 of {total_pages}." in response + + # Try to scroll too far back up + response = function_map["page_up"]() + assert f"Viewport position: Showing page 1 of {total_pages}." in response + + # Try to scroll too far down + for i in range(0, total_pages + 1): + response = function_map["page_down"]() + assert f"Viewport position: Showing page {total_pages} of {total_pages}." 
in response + + # Test Q&A and summarization -- we don't have a key so we expect it to fail (but it means the code path is correct) + with pytest.raises(AttributeError, match="'NoneType' object has no attribute 'create'"): + response = function_map["answer_from_page"]("When was it founded?") + + with pytest.raises(AttributeError, match="'NoneType' object has no attribute 'create'"): + response = function_map["summarize_page"]() + + @pytest.mark.skipif( skip_oai, reason="do not run if oai is not installed", @@ -110,32 +157,34 @@ def test_web_surfer_oai(): assert len(llm_config["config_list"]) > 0 assert len(summarizer_llm_config["config_list"]) > 0 - page_size = 4096 - web_surfer = WebSurferAgent( - "web_surfer", - llm_config=llm_config, - summarizer_llm_config=summarizer_llm_config, - browser_config={"viewport_size": page_size}, - ) + # run the test with both text and headless browsers + for useHeadlessBrowser in [False, True]: + page_size = 4096 + web_surfer = WebSurferAgent( + "web_surfer", + llm_config=llm_config, + summarizer_llm_config=summarizer_llm_config, + browser_config={"viewport_size": page_size, "headless": useHeadlessBrowser}, + ) - user_proxy = UserProxyAgent( - "user_proxy", - human_input_mode="NEVER", - code_execution_config=False, - default_auto_reply="", - is_termination_msg=lambda x: True, - ) + user_proxy = UserProxyAgent( + "user_proxy", + human_input_mode="NEVER", + code_execution_config=False, + default_auto_reply="", + is_termination_msg=lambda x: True, + ) - # Make some requests that should test function calling - user_proxy.initiate_chat(web_surfer, message="Please visit the page 'https://en.wikipedia.org/wiki/Microsoft'") + # Make some requests that should test function calling + user_proxy.initiate_chat(web_surfer, message="Please visit the page 'https://en.wikipedia.org/wiki/Microsoft'") - user_proxy.initiate_chat(web_surfer, message="Please scroll down.") + user_proxy.initiate_chat(web_surfer, message="Please scroll down.") - user_proxy.initiate_chat(web_surfer, message="Please scroll up.") + user_proxy.initiate_chat(web_surfer, message="Please scroll up.") - user_proxy.initiate_chat(web_surfer, message="When was it founded?") + user_proxy.initiate_chat(web_surfer, message="When was it founded?") - user_proxy.initiate_chat(web_surfer, message="What's this page about?") + user_proxy.initiate_chat(web_surfer, message="What's this page about?") @pytest.mark.skipif( @@ -165,6 +214,32 @@ def test_web_surfer_bing(): assert "Address: https://en.wikipedia.org/wiki/" in response +@pytest.mark.skipif( + skip_oai, + reason="do not run if open ai api key is not available", +) +def test_web_surfer_headless_bing(): + page_size = 4096 + web_surfer = WebSurferAgent( + "web_surfer", + llm_config=False, + browser_config={"viewport_size": page_size, "headless": True}, + ) + + # Sneak a peak at the function map, allowing us to call the functions for testing here + function_map = web_surfer._user_proxy._function_map + + # Test informational queries + response = function_map["informational_web_search"](BING_QUERY) + assert "Address: https://www.bing.com/search?q=Microsoft&form=QBLH" in response + assert "Microsoft – Cloud, Computers, Apps & Gaming" in response + + # Test informational queries + response = function_map["navigational_web_search"](BING_QUERY + " Wikipedia") + assert "Address: https://www.bing.com/search?q=Microsoft+Wikipedia&form=QBLH" in response + assert "Microsoft - Wikipedia" in response + + if __name__ == "__main__": """Runs this file's tests from the command 
line.""" test_web_surfer() diff --git a/test/browser_utils/test_headless_chrome_browser.py b/test/browser_utils/test_headless_chrome_browser.py new file mode 100644 index 000000000000..359cb4b710cb --- /dev/null +++ b/test/browser_utils/test_headless_chrome_browser.py @@ -0,0 +1,100 @@ +import unittest +from unittest.mock import patch, MagicMock, call +from selenium.webdriver.chrome.webdriver import WebDriver +from selenium.webdriver.common.by import By + +from autogen.browser_utils.headless_chrome_browser import HeadlessChromeBrowser + + +class TestHeadlessChromeBrowser(unittest.TestCase): + @patch.object(WebDriver, "get") + def test_set_address(self, mock_get): + # Arrange + browser = HeadlessChromeBrowser() + + # Act + browser.set_address("https://www.example.com") + + # Assert + self.assertEqual(mock_get.call_count, 2) + self.assertEqual(mock_get.call_args_list[0], call("about:blank")) + self.assertEqual(mock_get.call_args_list[1], call("https://www.example.com")) + + @patch.object(WebDriver, "execute_script") + def test_page_content(self, mock_execute_script): + # Arrange + mock_execute_script.return_value = "
<html><body>Hello, World!</body></html>
" + browser = HeadlessChromeBrowser() + browser.visit_page("https://www.example.com") + # Act + content = browser.page_content + + # Assert + self.assertEqual("Hello, World!", content) + + @patch.object(WebDriver, "get") + @patch.object(WebDriver, "find_element") + def test_bing_search(self, mock_find_element, mock_get): + # Arrange + mock_element = MagicMock() + mock_element.submit = MagicMock() + mock_element.clear = MagicMock() + mock_element.send_keys = MagicMock() + mock_find_element.return_value = mock_element + browser = HeadlessChromeBrowser() + + # Act + browser._bing_search("test query") + + # Assert + self.assertEqual(mock_get.call_count, 2) + self.assertEqual(mock_get.call_args_list[0], call("about:blank")) + self.assertEqual(mock_get.call_args_list[1], call("https://www.bing.com")) + mock_find_element.assert_called_once_with(By.NAME, "q") + mock_element.clear.assert_called_once() + mock_element.send_keys.assert_called_once_with("test query") + mock_element.submit.assert_called_once() + + def test_page_up(self): + # Arrange + browser = HeadlessChromeBrowser() + browser._set_page_content("Hello, World!" * 1000) # Set a long page content + browser.viewport_current_page = 1 # Set the current page to 1 + + # Act + browser.page_up() + + # Assert + self.assertEqual(browser.viewport_current_page, 0) # The current page should now be 0 + + def test_page_down(self): + # Arrange + browser = HeadlessChromeBrowser() + browser._set_page_content("Hello, World!" * 1000) # Set a long page content + browser.viewport_current_page = 1 # Set the current page to 0 + + # Act + browser.page_down() + + # Assert + self.assertEqual(browser.viewport_current_page, 1) # The current page should now be 1 + + @patch.object(WebDriver, "get") + @patch.object(WebDriver, "execute_script") + def test_visit_page(self, mock_execute_script, mock_get): + # Arrange + mock_execute_script.return_value = "
<html><body>Hello, World!</body></html>
" + browser = HeadlessChromeBrowser() + + # Act + browser.visit_page("https://www.example.com") + + # Assert + self.assertEqual(mock_get.call_count, 2) + self.assertEqual(mock_get.call_args_list[0], call("about:blank")) + self.assertEqual(mock_get.call_args_list[1], call("https://www.example.com")) + self.assertEqual(browser.page_content, "Hello, World!") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/browser_utils/test_simple_text_browser.py b/test/browser_utils/test_simple_text_browser.py new file mode 100644 index 000000000000..56df119a175a --- /dev/null +++ b/test/browser_utils/test_simple_text_browser.py @@ -0,0 +1,138 @@ +import os +import tempfile +import unittest +from unittest.mock import patch, Mock + +import requests + +from autogen.browser_utils.simple_text_browser import SimpleTextBrowser + + +class TestSimpleTextBrowser(unittest.TestCase): + def setUp(self): + self.browser = SimpleTextBrowser() + + def test_init(self): + self.assertEqual(self.browser.start_page, "about:blank") + self.assertEqual(self.browser.viewport_size, 1024 * 8) + self.assertIsNone(self.browser.downloads_folder) + self.assertIsNone(self.browser.bing_api_key) + self.assertIsNone(self.browser.request_kwargs) + + def test_set_address(self): + self.browser.set_address("https://www.example.com") + self.assertEqual(self.browser.address, "https://www.example.com") + + def test_viewport(self): + self.browser.set_address("https://www.example.com") + self.assertIsInstance(self.browser.viewport, str) + + def test_page_content(self): + self.browser.set_address("https://www.example.com") + self.assertIsInstance(self.browser.page_content, str) + + def test_page_down(self): + self.browser.set_address("https://www.example.com") + current_page = self.browser.viewport_current_page + self.browser.page_down() + self.assertEqual( + self.browser.viewport_current_page, min(current_page + 1, len(self.browser.viewport_pages) - 1) + ) + + def test_page_up(self): + self.browser.set_address("https://www.example.com") + self.browser.page_down() + current_page = self.browser.viewport_current_page + self.browser.page_up() + self.assertEqual(self.browser.viewport_current_page, max(current_page - 1, 0)) + + def test_visit_page(self): + content = self.browser.visit_page("https://www.example.com") + self.assertIsInstance(content, str) + + @patch.object(requests, "get") + def test_bing_api_call(self, mock_get): + # Arrange + mock_response = Mock() + expected_result = {"webPages": {"value": []}} + mock_response.json.return_value = expected_result + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + self.browser.bing_api_key = "test_key" + + # Act + result = self.browser._bing_api_call("test_query") + + # Assert + mock_get.assert_called_once_with( + "https://api.bing.microsoft.com/v7.0/search", + headers={"Ocp-Apim-Subscription-Key": "test_key"}, + params={"q": "test_query", "textDecorations": False, "textFormat": "raw"}, + stream=False, + ) + self.assertEqual(result, expected_result) + + @patch.object(SimpleTextBrowser, "_bing_api_call") + def test_bing_search(self, mock_bing_api_call): + # Arrange + expected_result = { + "webPages": {"value": [{"name": "Test Page", "url": "https://www.example.com", "snippet": "Test Snippet"}]} + } + mock_bing_api_call.return_value = expected_result + query = "test_query" + + # Act + self.browser._bing_search(query) + + # Assert + mock_bing_api_call.assert_called_once_with(query) + self.assertIn("Test Page", self.browser.page_content) + 
self.assertIn("https://www.example.com", self.browser.page_content) + self.assertIn("Test Snippet", self.browser.page_content) + + @patch.object(requests, "get") + def test_fetch_page_text_plain(self, mock_get): + # Arrange + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {"content-type": "text/plain"} + mock_response.iter_content.return_value = iter([b"Test content".decode("utf-8")]) # decode bytes to string + mock_get.return_value = mock_response + url = "https://www.example.com/test.txt" + + # Act + self.browser.set_address(url) + + # Assert + mock_get.assert_called_once_with(url, stream=True) + self.assertEqual(self.browser.page_content, "Test content") # compare with decoded string + + @patch.object(requests, "get") + def test_downloads_folder(self, mock_get): + # Arrange + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {"content-type": "application/octet-stream"} + mock_response.iter_content.return_value = iter([b"Download content"]) + mock_get.return_value = mock_response + url = "https://www.example.com/test.bin" + + with tempfile.TemporaryDirectory() as downloads_folder: + self.browser.downloads_folder = downloads_folder + + # Act + self.browser.set_address(url) + + # Assert + mock_get.assert_called_once_with(url, stream=True) + download_path = os.path.join(downloads_folder, os.listdir(downloads_folder)[0]) + with open(download_path, "rb") as f: + content = f.read() + self.assertEqual(content, b"Download content") + self.assertIn("Downloaded", self.browser.page_content) + self.assertIn(url, self.browser.page_content) + self.assertIn(download_path, self.browser.page_content) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_browser_utils.py b/test/test_browser_utils.py index 8b8759708b95..b5b6841e87f1 100644 --- a/test/test_browser_utils.py +++ b/test/test_browser_utils.py @@ -1,3 +1,5 @@ +import tempfile + import pytest import os import sys @@ -55,96 +57,90 @@ def _rm_folder(path): reason="do not run if dependency is not installed", ) def test_simple_text_browser(): - # Create a downloads folder (removing any leftover ones from prior tests) - downloads_folder = os.path.join(KEY_LOC, "downloads") - if os.path.isdir(downloads_folder): - _rm_folder(downloads_folder) - os.mkdir(downloads_folder) - - # Instantiate the browser - user_agent = "python-requests/" + requests.__version__ - viewport_size = 1024 - browser = SimpleTextBrowser( - downloads_folder=downloads_folder, - viewport_size=viewport_size, - request_kwargs={ - "headers": {"User-Agent": user_agent}, - }, - ) + # Create a temp downloads folder (removing any leftover ones from prior tests) + with tempfile.TemporaryDirectory() as downloads_folder: + # Instantiate the browser + user_agent = "python-requests/" + requests.__version__ + viewport_size = 1024 + browser = SimpleTextBrowser( + downloads_folder=downloads_folder, + viewport_size=viewport_size, + request_kwargs={ + "headers": {"User-Agent": user_agent}, + }, + ) + + # Test that we can visit a page and find what we expect there + top_viewport = browser.visit_page(BLOG_POST_URL) + assert browser.viewport == top_viewport + assert browser.page_title.strip() == BLOG_POST_TITLE.strip() + assert BLOG_POST_STRING in browser.page_content + + # Check if page splitting works + approx_pages = math.ceil(len(browser.page_content) / viewport_size) # May be fewer, since it aligns to word breaks + assert len(browser.viewport_pages) <= approx_pages + assert abs(len(browser.viewport_pages) - 
approx_pages) <= 1 # allow only a small deviation + assert browser.viewport_pages[0][0] == 0 + assert browser.viewport_pages[-1][1] == len(browser.page_content) + + # Make sure we can reconstruct the full contents from the split pages + buffer = "" + for bounds in browser.viewport_pages: + buffer += browser.page_content[bounds[0] : bounds[1]] + assert buffer == browser.page_content + + # Test scrolling (scroll all the way to the bottom) + for i in range(1, len(browser.viewport_pages)): + browser.page_down() + assert browser.viewport_current_page == i + # Test scrolloing beyond the limits + for i in range(0, 5): + browser.page_down() + assert browser.viewport_current_page == len(browser.viewport_pages) - 1 + + # Test scrolling (scroll all the way to the bottom) + for i in range(len(browser.viewport_pages) - 2, 0, -1): + browser.page_up() + assert browser.viewport_current_page == i + # Test scrolloing beyond the limits + for i in range(0, 5): + browser.page_up() + assert browser.viewport_current_page == 0 + + # Test Wikipedia handling + assert WIKIPEDIA_STRING in browser.visit_page(WIKIPEDIA_URL) + assert WIKIPEDIA_TITLE.strip() == browser.page_title.strip() + + # Visit a plain-text file + response = requests.get(PLAIN_TEXT_URL) + response.raise_for_status() + expected_results = response.text + + browser.visit_page(PLAIN_TEXT_URL) + assert browser.page_content.strip() == expected_results.strip() + + # Directly download an image, and compute its md5 + response = requests.get(IMAGE_URL, stream=True) + response.raise_for_status() + expected_md5 = hashlib.md5(response.raw.read()).hexdigest() + + # Visit an image causing it to be downloaded by the SimpleTextBrowser, then compute its md5 + viewport = browser.visit_page(IMAGE_URL) + m = re.search(r"Downloaded '(.*?)' to '(.*?)'", viewport) + fetched_url = m.group(1) + download_loc = m.group(2) + assert fetched_url == IMAGE_URL + + with open(download_loc, "rb") as fh: + downloaded_md5 = hashlib.md5(fh.read()).hexdigest() + + # MD%s should match + assert expected_md5 == downloaded_md5 + + # Fetch a PDF + viewport = browser.visit_page(PDF_URL) + assert PDF_STRING in viewport - # Test that we can visit a page and find what we expect there - top_viewport = browser.visit_page(BLOG_POST_URL) - assert browser.viewport == top_viewport - assert browser.page_title.strip() == BLOG_POST_TITLE.strip() - assert BLOG_POST_STRING in browser.page_content - - # Check if page splitting works - approx_pages = math.ceil(len(browser.page_content) / viewport_size) # May be fewer, since it aligns to word breaks - assert len(browser.viewport_pages) <= approx_pages - assert abs(len(browser.viewport_pages) - approx_pages) <= 1 # allow only a small deviation - assert browser.viewport_pages[0][0] == 0 - assert browser.viewport_pages[-1][1] == len(browser.page_content) - - # Make sure we can reconstruct the full contents from the split pages - buffer = "" - for bounds in browser.viewport_pages: - buffer += browser.page_content[bounds[0] : bounds[1]] - assert buffer == browser.page_content - - # Test scrolling (scroll all the way to the bottom) - for i in range(1, len(browser.viewport_pages)): - browser.page_down() - assert browser.viewport_current_page == i - # Test scrolloing beyond the limits - for i in range(0, 5): - browser.page_down() - assert browser.viewport_current_page == len(browser.viewport_pages) - 1 - - # Test scrolling (scroll all the way to the bottom) - for i in range(len(browser.viewport_pages) - 2, 0, -1): - browser.page_up() - assert 
browser.viewport_current_page == i - # Test scrolloing beyond the limits - for i in range(0, 5): - browser.page_up() - assert browser.viewport_current_page == 0 - - # Test Wikipedia handling - assert WIKIPEDIA_STRING in browser.visit_page(WIKIPEDIA_URL) - assert WIKIPEDIA_TITLE.strip() == browser.page_title.strip() - - # Visit a plain-text file - response = requests.get(PLAIN_TEXT_URL) - response.raise_for_status() - expected_results = response.text - - browser.visit_page(PLAIN_TEXT_URL) - assert browser.page_content.strip() == expected_results.strip() - - # Directly download an image, and compute its md5 - response = requests.get(IMAGE_URL, stream=True) - response.raise_for_status() - expected_md5 = hashlib.md5(response.raw.read()).hexdigest() - - # Visit an image causing it to be downloaded by the SimpleTextBrowser, then compute its md5 - viewport = browser.visit_page(IMAGE_URL) - m = re.search(r"Downloaded '(.*?)' to '(.*?)'", viewport) - fetched_url = m.group(1) - download_loc = m.group(2) - assert fetched_url == IMAGE_URL - - with open(download_loc, "rb") as fh: - downloaded_md5 = hashlib.md5(fh.read()).hexdigest() - - # MD%s should match - assert expected_md5 == downloaded_md5 - - # Fetch a PDF - viewport = browser.visit_page(PDF_URL) - assert PDF_STRING in viewport - - # Clean up - _rm_folder(downloads_folder) @pytest.mark.skipif(
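With `browser_utils` now a package, `WebSurferAgent` programs against the `AbstractBrowser` contract rather than a concrete class, so further backends can be added without touching the agent. Note that the agent also reads `viewport_pages`, `viewport_current_page`, and `page_title` directly (see the `_browser_state` hunk above), so those attributes are part of the de facto interface even though the ABC does not declare them. A minimal in-memory sketch of a third backend (the `StaticBrowser` name and its `pages` parameter are invented for illustration):

from typing import Dict, Optional, Union

from autogen.browser_utils.abstract_browser import AbstractBrowser


class StaticBrowser(AbstractBrowser):
    """Hypothetical example: serves canned text instead of touching the network."""

    def __init__(
        self,
        start_page: Optional[str] = "about:blank",
        viewport_size: Optional[int] = 1024 * 8,
        downloads_folder: Optional[Union[str, None]] = None,
        bing_api_key: Optional[Union[str, None]] = None,
        request_kwargs: Optional[Union[Dict, None]] = None,
        pages: Optional[Dict[str, str]] = None,  # invented: canned uri -> text mapping
    ):
        self._address = start_page
        self._pages = pages or {}
        self._content = ""
        self.viewport_size = viewport_size
        # WebSurferAgent reads these attributes directly, so expose them too.
        self.page_title = None
        self.viewport_current_page = 0
        self.viewport_pages = [(0, 0)]

    @property
    def address(self) -> str:
        return self._address

    def set_address(self, uri_or_path):
        self._address = uri_or_path
        self._content = self._pages.get(uri_or_path, "")
        # Single fixed-width split, mirroring the concrete browsers' pagination.
        self.viewport_pages = [
            (i, min(i + self.viewport_size, len(self._content)))
            for i in range(0, max(len(self._content), 1), self.viewport_size)
        ]
        self.viewport_current_page = 0

    @property
    def viewport(self) -> str:
        start, end = self.viewport_pages[self.viewport_current_page]
        return self._content[start:end]

    @property
    def page_content(self) -> str:
        return self._content

    def page_down(self):
        self.viewport_current_page = min(self.viewport_current_page + 1, len(self.viewport_pages) - 1)

    def page_up(self):
        self.viewport_current_page = max(self.viewport_current_page - 1, 0)

    def visit_page(self, path_or_uri):
        self.set_address(path_or_uri)
        return self.viewport


# Usage of the sketch above:
browser = StaticBrowser(pages={"https://example.com": "Hello, offline world."})
assert browser.visit_page("https://example.com") == "Hello, offline world."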