Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Sigma Filters] Introducing Sigma Filters (Sigma Defeats) in Alpha / Development Preview #226

Merged
merged 28 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51bf144
Init Sigma Filters
sifex May 6, 2024
74df6b7
Init Sigma Filters
sifex May 6, 2024
48ee233
wip
sifex May 6, 2024
f08e12c
Updated Sigma Filters with wip
sifex May 25, 2024
43bc557
Merge branch 'refs/heads/main' into sigma-filters
sifex May 25, 2024
6ba223e
linting
sifex May 25, 2024
e4e1764
updating filters again
sifex May 28, 2024
1fc6afd
linting
sifex May 28, 2024
988c035
linting
sifex May 28, 2024
cbf3f39
Moved Filters from a Condition Pipeline to Integrated into the Rule C…
sifex Jun 1, 2024
81d4590
linting
sifex Jun 1, 2024
aa5406d
Fixing some functional operations on filters.py
sifex Jun 1, 2024
7c697ff
Adds some validation coverage plus rule ID matching
sifex Jun 1, 2024
f57c456
Linting
sifex Jun 1, 2024
c69cd4e
Tested with correlation rule
sifex Jun 1, 2024
764a2e2
Testing out correlation fix
sifex Jun 1, 2024
91e549c
Testing out correlation fixes
sifex Jun 1, 2024
1fd16de
Linting
sifex Jun 1, 2024
e230618
Merge branch 'refs/heads/main' into sigma-filters
sifex Jun 1, 2024
583fb3e
Update filters.py
sifex Jun 2, 2024
7bc8f9b
Update filters.py
sifex Jun 2, 2024
0abc699
Wip
sinnwise Jun 2, 2024
961ef2f
Linting
sinnwise Jun 3, 2024
49ddbd7
Renamed `global_filter` to `filter`, removed `SigmaFilterLocation`.
sinnwise Jun 8, 2024
0e7cf14
Minor Reference refactoring
sinnwise Jun 9, 2024
d052385
Ensuring only one condition can be set upon Sigma Filters
sinnwise Jun 9, 2024
7465cc2
Filter condition always as string
thomaspatzke Jun 15, 2024
60bd064
Small improvements
thomaspatzke Jun 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
517 changes: 259 additions & 258 deletions poetry.lock

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions sigma/collection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import dataclass, field
from functools import reduce
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Union, IO
from uuid import UUID
Expand All @@ -13,6 +14,7 @@
SigmaRuleNotFoundError,
)
from sigma.rule import SigmaRule, SigmaRuleBase
from sigma.filters import SigmaFilter


