Skip to content

Commit

Permalink
Merge pull request #130 from mostafa/experimental-pipelines
Browse files Browse the repository at this point in the history
Experimental pipelines
  • Loading branch information
thomaspatzke authored Jul 1, 2023
2 parents 50a948d + a035400 commit 85279fd
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 155 deletions.
195 changes: 114 additions & 81 deletions README.md

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions sigma/pipelines/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from abc import abstractmethod
from typing import Optional, Callable

from sigma.processing.pipeline import ProcessingPipeline


class Pipeline:
"""
Base class for all pipelines. This class acts as a class decorator
to register existing pipelines, and also as a base class for all
the new pipelines. The reasoning behind this is discussed in:
https://github.com/SigmaHQ/pySigma/discussions/110#discussioncomment-6179682
"""

def __init__(
self,
func: Optional[Callable[[], ProcessingPipeline]] = None,
):
"""
Initialize the pipeline. If the function is set, then it is a class decorator.
Otherwise, it is an inherited class, so we return the class itself.
Keyword Arguments:
func (Optional[Callable[[], ProcessingPipeline]]): The function to be
decorated. If None, the class is inherited. Defaults to None.
"""
self.func = func

def __call__(self, *args, **kwargs):
"""
When the class is called, we call the function if set,
otherwise we return the class itself.
"""
if getattr(self, "apply") and not self.apply.__isabstractmethod__:
return self.apply(*args, **kwargs)
return self.func(*args, **kwargs) if self.func is not None else self

def __new__(cls, *args, **kwargs):
"""
Use the singleton pattern to ensure that only one instance of the class
is created. This is necessary to ensure that the pipelines are registered
only once if the class is inherited.
Args:
cls ([type]): The class itself.
*args: The arguments to be passed to the class constructor.
Keyword Arguments:
**kwargs: The keyword arguments to be passed to the class constructor.
Returns:
Pipeline: The class instance.
"""
if not hasattr(cls, "_instance"):
cls._instance = super(Pipeline, cls).__new__(cls)
return cls._instance

@abstractmethod
def apply(self, *args, **kwargs):
"""
If the class is inherited, then this method must be implemented to return
a ProcessingPipeline object. Otherwise, this method is not called.
"""
raise NotImplementedError("The apply method must be implemented.")
8 changes: 6 additions & 2 deletions sigma/pipelines/test/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import sys
from .pipeline import dummy_test_pipeline, another_test_pipeline
from .pipeline import dummy_test_pipeline, another_test_pipeline, YetAnotherTestPipeline

__all__ = [
"YetAnotherTestPipeline"
]

if "pytest" in sys.modules:
pipelines = {
"test": dummy_test_pipeline,
"another_test": another_test_pipeline,
}
}
16 changes: 15 additions & 1 deletion sigma/pipelines/test/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from sigma.processing.conditions import LogsourceCondition
from sigma.pipelines.base import Pipeline
from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline
from sigma.processing.transformations import AddConditionTransformation, FieldMappingTransformation


@Pipeline
def dummy_test_pipeline():
return ProcessingPipeline(
name="Test pipeline",
Expand All @@ -24,3 +25,16 @@ def another_test_pipeline():
),
],
)

class YetAnotherTestPipeline(Pipeline):
def apply(self):
return ProcessingPipeline(
name="Yet Another Test pipeline",
allowed_backends={"another"},
items=[
ProcessingItem(
transformation=AddConditionTransformation(conditions={ "EventID": 1 }),
rule_conditions=[ LogsourceCondition(category="process_creation", product="windows") ],
),
],
)
Loading

0 comments on commit 85279fd

Please sign in to comment.