Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically apply any XML/RSS namespaces #2621

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changedetectionio/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,12 @@ def edit_page(uuid):
for p in datastore.proxy_list:
form.proxy.choices.append(tuple((p, datastore.proxy_list[p]['label'])))

# Add some HTML to be used for form validation
if datastore.data['watching'][uuid].history.keys():
timestamp = list(datastore.data['watching'][uuid].history.keys())[-1]
form.last_html_for_form_validation = datastore.data['watching'][uuid].get_fetched_html(timestamp)
else:
form.last_html_for_form_validation = "<html><body></body></html>"

if request.method == 'POST' and form.validate():

Expand Down
52 changes: 21 additions & 31 deletions changedetectionio/forms.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import re

import elementpath

from changedetectionio.html_tools import xpath_filter, xpath1_filter
from changedetectionio.strtobool import strtobool

from wtforms import (
Expand Down Expand Up @@ -322,52 +325,39 @@ def __init__(self, message=None, allow_xpath=True, allow_json=True):
self.allow_json = allow_json

def __call__(self, form, field):

from lxml.etree import XPathEvalError
if isinstance(field.data, str):
data = [field.data]
else:
data = field.data

for line in data:
# Nothing to see here
if not len(line.strip()):
return

# Does it look like XPath?
if line.strip()[0] == '/' or line.strip().startswith('xpath:'):
if not self.allow_xpath:
raise ValidationError("XPath not permitted in this field!")
from lxml import etree, html
import elementpath
# xpath 2.0-3.1
from elementpath.xpath3 import XPath3Parser
tree = html.fromstring("<html></html>")
line = line.replace('xpath:', '')
line = line.strip()

try:
elementpath.select(tree, line.strip(), parser=XPath3Parser)
except elementpath.ElementPathError as e:
message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
raise ValidationError(message % (line, str(e)))
except:
raise ValidationError("A system-error occurred when validating your XPath expression")
if not line:
continue

if line.strip().startswith('xpath1:'):
if line.startswith('xpath') or line.startswith('/'):
if not self.allow_xpath:
raise ValidationError("XPath not permitted in this field!")
from lxml import etree, html
tree = html.fromstring("<html></html>")
line = re.sub(r'^xpath1:', '', line)

if line.startswith('xpath1:'):
filter_function = xpath1_filter
else:
line = line.replace('xpath:', '')
filter_function = xpath_filter

try:
tree.xpath(line.strip())
except etree.XPathEvalError as e:
# Call the determined function
res = filter_function(xpath_filter=line, html_content=form.last_html_for_form_validation)
# It's OK if this is an empty result, we just want to check that it doesn't crash the parser
except (elementpath.ElementPathError,XPathEvalError) as e:
message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
raise ValidationError(message % (line, str(e)))
except:
except Exception as e:
raise ValidationError("A system-error occurred when validating your XPath expression")

if 'json:' in line:
elif 'json:' in line:
if not self.allow_json:
raise ValidationError("JSONPath not permitted in this field!")

Expand All @@ -392,7 +382,7 @@ def __call__(self, form, field):
if not self.allow_json:
raise ValidationError("jq not permitted in this field!")

if 'jq:' in line:
elif line.startswith('jq:'):
try:
import jq
except ModuleNotFoundError:
Expand Down
24 changes: 23 additions & 1 deletion changedetectionio/html_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import json
import re

from loguru import logger

# HTML added to be sure each result matching a filter (.example) gets converted to a new line by Inscriptis
TEXT_FILTER_LIST_LINE_SUFFIX = "<br>"
Expand Down Expand Up @@ -108,6 +109,20 @@ def elementpath_tostring(obj):

return str(obj)

def extract_namespaces(xml_content):
    """
    Extracts all namespaces from the XML content.

    :param xml_content: raw XML document as bytes.
    :return: dict mapping namespace prefix -> URI; if the same prefix is
             declared more than once, the declaration seen last wins.
    """
    from io import BytesIO

    from lxml import etree

    # 'start-ns' events yield a (prefix, uri) pair for every namespace
    # declaration as the parser encounters it.
    return {
        prefix: uri
        for _, (prefix, uri) in etree.iterparse(BytesIO(xml_content), events=('start-ns',))
    }

# Return str Utf-8 of matched rules
def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False):
from lxml import etree, html
Expand All @@ -123,7 +138,14 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
html_block = ""

r = elementpath.select(tree, xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}, parser=XPath3Parser)
# Automatically extract all namespaces from the XML content
namespaces = {'re': 'http://exslt.org/regular-expressions'}
try:
namespaces.update(extract_namespaces(html_content.encode('utf-8')))
except Exception as e:
logger.warning(f"Problem extracting namespaces from HTMl/XML content {str(e)}")

r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser)
#@note: //title/text() wont work where <title>CDATA..

if type(r) != list:
Expand Down
7 changes: 4 additions & 3 deletions changedetectionio/processors/text_json_diff/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ def run_changedetection(self, watch, skip_when_checksum_same=True):

ctype_header = self.fetcher.get_all_headers().get('content-type', '').lower()
# Go into RSS preprocess for converting CDATA/comment to usable text
if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']):
if '<rss' in self.fetcher.content[:100].lower():
# ctype_header could be unset if we are just reprocessing the existing content
if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']) or not ctype_header:
top_text = self.fetcher.content[:200].lower().strip()
if '<rss' in top_text or 'search.yahoo.com/mrss/' in top_text:
self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
is_rss = True

# source: support, basically treat it as plaintext
if watch.is_source_type_url:
is_html = False
Expand Down
43 changes: 43 additions & 0 deletions changedetectionio/tests/test_rss.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,46 @@ def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
assert b'Some other description' not in res.data # Should NOT be selected by the xpath

res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)

def test_namespace_selectors(live_server, client):
    """A filter using an XML namespace prefix (media:) should validate and apply."""
    set_original_cdata_xml()
    #live_server_setup(live_server)

    test_url = url_for('test_endpoint', content_type="application/xml", _external=True)

    # Import the watch and let the first fetch complete.
    response = client.post(
        url_for("import_page"),
        data={"urls": test_url},
        follow_redirects=True
    )
    assert b"1 Imported" in response.data

    wait_for_all_checks(client)

    watch_uuid = extract_UUID_from_client(client)
    # Form validation resolves the namespaced nodes against previously
    # fetched content, which only exists after the first check has run.
    response = client.post(
        url_for("edit_page", uuid=watch_uuid),
        data={
            "include_filters": "//media:thumbnail/@url",
            "fetch_backend": "html_requests",
            "headers": "",
            "proxy": "no-proxy",
            "tags": "",
            "url": test_url,
        },
        follow_redirects=True
    )

    wait_for_all_checks(client)

    # Preview should contain only the URL selected by the namespaced XPath,
    # with every CDATA marker stripped out.
    response = client.get(
        url_for("preview_page", uuid="first"),
        follow_redirects=True
    )
    assert b'CDATA' not in response.data
    assert b'<![' not in response.data
    assert b'https://testsite.com/thumbnail-c224e10d81488e818701c981da04869e.jpg' in response.data

    response = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
Loading