@dataclass
Expand Down Expand Up @@ -48,8 +50,22 @@ def resolve_rule_references(self):
This must be called before referencing rules are converted into queries to make references available.
"""
for rule in self.rules:
# Resolves all rule references in the rules property to actual Sigma rules.
if isinstance(rule, SigmaCorrelationRule):
rule.resolve_rule_references(self)

# Extract all filters from the rules
filters: List[SigmaFilter] = [rule for rule in self.rules if isinstance(rule, SigmaFilter)]
self.rules = [rule for rule in self.rules if not isinstance(rule, SigmaFilter)]

# Apply filters on each rule and replace the rule with the filtered rule
self.rules = (
[reduce(lambda r, f: f.apply_on_rule(r), filters, rule) for rule in self.rules]
sifex marked this conversation as resolved.
Show resolved Hide resolved
if filters
else self.rules
)

# Sort rules by reference order
self.rules = list(sorted(self.rules))

@classmethod
Expand Down Expand Up @@ -87,6 +103,14 @@ def from_dicts(
source,
)
)
elif "filter" in rule: # correlation rule - no global rule merge
parsed_rules.append(
SigmaFilter.from_dict(
rule,
collect_errors,
source,
)
)
else: # merge with global rule and parse as simple rule
parsed_rules.append(
SigmaRule.from_dict(
Expand Down
18 changes: 18 additions & 0 deletions sigma/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,24 @@ class SigmaTimespanError(SigmaCorrelationRuleError):
pass


class SigmaFilterError(SigmaValueError):
"""Error in Sigma rule filter"""

pass


class SigmaFilterConditionError(SigmaFilterError):
"""Error in Sigma rule filter condition"""

pass


class SigmaFilterRuleReferenceError(SigmaFilterError):
"""Error in Sigma rule filter condition"""

pass


class SigmaCollectionError(SigmaError):
"""Error in Sigma collection, e.g. unknown action"""

Expand Down
208 changes: 208 additions & 0 deletions sigma/filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import random
import re
import string
from dataclasses import dataclass, field
from typing import List, Optional, Union
from uuid import UUID

from sigma import exceptions as sigma_exceptions
from sigma.correlations import SigmaCorrelationRule, SigmaRuleReference
from sigma.exceptions import SigmaRuleLocation
from sigma.rule import (
SigmaLogSource,
SigmaDetections,
SigmaDetection,
SigmaRule,
SigmaRuleBase,
)


@dataclass
class SigmaGlobalFilter(SigmaDetections):
rules: List[SigmaRuleReference] = field(default_factory=list)

@classmethod
def from_dict(
cls, detections: dict, source: Optional[SigmaRuleLocation] = None
) -> "SigmaGlobalFilter":
try:
if isinstance(detections["condition"], str):
condition = [detections["condition"]]
else:
raise sigma_exceptions.SigmaFilterConditionError(
"Sigma filter condition must be a string", source=source
)
except KeyError:
raise sigma_exceptions.SigmaFilterConditionError(
"Sigma filter must contain exactly one condition", source=source
)

try:
if isinstance(detections["rules"], list):
rules = [SigmaRuleReference(detection) for detection in detections["rules"]]
elif isinstance(detections["rules"], str):
rules = [SigmaRuleReference(detections["rules"])]
else:
raise sigma_exceptions.SigmaFilterRuleReferenceError(
"Sigma filter rules field must be a list of Sigma rule IDs or rule names",
source=source,
)
except KeyError:
raise sigma_exceptions.SigmaFilterRuleReferenceError(
"Sigma filter must contain at least a rules section", source=source
)

return cls(
detections={
name: SigmaDetection.from_definition(definition, source)
for name, definition in detections.items()
if name
not in (
"condition",
"rules",
) # TODO Fix standard
},
rules=rules,
condition=condition,
source=source,
)

def to_dict(self) -> dict:
d = super().to_dict()
d.update(
{
"rules": self.rules,
}
)

return d


@dataclass
class SigmaFilter(SigmaRuleBase):
"""
SigmaFilter class is used to represent a Sigma filter object.
"""

logsource: SigmaLogSource = field(default_factory=SigmaLogSource)
filter: SigmaGlobalFilter = field(default_factory=SigmaGlobalFilter)

@classmethod
def from_dict(
cls,
sigma_filter: dict,
collect_errors: bool = False,
source: Optional[SigmaRuleLocation] = None,
) -> "SigmaFilter":
"""
Converts from a dictionary object to a SigmaFilter object.
"""
kwargs, errors = super().from_dict(sigma_filter, collect_errors, source)

# parse log source
filter_logsource = None
try:
filter_logsource = SigmaLogSource.from_dict(sigma_filter["logsource"], source)
except KeyError:
errors.append(
sigma_exceptions.SigmaLogsourceError(
"Sigma filter must have a log source", source=source
)
)
except AttributeError:
errors.append(
sigma_exceptions.SigmaLogsourceError(
"Sigma logsource must be a valid YAML map", source=source
)
)
except sigma_exceptions.SigmaError as e:
errors.append(e)

# parse detections
filter_global_filter = None
try:
filter_global_filter = SigmaGlobalFilter.from_dict(sigma_filter["filter"], source)
except KeyError:
errors.append(
sigma_exceptions.SigmaFilterError(
"Sigma filter must have a filter defined", source=source
)
)
except TypeError:
errors.append(
sigma_exceptions.SigmaFilterError(
"Sigma filter must be a dictionary", source=source
)
)
except sigma_exceptions.SigmaError as e:
errors.append(e)

if not collect_errors and errors:
raise errors[0]

return cls(
logsource=filter_logsource,
filter=filter_global_filter,
errors=errors,
**kwargs,
)

def to_dict(self) -> dict:
"""Convert filter object into dict."""
d = super().to_dict()
d.update(
{
"logsource": self.logsource.to_dict(),
"filter": self.filter.to_dict(),
}
)

return d

def _should_apply_on_rule(self, rule: Union[SigmaRule, SigmaCorrelationRule]) -> bool:
from sigma.collection import SigmaCollection

if not self.filter.rules:
return False

# For each rule ID/title in the filter.rules, add the rule to the reference using the resolve method,
# then filter each reference to see if the rule is in the reference
matches = []
for reference in self.filter.rules:
try:
matches.append(SigmaCollection([rule])[reference.reference])
except sigma_exceptions.SigmaRuleNotFoundError:
pass

if all([match is None for match in matches]):
return False

if rule.logsource not in self.logsource:
return False

return True

def apply_on_rule(
self, rule: Union[SigmaRule, SigmaCorrelationRule]
) -> Union[SigmaRule, SigmaCorrelationRule]:
if not self._should_apply_on_rule(rule):
return rule

for original_cond_name, condition in self.filter.detections.items():
cond_name = "_filt_" + ("".join(random.choices(string.ascii_lowercase, k=10)))

# Replace each instance of the original condition name with the new condition name to avoid conflicts
self.filter.condition[0] = re.sub(
rf"[^ ]*{original_cond_name}[^ ]*",
cond_name,
self.filter.condition[0],
)
rule.detection.detections[cond_name] = condition

for i, condition in enumerate(rule.detection.condition):
rule.detection.condition[i] = f"({condition}) and " + f"({self.filter.condition[0]})"

# Reparse the rule to update the parsed conditions
rule.detection.__post_init__()

return rule
97 changes: 97 additions & 0 deletions tests/files/correlation_rule_valid/correlation_rule.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
title: Base rule
description: This is a base rule
id: 5d8fd9da-6916-45ef-8d4d-3fa9d19d1a64
name: base_rule
status: test
level: informational
date: 2023-12-09
logsource:
category: test
detection:
selection:
fieldA: value1
fieldB: value2
condition: selection
---
title: Multiple occurrences of base event
description: This is a correlation rule
id: 4db3cdb5-aac6-4827-a756-d99475865d32
status: test
level: medium
date: 2023-12-09
correlation:
type: event_count
rules:
- base_rule
group-by:
- fieldC
- fieldD
timespan: 15m
condition:
gte: 10
---
title: Multiple occurrences of base event with different values
description: This is a correlation rule
id: 16a288b8-4ed2-440f-9984-7a128e86e006
date: 2023-12-09
status: test
level: medium
correlation:
type: value_count
rules:
- base_rule
group-by:
- fieldC
timespan: 15m
condition:
lt: 10
field: fieldD
---
title: Base rule 1
description: This is a base rule
id: 73a9addc-0cf9-44a4-ad18-e3b7a9d1eeb4
name: base_rule_1
date: 2023-12-09
status: test
level: informational
logsource:
category: test
detection:
selection:
fieldA: value1
fieldB: value2
condition: selection
---
title: Base rule 2
description: This is a base rule
id: fce9c855-2951-4e96-b764-dbc1dfdf4993
name: base_rule_2
date: 2023-12-09
status: test
level: informational
logsource:
category: test
detection:
selection:
fieldA: value3
fieldB: value4
condition: selection
---
title: Temporal correlation rule
description: This is a correlation rule
id: dc48f97e-237d-42f4-a136-39c94cd53a17
date: 2023-12-09
status: test
level: high
correlation:
type: temporal
rules:
- base_rule_1
- base_rule_2
aliases:
field:
base_rule_1: fieldC
base_rule_2: fieldD
group-by:
- fieldC
timespan: 15m
11 changes: 11 additions & 0 deletions tests/files/filter_valid/filter_out_domain_controllers.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
title: Filter Out Domain Controllers
description: Filters out any matching events from COMPANYNAME Domain Controllers
logsource:
product: windows
filter:
rules:
- 951f8d29-1234-1234-1234-0673ff105e6f # CodeIntegrity - Unsigned Kernel Module Loaded
- 5d8fd9da-6916-45ef-8d4d-3fa9d19d1a64
selection:
ComputerName|startswith: 'DC-'
condition: not selection
Loading