Skip to content

Excavate module code hygiene #2181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 23, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 108 additions & 102 deletions bbot/modules/internal/excavate.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ def _exclude_key(original_dict, key_to_exclude):


def extract_params_url(parsed_url):
"""
Yields query parameters from a parsed URL.

Args:
parsed_url (ParseResult): The URL to extract parameters from.

Yields:
tuple: Contains the hardcoded HTTP method ('GET'), parsed URL, parameter name,
original value, source (hardcoded to 'direct_url'), and additional parameters
(all parameters excluding the current one).
"""
params = parse_qs(parsed_url.query)
flat_params = {k: v[0] for k, v in params.items()}

Expand Down Expand Up @@ -342,6 +353,7 @@ class excavateTestRule(ExcavateRule):
yara_rule_regex = re.compile(r"(?s)((?:rule\s+\w+\s*{[^{}]*(?:{[^{}]*}[^{}]*)*[^{}]*(?:/\S*?}[^/]*?/)*)*})")

def in_bl(self, value):
# Check if the value is in the blacklist or starts with a blacklisted prefix.
lower_value = value.lower()

if lower_value in self.parameter_blacklist:
Expand All @@ -354,6 +366,7 @@ def in_bl(self, value):
return False

def url_unparse(self, param_type, parsed_url):
# Reconstructs a URL, optionally omitting the query string based on retain_querystring configuration value.
if param_type == "GETPARAM":
querystring = ""
else:
Expand Down Expand Up @@ -598,7 +611,6 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_conte
f"Found Parameter [{parameter_name}] in [{parameterExtractorSubModule.name}] ParameterExtractor Submodule"
)
# If we have a full URL, leave it as-is

if endpoint and endpoint.startswith(("http://", "https://")):
url = endpoint

Expand Down Expand Up @@ -922,6 +934,34 @@ async def extract_yara_rules(self, rules_content):
for r in await self.helpers.re.findall(self.yara_rule_regex, rules_content):
yield r

async def emit_web_parameter(
    self,
    host,
    param_type,
    name,
    original_value,
    url,
    description,
    additional_params,
    event,
    context,
    assigned_cookies=None,
):
    """
    Build and emit a single WEB_PARAMETER event.

    Args:
        host (str): Hostname the parameter was observed on.
        param_type (str): Parameter category, e.g. "GETPARAM", "COOKIE", "HEADER", "SPECULATIVE".
        name (str): The parameter name.
        original_value: The value the parameter was observed with.
        url (str): URL associated with the parameter.
        description (str): Human-readable description included in the event data.
        additional_params (dict): Sibling parameters observed alongside this one.
        event: Parent event the WEB_PARAMETER is derived from.
        context (str): Discovery-context string for the emitted event.
        assigned_cookies (dict | None): Optional cookies assigned during the scan. The
            pre-refactor speculative-parameter path attached an "assigned_cookies" key to
            the event data; callers that need it can pass it explicitly. Defaults to None,
            in which case the key is omitted and event data is unchanged for existing callers.
    """
    data = {
        "host": host,
        "type": param_type,
        "name": name,
        "original_value": original_value,
        "url": url,
        "description": description,
        "additional_params": additional_params,
    }
    # Only attach assigned_cookies when supplied, so existing callers see identical event data.
    if assigned_cookies is not None:
        data["assigned_cookies"] = assigned_cookies
    await self.emit_event(data, "WEB_PARAMETER", event, context=context)

async def emit_custom_parameters(self, event, config_key, param_type, description_suffix):
    """
    Emit WEB_PARAMETER events for user-configured parameters (custom cookies/headers).

    Args:
        event: The HTTP_RESPONSE event the parameters are associated with; its
            parsed_url supplies the host and reconstructed URL.
        config_key (str): Key into the scan's web config, e.g. "http_cookies" or "http_headers".
        param_type (str): WEB_PARAMETER type to emit, e.g. "COOKIE" or "HEADER".
        description_suffix (str): Human-readable source label for the event description,
            e.g. "Custom Cookie" or "Custom Header".
    """
    custom_params = self.scan.web_config.get(config_key, {})
    if not custom_params:
        return
    # Hoist loop-invariant computations out of the loop: the reconstructed URL and the
    # lowercased type do not change per parameter.
    url = self.url_unparse(param_type, event.parsed_url)
    param_type_lower = param_type.lower()
    for param_name, param_value in custom_params.items():
        await self.emit_web_parameter(
            host=event.parsed_url.hostname,
            param_type=param_type,
            name=param_name,
            original_value=param_value,
            url=url,
            description=f"HTTP Extracted Parameter [{param_name}] ({description_suffix})",
            additional_params=_exclude_key(custom_params, param_name),
            event=event,
            context=f"Excavate saw a custom {param_type_lower} set [{param_name}], and emitted a WEB_PARAMETER for it",
        )

