diff --git a/changedetectionio/apprise_asset.py b/changedetectionio/apprise_asset.py deleted file mode 100644 index 949569495f2..00000000000 --- a/changedetectionio/apprise_asset.py +++ /dev/null @@ -1,12 +0,0 @@ -from changedetectionio import apprise_plugin -import apprise - -# Create our AppriseAsset and populate it with some of our new values: -# https://github.com/caronc/apprise/wiki/Development_API#the-apprise-asset-object -asset = apprise.AppriseAsset( - image_url_logo='https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/changedetectionio/static/images/avatar-256x256.png' -) - -asset.app_id = "changedetection.io" -asset.app_desc = "ChangeDetection.io best and simplest website monitoring and change detection" -asset.app_url = "https://changedetection.io" diff --git a/changedetectionio/apprise_plugin/__init__.py b/changedetectionio/apprise_plugin/__init__.py deleted file mode 100644 index dc434673333..00000000000 --- a/changedetectionio/apprise_plugin/__init__.py +++ /dev/null @@ -1,98 +0,0 @@ -# include the decorator -from apprise.decorators import notify -from loguru import logger -from requests.structures import CaseInsensitiveDict - - -@notify(on="delete") -@notify(on="deletes") -@notify(on="get") -@notify(on="gets") -@notify(on="post") -@notify(on="posts") -@notify(on="put") -@notify(on="puts") -def apprise_custom_api_call_wrapper(body, title, notify_type, *args, **kwargs): - import requests - import json - import re - - from urllib.parse import unquote_plus - from apprise.utils.parse import parse_url as apprise_parse_url - - url = kwargs['meta'].get('url') - schema = kwargs['meta'].get('schema').lower().strip() - - # Choose POST, GET etc from requests - method = re.sub(rf's$', '', schema) - requests_method = getattr(requests, method) - - params = CaseInsensitiveDict({}) # Added to requests - auth = None - has_error = False - - # Convert /foobar?+some-header=hello to proper header dictionary - results = apprise_parse_url(url) - - # Add our headers that the user can potentially over-ride if they wish - # to to our returned result set and tidy entries by unquoting them - headers = CaseInsensitiveDict({unquote_plus(x): unquote_plus(y) - for x, y in results['qsd+'].items()}) - - # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation - # In Apprise, it relies on prefixing each request arg with "-", because it uses say &method=update as a flag for apprise - # but here we are making straight requests, so we need todo convert this against apprise's logic - for k, v in results['qsd'].items(): - if not k.strip('+-') in results['qsd+'].keys(): - params[unquote_plus(k)] = unquote_plus(v) - - # Determine Authentication - auth = '' - if results.get('user') and results.get('password'): - auth = (unquote_plus(results.get('user')), unquote_plus(results.get('user'))) - elif results.get('user'): - auth = (unquote_plus(results.get('user'))) - - # If it smells like it could be JSON and no content-type was already set, offer a default content type. - if body and '{' in body[:100] and not headers.get('Content-Type'): - json_header = 'application/json; charset=utf-8' - try: - # Try if it's JSON - json.loads(body) - headers['Content-Type'] = json_header - except ValueError as e: - logger.warning(f"Could not automatically add '{json_header}' header to the notification because the document failed to parse as JSON: {e}") - pass - - # POSTS -> HTTPS etc - if schema.lower().endswith('s'): - url = re.sub(rf'^{schema}', 'https', results.get('url')) - else: - url = re.sub(rf'^{schema}', 'http', results.get('url')) - - status_str = '' - try: - r = requests_method(url, - auth=auth, - data=body.encode('utf-8') if type(body) is str else body, - headers=headers, - params=params - ) - - if not (200 <= r.status_code < 300): - status_str = f"Error sending '{method.upper()}' request to {url} - Status: {r.status_code}: '{r.reason}'" - logger.error(status_str) - has_error = True - else: - logger.info(f"Sent '{method.upper()}' request to {url}") - has_error = False - - except requests.RequestException as e: - status_str = f"Error sending '{method.upper()}' request to {url} - {str(e)}" - logger.error(status_str) - has_error = True - - if has_error: - raise TypeError(status_str) - - return True diff --git a/changedetectionio/apprise_plugin/assets.py b/changedetectionio/apprise_plugin/assets.py new file mode 100644 index 00000000000..4e6392efc0e --- /dev/null +++ b/changedetectionio/apprise_plugin/assets.py @@ -0,0 +1,16 @@ +from apprise import AppriseAsset + +# Refer to: +# https://github.com/caronc/apprise/wiki/Development_API#the-apprise-asset-object + +APPRISE_APP_ID = "changedetection.io" +APPRISE_APP_DESC = "ChangeDetection.io best and simplest website monitoring and change detection" +APPRISE_APP_URL = "https://changedetection.io" +APPRISE_AVATAR_URL = "https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/changedetectionio/static/images/avatar-256x256.png" + +apprise_asset = AppriseAsset( + app_id=APPRISE_APP_ID, + app_desc=APPRISE_APP_DESC, + app_url=APPRISE_APP_URL, + image_url_logo=APPRISE_AVATAR_URL, +) diff --git a/changedetectionio/apprise_plugin/custom_handlers.py b/changedetectionio/apprise_plugin/custom_handlers.py new file mode 100644 index 00000000000..1fd28b41616 --- /dev/null +++ b/changedetectionio/apprise_plugin/custom_handlers.py @@ -0,0 +1,112 @@ +import json +import re +from urllib.parse import unquote_plus + +import requests +from apprise.decorators import notify +from apprise.utils.parse import parse_url as apprise_parse_url +from loguru import logger +from requests.structures import CaseInsensitiveDict + +SUPPORTED_HTTP_METHODS = {"get", "post", "put", "delete", "patch", "head"} + + +def notify_supported_methods(func): + for method in SUPPORTED_HTTP_METHODS: + func = notify(on=method)(func) + # Add support for https, for each supported http method + func = notify(on=f"{method}s")(func) + return func + + +def _get_auth(parsed_url: dict) -> str | tuple[str, str]: + user: str | None = parsed_url.get("user") + password: str | None = parsed_url.get("password") + + if user is not None and password is not None: + return (unquote_plus(user), unquote_plus(password)) + + if user is not None: + return unquote_plus(user) + + return "" + + +def _get_headers(parsed_url: dict, body: str) -> CaseInsensitiveDict: + headers = CaseInsensitiveDict( + {unquote_plus(k).title(): unquote_plus(v) for k, v in parsed_url["qsd+"].items()} + ) + + # If Content-Type is not specified, guess if the body is a valid JSON + if headers.get("Content-Type") is None: + try: + json.loads(body) + headers["Content-Type"] = "application/json; charset=utf-8" + except Exception: + pass + + return headers + + +def _get_params(parsed_url: dict) -> CaseInsensitiveDict: + # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation + # In Apprise, it relies on prefixing each request arg with "-", because it uses say &method=update as a flag for apprise + # but here we are making straight requests, so we need todo convert this against apprise's logic + params = CaseInsensitiveDict( + { + unquote_plus(k): unquote_plus(v) + for k, v in parsed_url["qsd"].items() + if k.strip("-") not in parsed_url["qsd-"] + and k.strip("+") not in parsed_url["qsd+"] + } + ) + + return params + + +@notify_supported_methods +def apprise_http_custom_handler( + body: str, + title: str, + notify_type: str, + meta: dict, + *args, + **kwargs, +) -> bool: + url: str = meta.get("url") + schema: str = meta.get("schema") + method: str = re.sub(r"s$", "", schema).upper() + + # Convert /foobar?+some-header=hello to proper header dictionary + parsed_url: dict[str, str | dict | None] | None = apprise_parse_url(url) + if parsed_url is None: + return False + + auth = _get_auth(parsed_url=parsed_url) + headers = _get_headers(parsed_url=parsed_url, body=body) + params = _get_params(parsed_url=parsed_url) + + url = re.sub(rf"^{schema}", "https" if schema.endswith("s") else "http", parsed_url.get("url")) + + try: + response = requests.request( + method=method, + url=url, + auth=auth, + headers=headers, + params=params, + data=body.encode("utf-8") if isinstance(body, str) else body, + ) + + response.raise_for_status() + + logger.info(f"Successfully sent custom notification to {url}") + return True + + except requests.RequestException as e: + logger.error(f"Remote host error while sending custom notification to {url}: {e}") + return False + + except Exception as e: + logger.error(f"Unexpected error occurred while sending custom notification to {url}: {e}") + return False diff --git a/changedetectionio/blueprint/ui/notification.py b/changedetectionio/blueprint/ui/notification.py index a946917b492..ac23ac3c145 100644 --- a/changedetectionio/blueprint/ui/notification.py +++ b/changedetectionio/blueprint/ui/notification.py @@ -4,6 +4,7 @@ from changedetectionio.store import ChangeDetectionStore from changedetectionio.auth_decorator import login_optionally_required +from changedetectionio.notification import process_notification def construct_blueprint(datastore: ChangeDetectionStore): notification_blueprint = Blueprint('ui_notification', __name__, template_folder="../ui/templates") @@ -17,11 +18,10 @@ def ajax_callback_send_notification_test(watch_uuid=None): # Watch_uuid could be unset in the case it`s used in tag editor, global settings import apprise - from changedetectionio.apprise_asset import asset - apobj = apprise.Apprise(asset=asset) + from ...apprise_plugin.assets import apprise_asset + from ...apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401 + apobj = apprise.Apprise(asset=apprise_asset) - # so that the custom endpoints are registered - from changedetectionio.apprise_plugin import apprise_custom_api_call_wrapper is_global_settings_form = request.args.get('mode', '') == 'global-settings' is_group_settings_form = request.args.get('mode', '') == 'group-settings' @@ -90,7 +90,6 @@ def ajax_callback_send_notification_test(watch_uuid=None): n_object['as_async'] = False n_object.update(watch.extra_notification_token_values()) - from changedetectionio.notification import process_notification sent_obj = process_notification(n_object, datastore) except Exception as e: diff --git a/changedetectionio/forms.py b/changedetectionio/forms.py index 3fd199bb5c9..eae187f47e2 100644 --- a/changedetectionio/forms.py +++ b/changedetectionio/forms.py @@ -305,10 +305,10 @@ def __init__(self, message=None): def __call__(self, form, field): import apprise - apobj = apprise.Apprise() + from .apprise_plugin.assets import apprise_asset + from .apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401 - # so that the custom endpoints are registered - from .apprise_asset import asset + apobj = apprise.Apprise(asset=apprise_asset) for server_url in field.data: url = server_url.strip() diff --git a/changedetectionio/notification.py b/changedetectionio/notification.py index 7eed328d129..857a137d53c 100644 --- a/changedetectionio/notification.py +++ b/changedetectionio/notification.py @@ -4,6 +4,9 @@ import apprise from loguru import logger +from .apprise_plugin.assets import APPRISE_AVATAR_URL +from .apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401 +from .safe_jinja import render as jinja_render valid_tokens = { 'base_url': '', @@ -39,10 +42,6 @@ def process_notification(n_object, datastore): - # so that the custom endpoints are registered - from changedetectionio.apprise_plugin import apprise_custom_api_call_wrapper - - from .safe_jinja import render as jinja_render now = time.time() if n_object.get('notification_timestamp'): logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s") @@ -66,12 +65,12 @@ def process_notification(n_object, datastore): # raise it as an exception sent_objs = [] - from .apprise_asset import asset + from .apprise_plugin.assets import apprise_asset if 'as_async' in n_object: - asset.async_mode = n_object.get('as_async') + apprise_asset.async_mode = n_object.get('as_async') - apobj = apprise.Apprise(debug=True, asset=asset) + apobj = apprise.Apprise(debug=True, asset=apprise_asset) if not n_object.get('notification_urls'): return None @@ -112,7 +111,7 @@ def process_notification(n_object, datastore): and not url.startswith('get') \ and not url.startswith('delete') \ and not url.startswith('put'): - url += k + 'avatar_url=https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/changedetectionio/static/images/avatar-256x256.png' + url += k + f"avatar_url={APPRISE_AVATAR_URL}" if url.startswith('tgram://'): # Telegram only supports a limit subset of HTML, remove the '
' we place in. diff --git a/changedetectionio/tests/apprise/test_apprise_asset.py b/changedetectionio/tests/apprise/test_apprise_asset.py new file mode 100644 index 00000000000..6e86868ddcb --- /dev/null +++ b/changedetectionio/tests/apprise/test_apprise_asset.py @@ -0,0 +1,24 @@ +import pytest +from apprise import AppriseAsset + +from changedetectionio.apprise_asset import ( + APPRISE_APP_DESC, + APPRISE_APP_ID, + APPRISE_APP_URL, + APPRISE_AVATAR_URL, +) + + +@pytest.fixture(scope="function") +def apprise_asset() -> AppriseAsset: + from changedetectionio.apprise_asset import apprise_asset + + return apprise_asset + + +def test_apprise_asset_init(apprise_asset: AppriseAsset): + assert isinstance(apprise_asset, AppriseAsset) + assert apprise_asset.app_id == APPRISE_APP_ID + assert apprise_asset.app_desc == APPRISE_APP_DESC + assert apprise_asset.app_url == APPRISE_APP_URL + assert apprise_asset.image_url_logo == APPRISE_AVATAR_URL diff --git a/changedetectionio/tests/apprise/test_apprise_custom_api_call.py b/changedetectionio/tests/apprise/test_apprise_custom_api_call.py new file mode 100644 index 00000000000..45271051d03 --- /dev/null +++ b/changedetectionio/tests/apprise/test_apprise_custom_api_call.py @@ -0,0 +1,211 @@ +import json +from unittest.mock import patch + +import pytest +import requests +from apprise.utils.parse import parse_url as apprise_parse_url + +from ...apprise_plugin.custom_handlers import ( + _get_auth, + _get_headers, + _get_params, + apprise_http_custom_handler, + SUPPORTED_HTTP_METHODS, +) + + +@pytest.mark.parametrize( + "url,expected_auth", + [ + ("get://user:pass@localhost:9999", ("user", "pass")), + ("get://user@localhost:9999", "user"), + ("get://localhost:9999", ""), + ("get://user%20name:pass%20word@localhost:9999", ("user name", "pass word")), + ], +) +def test_get_auth(url, expected_auth): + """Test authentication extraction with various URL formats.""" + parsed_url = apprise_parse_url(url) + assert _get_auth(parsed_url) == expected_auth + + +@pytest.mark.parametrize( + "url,body,expected_content_type", + [ + ( + "get://localhost:9999?+content-type=application/xml", + "test", + "application/xml", + ), + ("get://localhost:9999", '{"key": "value"}', "application/json; charset=utf-8"), + ("get://localhost:9999", "plain text", None), + ("get://localhost:9999?+content-type=text/plain", "test", "text/plain"), + ], +) +def test_get_headers(url, body, expected_content_type): + """Test header extraction and content type detection.""" + parsed_url = apprise_parse_url(url) + headers = _get_headers(parsed_url, body) + + if expected_content_type: + assert headers.get("Content-Type") == expected_content_type + + +@pytest.mark.parametrize( + "url,expected_params", + [ + ("get://localhost:9999?param1=value1", {"param1": "value1"}), + ("get://localhost:9999?param1=value1&-param2=ignored", {"param1": "value1"}), + ("get://localhost:9999?param1=value1&+header=test", {"param1": "value1"}), + ( + "get://localhost:9999?encoded%20param=encoded%20value", + {"encoded param": "encoded value"}, + ), + ], +) +def test_get_params(url, expected_params): + """Test parameter extraction with URL encoding and exclusion logic.""" + parsed_url = apprise_parse_url(url) + params = _get_params(parsed_url) + assert dict(params) == expected_params + + +@pytest.mark.parametrize( + "url,schema,method", + [ + ("get://localhost:9999", "get", "GET"), + ("post://localhost:9999", "post", "POST"), + ("delete://localhost:9999", "delete", "DELETE"), + ], +) +@patch("requests.request") +def test_apprise_custom_api_call_success(mock_request, url, schema, method): + """Test successful API calls with different HTTP methods and schemas.""" + mock_request.return_value.raise_for_status.return_value = None + + meta = {"url": url, "schema": schema} + result = apprise_http_custom_handler( + body="test body", title="Test Title", notify_type="info", meta=meta + ) + + assert result is True + mock_request.assert_called_once() + + call_args = mock_request.call_args + assert call_args[1]["method"] == method.upper() + assert call_args[1]["url"].startswith("http") + + +@patch("requests.request") +def test_apprise_custom_api_call_with_auth(mock_request): + """Test API call with authentication.""" + mock_request.return_value.raise_for_status.return_value = None + + url = "get://user:pass@localhost:9999/secure" + meta = {"url": url, "schema": "get"} + + result = apprise_http_custom_handler( + body=json.dumps({"key": "value"}), + title="Secure Test", + notify_type="info", + meta=meta, + ) + + assert result is True + mock_request.assert_called_once() + call_args = mock_request.call_args + assert call_args[1]["auth"] == ("user", "pass") + + +@pytest.mark.parametrize( + "exception_type,expected_result", + [ + (requests.RequestException, False), + (requests.HTTPError, False), + (Exception, False), + ], +) +@patch("requests.request") +def test_apprise_custom_api_call_failure(mock_request, exception_type, expected_result): + """Test various failure scenarios.""" + url = "get://localhost:9999/error" + meta = {"url": url, "schema": "get"} + + # Simulate different types of exceptions + mock_request.side_effect = exception_type("Error occurred") + + result = apprise_http_custom_handler( + body="error body", title="Error Test", notify_type="error", meta=meta + ) + + assert result == expected_result + + +def test_invalid_url_parsing(): + """Test handling of invalid URL parsing.""" + meta = {"url": "invalid://url", "schema": "invalid"} + result = apprise_http_custom_handler( + body="test", title="Invalid URL", notify_type="info", meta=meta + ) + + assert result is False + + +@pytest.mark.parametrize( + "schema,expected_method", + [ + (http_method, http_method.upper()) + for http_method in SUPPORTED_HTTP_METHODS + ], +) +@patch("requests.request") +def test_http_methods(mock_request, schema, expected_method): + """Test all supported HTTP methods.""" + mock_request.return_value.raise_for_status.return_value = None + + url = f"{schema}://localhost:9999" + + result = apprise_http_custom_handler( + body="test body", + title="Test Title", + notify_type="info", + meta={"url": url, "schema": schema}, + ) + + assert result is True + mock_request.assert_called_once() + + call_args = mock_request.call_args + assert call_args[1]["method"] == expected_method + + +@pytest.mark.parametrize( + "input_schema,expected_method", + [ + (f"{http_method}s", http_method.upper()) + for http_method in SUPPORTED_HTTP_METHODS + ], +) +@patch("requests.request") +def test_https_method_conversion( + mock_request, input_schema, expected_method +): + """Validate that methods ending with 's' use HTTPS and correct HTTP method.""" + mock_request.return_value.raise_for_status.return_value = None + + url = f"{input_schema}://localhost:9999" + + result = apprise_http_custom_handler( + body="test body", + title="Test Title", + notify_type="info", + meta={"url": url, "schema": input_schema}, + ) + + assert result is True + mock_request.assert_called_once() + + call_args = mock_request.call_args + + assert call_args[1]["method"] == expected_method + assert call_args[1]["url"].startswith("https")