From a903992b6db0cb0a98599e8723feddcd8a37d332 Mon Sep 17 00:00:00 2001
From: KingAkeem
Date: Sun, 1 Jul 2018 06:25:20 -0400
Subject: [PATCH 1/3] Refactored tests and TorBot app so that tests no longer
 need to touch real servers. Also, all network utility functions are inside
 of one file. We should build tests in the future for our net utilities.

---
 .gitignore                |   4 ++
 modules/getemails.py      |  17 ++----
 modules/getweblinks.py    |  92 ++------------------------------
 modules/net_utils.py      |  73 ++++++++++++++++++++++++++
 modules/pagereader.py     | 107 ++++++++++++--------------------------
 tests/test_getemails.py   |  34 ++++++++----
 tests/test_getweblinks.py |  51 ++++++++++++------
 tests/test_pagereader.py  |  78 +++++++++------------------
 torBot.py                 |   1 +
 9 files changed, 204 insertions(+), 253 deletions(-)
 create mode 100644 modules/net_utils.py

diff --git a/.gitignore b/.gitignore
index b92e7ba2..b5b322bc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,7 @@ tests/__pycache__/
 modules/__init__.py
 .idea/
 tests/.ropeproject/
+torBot
+*.pyc
+tests/.pytest_cache
+.pytest_cache
diff --git a/modules/getemails.py b/modules/getemails.py
index 4b9c03dc..bcbc65d5 100644
--- a/modules/getemails.py
+++ b/modules/getemails.py
@@ -1,4 +1,5 @@
 from modules.bcolors import Bcolors
+from modules.net_utils import get_urls_from_page
 from bs4 import BeautifulSoup
 
 
@@ -20,20 +21,12 @@ def getMails(soup):
 
     if isinstance(type(soup), type(BeautifulSoup)):
-        emails = []
-        links = soup.find_all('a')
-        for ref in links:
-            url = ref.get('href')
-            if url and 'mailto' in url:
-                """Split email address on"""
-                email_addr = url.split(':')
-                if (len(email_addr) > 1):
-                    emails.append(email_addr[1])
+        emails = get_urls_from_page(soup, email=True)
 
         """Pretty print output as below"""
-        print ('')
-        print (b_colors.OKGREEN+'Mails Found - '+b_colors.ENDC+str(len(emails)))
-        print ('-------------------------------')
+        print('')
+        print(b_colors.OKGREEN+'Mails Found - '+b_colors.ENDC+str(len(emails)))
+        print('-------------------------------')
 
         return emails
diff --git a/modules/getweblinks.py b/modules/getweblinks.py
index ef743a56..b8b58905 100644
--- a/modules/getweblinks.py
+++ b/modules/getweblinks.py
@@ -1,78 +1,7 @@
-import re
-import requests
-import tldextract
-
+from .net_utils import get_urls_from_page, get_url_status
 from modules import pagereader
 from bs4 import BeautifulSoup
 from modules.bcolors import Bcolors
-from requests.exceptions import ConnectionError, HTTPError
-
-
-def valid_url(url, extensions=False):
-    """Checks for any valid url using regular expression matching
-
-    Matches all possible url patterns with the url that is passed and
-    returns True if it is a url and returns False if it is not.
-
-    Args:
-        url: string representing url to be checked
-
-    Returns:
-        bool: True if valid url format and False if not
-    """
-    pattern = r"^https?:\/\/(www\.)?([a-z,A-Z,0-9]*)\.([a-z, A-Z]+)(.*)"
-    regex = re.compile(pattern)
-    if not extensions:
-        if regex.match(url):
-            return True
-        return False
-
-    parts = tldextract.extract(url)
-    valid_sites = list()
-    for ext in extensions:
-        if regex.match(url) and '.'+parts.suffix in ext:
-            valid_sites.append(url)
-    return valid_sites
-
-
-def valid_onion_url(url):
-    """Checks for valid onion url using regular expression matching
-
-    Only matches onion urls
-
-    Args:
-        url: string representing url to be checked
-
-    Returns:
-        bool: True if valid onion url format, False if not
-    """
-    pattern = r"^https?:\/\/(www\.)?([a-z,A-Z,0-9]*)\.onion/(.*)"
-    regex = re.compile(pattern)
-    if regex.match(url):
-        return True
-    return False
-
-
-def is_link_alive(link):
-    """Generator that yields links as they come
-
-    Uses head request because it uses less bandwith than get and timeout is
-    set to 10 seconds and then link is automatically declared as dead.
-
-    Args:
-        link: link to be tested
-        colors: object containing colors for link
-
-    Yields:
-        string: link with either no color or red which indicates failure
-    """
-
-    try:
-        resp = requests.head(link, timeout=10)
-        resp.raise_for_status()
-        return True
-    except (ConnectionError, HTTPError):
-        return False
 
 
 def add_green(link):
@@ -98,28 +27,17 @@ def get_links(soup, ext=False, live=False):
     """
     b_colors = Bcolors()
    if isinstance(soup, BeautifulSoup):
-        websites = []
-
-        links = soup.find_all('a')
-        for ref in links:
-            url = ref.get('href')
-            if ext:
-                if url and valid_url(url, ext):
-                    websites.append(url)
-            else:
-                if url and valid_onion_url(url):
-                    websites.append(url)
-
+        websites = get_urls_from_page(soup, extension=ext)
         """Pretty print output as below"""
         print(''.join((b_colors.OKGREEN,
               'Websites Found - ', b_colors.ENDC, str(len(websites)))))
         print('------------------------------------')
-
+
         if live:
             for link in websites:
-                if is_link_alive(link):
+                if get_url_status(link) != 0:
                     coloredlink = add_green(link)
-                    page = pagereader.read_page(link)
+                    page = pagereader.read_first_page(link)[0]
                     if page is not None and page.title is not None:
                         print_row(coloredlink, page.title.string)
                     else:
diff --git a/modules/net_utils.py b/modules/net_utils.py
new file mode 100644
index 00000000..8d927739
--- /dev/null
+++ b/modules/net_utils.py
@@ -0,0 +1,73 @@
+import re
+import requests
+
+from requests.exceptions import ConnectionError, HTTPError
+
+
+def check_connection(url):
+
+    print("Attempting to connect to {site}".format(site=url))
+    if get_url_status(url) != 0:
+        return 1
+    else:
+        return 0
+
+
+def get_url_status(url, headers=False):
+    """
+    Attempts to connect to the url with a GET request; a connection
+    failure or an HTTP error status marks the link as dead.
+
+    Args:
+        url: url to be tested
+        headers: optional headers dict to send with the request
+
+    Return:
+        the response object of the GET request if the connection
+        succeeds, zero if the connection fails
+    """
+    try:
+        if headers:
+            resp = requests.get(url, headers=headers)
+        else:
+            resp = requests.get(url)
+        resp.raise_for_status()
+        return resp
+    except (ConnectionError, HTTPError):
+        return 0
+
+
+def is_url(url):
+    pattern = r"^https?:\/\/(www\.)?([a-z,A-Z,0-9]*)\.([a-z, A-Z]+)(.*)"
+    regex = re.compile(pattern)
+    if regex.match(url):
+        return 1
+    return 0
+
+
+def is_onion_url(url):
+    pattern = r"^https?:\/\/(www\.)?([a-z,A-Z,0-9]*)\.onion/(.*)"
+    regex = re.compile(pattern)
+    if regex.match(url):
+        return 1
+    return 0
+
+
+def get_urls_from_page(page, email=False, extension=False):
+    urls = []
+    anchors_on_page = page.find_all('a')
+    for anchor_tag in anchors_on_page:
+        url = anchor_tag.get('href')
+        if extension:
+            if url and is_url(url) == 1:
+                urls.append(url)
+        elif email:
+            if url and 'mailto' in url:
+                email_addr = url.split(':')
+                if len(email_addr) > 1:
+                    urls.append(email_addr[1])
+        else:
+            if url and is_onion_url(url) == 1:
+                urls.append(url)
+
+    return urls
diff --git a/modules/pagereader.py b/modules/pagereader.py
index 8b54bfe8..b8218507 100644
--- a/modules/pagereader.py
+++ b/modules/pagereader.py
@@ -1,8 +1,6 @@
-import requests
-
 from bs4 import BeautifulSoup
+from .net_utils import get_url_status
 from modules.bcolors import Bcolors
-from requests.exceptions import ConnectionError, HTTPError, MissingSchema
 from sys import exit
 
 
@@ -16,83 +14,44 @@ def read_first_page(site):
     attempts_left = 3
     err = " "
     while attempts_left:
-        try:
-            if attempts_left == 3:
-                print(next(connection_msg(site)))
-                response = requests.get(site, headers=headers)
-                print("Connection successful.")
-                page = BeautifulSoup(response.text, 'html.parser')
-                return page, response
-            if attempts_left == 2:
-                print(next(connection_msg('https://'+site)))
-                response = requests.get('https://'+site, headers=headers)
-                print("Connection successful.")
-                page = BeautifulSoup(response.text, 'html.parser')
-                return page, response
-            if attempts_left == 1:
-                print(next(connection_msg('http://'+site)))
-                response = requests.get('http://'+site, headers=headers)
-                print("Connection successful.")
+
+        if attempts_left == 3:
+            print(next(connection_msg(site)))
+            response = get_url_status(site, headers)
+            if response != 0:
                 page = BeautifulSoup(response.text, 'html.parser')
                 return page, response
-            if not attempts_left:
-                msg = ''.join(("There has been an {err} while attempting to ",
-                               "connect to {site}.")).format(err=err, site=site)
-                exit(msg)
-
-        except (HTTPError, MissingSchema, ConnectionError) as e:
-            attempts_left -= 1
-            err = e
-
-    if isinstance(err, HTTPError):
-        print ("There has been an HTTP error after three attempts.")
-        exit (1)
-    if isinstance(err, ConnectionError):
-        print("Got ConnectionError after three attempts... ",
-              "Please check if the TOR service is running or not.")
-        exit (1)
+            else:
+                attempts_left -= 1
+                continue
 
+        if attempts_left == 2:
+            https_url = 'https://'+site
+            print(next(connection_msg(https_url)))
+            response = get_url_status(https_url, headers)
+            if response != 0:
+                page = BeautifulSoup(response.text, 'html.parser')
+                return page, response
+            else:
+                attempts_left -= 1
+                continue
 
-def read_page(site):
-    headers = {'User-Agent':
-               'TorBot - Onion crawler | www.github.com/DedSecInside/TorBot'}
-    attempts_left = 3
-    err = " "
-    while attempts_left:
-        try:
-            if attempts_left == 3:
-                #print(next(connection_msg(site)))
-                response = requests.get(site, headers=headers)
-                #print("Connection successful.")
-                page = BeautifulSoup(response.text, 'html.parser')
-                return page
-            if attempts_left == 2:
-                #print(next(connection_msg('https://'+site)))
-                response = requests.get('https://'+site, headers=headers)
-                #print("Connection successful.")
-                page = BeautifulSoup(response.text, 'html.parser')
-                return page
-            if attempts_left == 1:
-                #print(next(connection_msg('http://'+site)))
-                response = requests.get('http://'+site, headers=headers)
-                #print("Connection successful.")
-                page = BeautifulSoup(response.text, 'html.parser')
-                return page
-            if not attempts_left:
-                msg = ''.join(("There has been an {err} while attempting to ",
-                               "connect to {site}.")).format(err=err, site=site)
-                exit(msg)
+        if attempts_left == 1:
+            http_url = 'http://'+site
+            print(next(connection_msg(http_url)))
+            response = get_url_status(http_url, headers)
+            if response != 0:
+                page = BeautifulSoup(response.text, 'html.parser')
+                return page, response
+            else:
+                attempts_left -= 1
+                continue
 
-        except (HTTPError, MissingSchema, ConnectionError) as e:
-            attempts_left -= 1
-            err = e
+        if not attempts_left:
+            msg = ''.join(("There has been an {err} while attempting to ",
+                           "connect to {site}.")).format(err=err, site=site)
+            exit(msg)
 
-        if isinstance(err, HTTPError):
-            print("There has been an HTTP error after three attempts.")
-            exit (1)
-        if isinstance(err, ConnectionError):
-            print("There has been a connection error after three attempts.")
-            exit (1)
+
 
 def get_ip():
     """Returns users tor ip address
diff --git a/tests/test_getemails.py b/tests/test_getemails.py
index 3edac352..75a8461d 100644
--- a/tests/test_getemails.py
+++ b/tests/test_getemails.py
@@ -1,20 +1,32 @@
 import sys
-import os
+sys.path.append('../')
 
-PACKAGE_PARENT = '..'
-SCRIPT_DIR = os.path.dirname(os.path.realpath(
-    os.path.join(os.getcwd(), os.path.expanduser(__file__))))
+import pytest
+import modules.getemails as getemails
 
-sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))
+from bs4 import BeautifulSoup
+from yattag import Doc
 
-from modules import pagereader, getemails
 
+def test_get_emails():
+    test_emails = ['hello@helloaddress.com']
+    doc, tag, text, line = Doc().ttl()
+    doc.asis('<!DOCTYPE html>')
+    with tag('html'):
+        with tag('body'):
+            for email in test_emails:
+                line('a', 'test_anchor', href=':'.join(('mailto', email)))
 
-def test_get_emails_successful():
-    soup = pagereader.read_first_page('https://www.helloaddress.com/')[0]
-    test_emails = ["hello@helloaddress.com"]
-    emails = getemails.getMails(soup)
+    mock_html = doc.getvalue()
+
+    mock_soup = BeautifulSoup(mock_html, 'html.parser')
+    emails = getemails.getMails(mock_soup)
     assert emails == test_emails
 
+
+def test_run():
+    test_get_emails()
+
+
 if __name__ == '__main__':
-    test_get_emails_successful()
+    test_run()
diff --git a/tests/test_getweblinks.py b/tests/test_getweblinks.py
index b5ce301b..578471ee 100644
--- a/tests/test_getweblinks.py
+++ b/tests/test_getweblinks.py
@@ -1,25 +1,42 @@
-#!/usr/bin/env python
-
 import sys
-import os
-PACKAGE_PARENT = '..'
-SCRIPT_DIR = os.path.dirname(os.path.realpath(
-    os.path.join(os.getcwd(), os.path.expanduser(__file__))))
+sys.path.append('../')
+
+import modules.getweblinks as getweblinks
+import pytest
+import requests_mock
+
+from bs4 import BeautifulSoup
+from yattag import Doc
+
+
+@pytest.fixture
+def test_get_links():
+    test_data = ['https://aff.ironsocket.com/SH7L',
+                 'https://aff.ironsocket.com/SH7L',
+                 'https://wsrs.net/',
+                 'https://cmsgear.com/']
+
+    doc, tag, text, line = Doc().ttl()
+    doc.asis('<!DOCTYPE html>')
+    with tag('html'):
+        with tag('body'):
+            for data in test_data:
+                line('a', 'test_anchor', href=data)
+
+    mock_html = doc.getvalue()
 
-sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))
-from modules import getweblinks, pagereader
+    mock_soup = BeautifulSoup(mock_html, 'html.parser')
+    with requests_mock.Mocker() as mock_connection:
+        for data in test_data:
+            mock_connection.register_uri('GET', data, text='Received')
 
+        result = getweblinks.get_links(mock_soup, ext=['.com', '.net'])
+        assert result == test_data
 
-def test_get_links_successful():
-    soup = pagereader.read_first_page('http://www.whatsmyip.net/')[0]
-    data = ['http://aff.ironsocket.com/SH7L',
-            'http://aff.ironsocket.com/SH7L',
-            'http://wsrs.net/',
-            'http://cmsgear.com/']
 
-    result = getweblinks.get_links(soup, ext=['.com', '.net'])
-    assert result == data
+def test_run():
+    test_get_links()
 
 
 if __name__ == '__main__':
-    test_get_links_successful()
+    test_run()
diff --git a/tests/test_pagereader.py b/tests/test_pagereader.py
index 035db004..610b6c0d 100644
--- a/tests/test_pagereader.py
+++ b/tests/test_pagereader.py
@@ -1,68 +1,42 @@
+import sys
+sys.path.append('../')
+
+import modules.pagereader as pagereader
 import pytest
-import requests
 import requests_mock
-from bs4 import BeautifulSoup
-from requests.exceptions import HTTPError, MissingSchema, ConnectionError
+from yattag import Doc
 
 
 @pytest.fixture
-def test_read_first_page(site):
-
-    with requests_mock.Mocker() as m:
-        m.get('https://www.test.com', text='This is a dot com site.')
-        m.get('https://www.test.org', text='This is a dot org site.')
-        m.get('https://www.test.net', text='This is a dot net site.')
-        m.get('https://www.test.onion', text='This is a dot onion site.')
+def test_read_first_page():
+    websites = []
+    test_data = [
+        ('https://www.test.com', 'This is a dot com site.'),
+        ('https://www.test.org', 'This is a dot org site.'),
+        ('https://www.test.net', 'This is a dot net site.'),
+        ('https://www.test.onion', 'This is a dot onion site.')
+    ]
 
-    m.register_uri('GET', 'https://www.test.cannotbefound', exc=HTTPError)
-    m.register_uri('GET', 'http://www.test.cannotbefound', exc=HTTPError)
+    doc, tag, text = Doc().tagtext()
 
-    headers = {'User-Agent':
-               'TorBot - Onion crawler | www.github.com/DedSecInside/TorBot'}
-    attempts_left = 3
-    err = " "
+    for data in test_data:
+        doc.asis('<!DOCTYPE html>')
+        with tag('html'):
+            with tag('body'):
+                text(data[1])
 
-    # Removed unnecessary code such as printing
-    while attempts_left:
-        try:
-            if attempts_left == 3:
-                response = requests.get(site, headers=headers)
-                page = BeautifulSoup(response.text, 'html.parser')
-                return str(page)
-            if attempts_left == 2:
-                response = requests.get('https://'+site, headers=headers)
-                page = BeautifulSoup(response.text, 'html.parser')
-                return str(page)
-            if attempts_left == 1:
-                response = requests.get('http://'+site, headers=headers)
-                page = BeautifulSoup(response.text, 'html.parser')
-                return str(page)
-            if not attempts_left:
-                raise err
+    websites.append(doc.getvalue())
 
-        except (HTTPError, MissingSchema, ConnectionError) as e:
-            err = e
-            attempts_left -= 1
-
-    raise err
+    with requests_mock.Mocker() as mock_connection:
+        for i, website in enumerate(websites):
+            mock_connection.register_uri('GET', test_data[i][0], text=test_data[i][1])
+            result = str(pagereader.read_first_page(test_data[i][0])[0])
+            assert result == test_data[i][1]
 
 
 def test_run():
-    urls = ['www.test.com', 'www.test.org', 'www.test.net', 'www.test.onion',
-            'www.test.cannotbefound']
-
-    with pytest.raises(HTTPError):
-        for url in urls:
-            page = test_read_first_page(url)
-            if url[-4:] == '.com':
-                assert page == 'This is a dot com site.'
-            elif url[-4:] == '.org':
-                assert page == 'This is a dot org site.'
-            elif url[-4:] == '.net':
-                assert page == 'This is a dot net site.'
-            elif url[-6:] == '.onion':
-                assert page == 'This is a dot onion site.'
+    test_read_first_page()
 
 
 if __name__ == '__main__':
diff --git a/torBot.py b/torBot.py
index 29734b43..c0767eef 100644
--- a/torBot.py
+++ b/torBot.py
@@ -157,6 +157,7 @@ def main(conn=False):
     if args.url:
         print("Tor IP Address :", pagereader.get_ip())
         html_content, response = pagereader.read_first_page(link)
+        print("Connection successful.")
     # -m/--mail
     if args.mail:
         emails = getemails.getMails(html_content)

From 21c5494b6d16fc38e5c966fec2b4107472e2dbe6 Mon Sep 17 00:00:00 2001
From: KingAkeem
Date: Tue, 3 Jul 2018 20:15:31 -0400
Subject: [PATCH 2/3] Fixing errors flagged by CodeFactor

---
 modules/getweblinks.py    | 2 +-
 modules/net_utils.py      | 4 ++--
 modules/pagereader.py     | 2 +-
 tests/test_getemails.py   | 2 +-
 tests/test_getweblinks.py | 2 +-
 tests/test_pagereader.py  | 6 ++++--
 6 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/modules/getweblinks.py b/modules/getweblinks.py
index b8b58905..7108663c 100644
--- a/modules/getweblinks.py
+++ b/modules/getweblinks.py
@@ -1,4 +1,4 @@
-from .net_utils import get_urls_from_page, get_url_status
+from modules.net_utils import get_urls_from_page, get_url_status
 from modules import pagereader
 from bs4 import BeautifulSoup
 from modules.bcolors import Bcolors
diff --git a/modules/net_utils.py b/modules/net_utils.py
index 8d927739..bd2f1b16 100644
--- a/modules/net_utils.py
+++ b/modules/net_utils.py
@@ -9,8 +9,8 @@ def check_connection(url):
     print("Attempting to connect to {site}".format(site=url))
     if get_url_status(url) != 0:
         return 1
-    else:
-        return 0
+
+    return 0
diff --git a/modules/pagereader.py b/modules/pagereader.py
index b8218507..f7ebc00c 100644
--- a/modules/pagereader.py
+++ b/modules/pagereader.py
@@ -1,5 +1,5 @@
 from bs4 import BeautifulSoup
-from .net_utils import get_url_status
+from modules.net_utils import get_url_status
 from modules.bcolors import Bcolors
 from sys import exit
diff --git a/tests/test_getemails.py b/tests/test_getemails.py
index 75a8461d..7082a01c 100644
--- a/tests/test_getemails.py
+++ b/tests/test_getemails.py
@@ -10,7 +10,7 @@
 
 def test_get_emails():
     test_emails = ['hello@helloaddress.com']
-    doc, tag, text, line = Doc().ttl()
+    doc, tag, _, line = Doc().ttl()
     doc.asis('<!DOCTYPE html>')
     with tag('html'):
         with tag('body'):
diff --git a/tests/test_getweblinks.py b/tests/test_getweblinks.py
index 578471ee..61903790 100644
--- a/tests/test_getweblinks.py
+++ b/tests/test_getweblinks.py
@@ -16,7 +16,7 @@ def test_get_links():
                  'https://wsrs.net/',
                  'https://cmsgear.com/']
 
-    doc, tag, text, line = Doc().ttl()
+    doc, tag, _, line = Doc().ttl()
     doc.asis('<!DOCTYPE html>')
     with tag('html'):
         with tag('body'):
diff --git a/tests/test_pagereader.py b/tests/test_pagereader.py
index 610b6c0d..2d38c0f0 100644
--- a/tests/test_pagereader.py
+++ b/tests/test_pagereader.py
@@ -29,8 +29,10 @@
     websites.append(doc.getvalue())
 
     with requests_mock.Mocker() as mock_connection:
-        for i, website in enumerate(websites):
-            mock_connection.register_uri('GET', test_data[i][0], text=test_data[i][1])
+        for i in range(len(websites)):
+            mock_connection.register_uri('GET',
+                                         test_data[i][0],
+                                         text=test_data[i][1])
             result = str(pagereader.read_first_page(test_data[i][0])[0])
             assert result == test_data[i][1]
 
 
 def test_run():

From c4ff40e3674ae657ccff26e53c095a7898e34ce3 Mon Sep 17 00:00:00 2001
From: KingAkeem
Date: Tue, 3 Jul 2018 20:20:23 -0400
Subject: [PATCH 3/3] Adding yattag to requirements for testing

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index c5818be4..253ef4ed 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,4 @@ termcolor==1.1.0
 requests==2.18.4
 requests_mock==1.4.0
 tldextract==2.2.0
-
+yattag==1.10.0
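
On the "build tests in the future for our net utilities" item in the first commit
message: a minimal sketch of what such a test module could look like, reusing the
requests_mock pattern this series introduces. The file name tests/test_net_utils.py
and the test functions below are hypothetical and not part of these patches:

    # tests/test_net_utils.py (hypothetical sketch, not part of this series)
    import sys
    sys.path.append('../')

    import requests_mock

    from modules.net_utils import get_url_status, is_onion_url


    def test_get_url_status_alive():
        # A mocked 200 response should make get_url_status return the
        # response object instead of 0, without touching a real server.
        with requests_mock.Mocker() as mock_connection:
            mock_connection.register_uri('GET', 'https://www.test.com',
                                         text='Received')
            response = get_url_status('https://www.test.com')
            assert response != 0
            assert response.text == 'Received'


    def test_get_url_status_dead():
        # A mocked 500 response makes raise_for_status() throw HTTPError
        # inside get_url_status, which reports the link as dead (0).
        with requests_mock.Mocker() as mock_connection:
            mock_connection.register_uri('GET', 'https://www.test.com',
                                         status_code=500)
            assert get_url_status('https://www.test.com') == 0


    def test_is_onion_url():
        # The onion pattern requires the .onion suffix plus a trailing slash.
        assert is_onion_url('http://example.onion/') == 1
        assert is_onion_url('https://cmsgear.com/') == 0

Keeping these checks behind requests_mock preserves the point of the series: no
test has to reach a real server.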