async def setup(self):
self.yara_rules_dict = {}
self.yara_preprocess_dict = {}
Expand Down Expand Up @@ -1029,21 +1069,17 @@ async def search(self, data, event, content_type, discovery_context="HTTP respon
results = extract_func(data)
if results:
for parameter_name, original_value in results:
description = (
f"HTTP Extracted Parameter (speculative from {source_type} content) [{parameter_name}]"
await self.emit_web_parameter(
host=str(event.host),
param_type="SPECULATIVE",
name=parameter_name,
original_value=original_value,
url=str(event.data["url"]),
description=f"HTTP Extracted Parameter (speculative from {source_type} content) [{parameter_name}]",
additional_params={},
event=event,
context=f"excavate's Parameter extractor found a speculative WEB_PARAMETER: {parameter_name} by parsing {source_type} data from {str(event.host)}"
)
data = {
"host": str(event.host),
"type": "SPECULATIVE",
"name": parameter_name,
"original_value": original_value,
"url": str(event.data["url"]),
"additional_params": {},
"assigned_cookies": self.assigned_cookies,
"description": description,
}
context = f"excavate's Parameter extractor found a speculative WEB_PARAMETER: {parameter_name} by parsing {source_type} data from {str(event.host)}"
await self.emit_event(data, "WEB_PARAMETER", event, context=context)
return

# Initialize the list of data items to process
Expand Down Expand Up @@ -1073,72 +1109,40 @@ async def search(self, data, event, content_type, discovery_context="HTTP respon

async def handle_event(self, event):
if event.type == "HTTP_RESPONSE":
# Harvest GET parameters from URL, if it came directly from the target, and parameter extraction is enabled
if (
self.parameter_extraction is True
and self.url_querystring_remove is False
and str(event.parent.parent.module) == "TARGET"
):
self.debug(f"Processing target URL [{urlunparse(event.parsed_url)}] for GET parameters")
for (
method,
parsed_url,
parameter_name,
original_value,
regex_name,
additional_params,
) in extract_params_url(event.parsed_url):
if self.in_bl(parameter_name) is False:
description = f"HTTP Extracted Parameter [{parameter_name}] (Target URL)"
data = {
"host": parsed_url.hostname,
"type": "GETPARAM",
"name": parameter_name,
"original_value": original_value,
"url": self.url_unparse("GETPARAM", parsed_url),
"description": description,
"additional_params": additional_params,
}
context = f"Excavate parsed a URL directly from the scan target for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it"
await self.emit_event(data, "WEB_PARAMETER", event, context=context)

# If parameter_extraction is enabled and we assigned custom headers, emit them as WEB_PARAMETER

if self.parameter_extraction is True:
custom_cookies = self.scan.web_config.get("http_cookies", {})
for custom_cookie_name, custom_cookie_value in custom_cookies.items():
description = f"HTTP Extracted Parameter [{custom_cookie_name}] (Custom Cookie)"
data = {
"host": event.parsed_url.hostname,
"type": "COOKIE",
"name": custom_cookie_name,
"original_value": custom_cookie_value,
"url": self.url_unparse("COOKIE", event.parsed_url),
"description": description,
"additional_params": _exclude_key(custom_cookies, custom_cookie_name),
}
context = (
f"Excavate saw a custom cookie set [{custom_cookie_name}], and emitted a WEB_PARAMETER for it"
)
await self.emit_event(data, "WEB_PARAMETER", event, context=context)

custom_headers = self.scan.web_config.get("http_headers", {})
for custom_header_name, custom_header_value in custom_headers.items():
description = f"HTTP Extracted Parameter [{custom_header_name}] (Custom Header)"
data = {
"host": event.parsed_url.hostname,
"type": "HEADER",
"name": custom_header_name,
"original_value": custom_header_value,
"url": self.url_unparse("HEADER", event.parsed_url),
"description": description,
"additional_params": _exclude_key(custom_headers, custom_header_name),
}
context = (
f"Excavate saw a custom header set [{custom_header_name}], and emitted a WEB_PARAMETER for it"
)
await self.emit_event(data, "WEB_PARAMETER", event, context=context)
# if parameter extraction is enabled, and we have custom cookies or headers, emit them as WEB_PARAMETER events
await self.emit_custom_parameters(event, "http_cookies", "COOKIE", "Custom Cookie")
await self.emit_custom_parameters(event, "http_headers", "HEADER", "Custom Header")

# if parameter extraction is enabled, and querystring removal is disabled, and the event is directly from the TARGET, create a WEB_PARAMETER event for each extracted GET parameter
if (
self.url_querystring_remove is False
and str(event.parent.parent.module) == "TARGET"
):
self.debug(f"Processing target URL [{urlunparse(event.parsed_url)}] for GET parameters")
for (
method,
parsed_url,
parameter_name,
original_value,
regex_name,
additional_params,
) in extract_params_url(event.parsed_url):
if self.in_bl(parameter_name) is False:
await self.emit_web_parameter(
host=parsed_url.hostname,
param_type="GETPARAM",
name=parameter_name,
original_value=original_value,
url=self.url_unparse("GETPARAM", parsed_url),
description=f"HTTP Extracted Parameter [{parameter_name}] (Target URL)",
additional_params=additional_params,
event=event,
context=f"Excavate parsed a URL directly from the scan target for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it"
)


data = event.data

# process response data
body = event.data.get("body", "")
Expand All @@ -1152,6 +1156,7 @@ async def handle_event(self, event):

for header, header_values in headers.items():
for header_value in header_values:
# Process 'set-cookie' headers to extract and emit cookies as WEB_PARAMETER events.
if header.lower() == "set-cookie" and self.parameter_extraction:
if "=" not in header_value:
self.debug(f"Cookie found without '=': {header_value}")
Expand All @@ -1162,19 +1167,20 @@ async def handle_event(self, event):

if self.in_bl(cookie_name) is False:
self.assigned_cookies[cookie_name] = cookie_value
description = f"Set-Cookie Assigned Cookie [{cookie_name}]"
data = {
"host": str(event.host),
"type": "COOKIE",
"name": cookie_name,
"original_value": cookie_value,
"url": self.url_unparse("COOKIE", event.parsed_url),
"description": description,
}
context = f"Excavate noticed a set-cookie header for cookie [{cookie_name}] and emitted a WEB_PARAMETER for it"
await self.emit_event(data, "WEB_PARAMETER", event, context=context)
await self.emit_web_parameter(
host=str(event.host),
param_type="COOKIE",
name=cookie_name,
original_value=cookie_value,
url=self.url_unparse("COOKIE", event.parsed_url),
description=f"Set-Cookie Assigned Cookie [{cookie_name}]",
additional_params={},
event=event,
context=f"Excavate noticed a set-cookie header for cookie [{cookie_name}] and emitted a WEB_PARAMETER for it"
)
else:
self.debug(f"blocked cookie parameter [{cookie_name}] due to BL match")
# Handle 'location' headers to process and emit redirect URLs as URL_UNVERIFIED events.
if header.lower() == "location":
redirect_location = getattr(event, "redirect_location", "")
if redirect_location:
Expand Down Expand Up @@ -1205,18 +1211,17 @@ async def handle_event(self, event):
additional_params,
) in extract_params_location(header_value, event.parsed_url):
if self.in_bl(parameter_name) is False:
description = f"HTTP Extracted Parameter [{parameter_name}] (Location Header)"
data = {
"host": parsed_url.hostname,
"type": "GETPARAM",
"name": parameter_name,
"original_value": original_value,
"url": self.url_unparse("GETPARAM", parsed_url),
"description": description,
"additional_params": additional_params,
}
context = f"Excavate parsed a location header for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it"
await self.emit_event(data, "WEB_PARAMETER", event, context=context)
await self.emit_web_parameter(
host=parsed_url.hostname,
param_type="GETPARAM",
name=parameter_name,
original_value=original_value,
url=self.url_unparse("GETPARAM", parsed_url),
description=f"HTTP Extracted Parameter [{parameter_name}] (Location Header)",
additional_params=additional_params,
event=event,
context=f"Excavate parsed a location header for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it"
)
else:
self.warning("location header found but missing redirect_location in HTTP_RESPONSE")
if header.lower() == "content-type":
Expand Down Expand Up @@ -1249,3 +1254,4 @@ async def handle_event(self, event):
content_type="",
discovery_context="Parsed file content",
)

Loading