Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions detection_rules/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,11 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
# DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion
suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name']
path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve()
# Inherit maturity from the rule already exists
maturity = "development"
if path.exists():
rules = RuleCollection()
rules.load_file(path)
if rules:
maturity = rules.rules[0].contents.metadata.maturity

# Inherit maturity and optionally local dates from the rule if it already exists
meta = {
"creation_date": creation_date,
"updated_date": creation_date,
"maturity": maturity,
"creation_date": kwargs.get("creation_date") or creation_date,
"updated_date": kwargs.get("updated_date") or creation_date,
"maturity": "development" or kwargs.get("maturity"),
}

try:
Expand Down
35 changes: 20 additions & 15 deletions detection_rules/kbwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .main import root
from .misc import add_params, client_error, kibana_options, get_kibana_client, nested_set
from .rule import downgrade_contents_from_rule, TOMLRuleContents, TOMLRule
from .rule_loader import RuleCollection
from .rule_loader import RuleCollection, update_metadata_from_file
from .utils import format_command_options, rulename_to_filename

RULES_CONFIG = parse_rules_config()
Expand Down Expand Up @@ -199,12 +199,14 @@ def _process_imported_items(imported_items_list, item_type_description, item_key
@click.option("--export-exceptions", "-e", is_flag=True, help="Include exceptions in export")
@click.option("--skip-errors", "-s", is_flag=True, help="Skip errors when exporting rules")
@click.option("--strip-version", "-sv", is_flag=True, help="Strip the version fields from all rules")
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
@click.pass_context
def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path],
exceptions_directory: Optional[Path], default_author: str,
rule_id: Optional[Iterable[str]] = None, export_action_connectors: bool = False,
export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False
) -> List[TOMLRule]:
export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False,
local_creation_date: bool = False, local_updated_date: bool = False) -> List[TOMLRule]:
"""Export custom rules from Kibana."""
kibana = ctx.obj["kibana"]
kibana_include_details = export_exceptions or export_action_connectors
Expand Down Expand Up @@ -232,6 +234,8 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d
return []

rules_results = results
action_connector_results = []
exception_results = []
if kibana_include_details:
# Assign counts to variables
rules_count = results[-1]["exported_rules_count"]
Expand Down Expand Up @@ -259,22 +263,23 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d
rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")]
if isinstance(rule_resource["author"], str):
rule_resource["author"] = [rule_resource["author"]]
# Inherit maturity from the rule already exists
maturity = "development"
# Inherit maturity and optionally local dates from the rule if it already exists
params = {
"rule": rule_resource,
"maturity": "development",
}
threat = rule_resource.get("threat")
first_tactic = threat[0].get("tactic").get("name") if threat else ""
rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=first_tactic)
# check if directory / f"{rule_name}" exists
if (directory / f"{rule_name}").exists():
rules = RuleCollection()
rules.load_file(directory / f"{rule_name}")
if rules:
maturity = rules.rules[0].contents.metadata.maturity

contents = TOMLRuleContents.from_rule_resource(
rule_resource, maturity=maturity

save_path = directory / f"{rule_name}"
params.update(
update_metadata_from_file(
save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
)
)
rule = TOMLRule(contents=contents, path=directory / f"{rule_name}")
contents = TOMLRuleContents.from_rule_resource(**params)
rule = TOMLRule(contents=contents, path=save_path)
except Exception as e:
if skip_errors:
print(f'- skipping {rule_resource.get("name")} - {type(e).__name__}')
Expand Down
13 changes: 11 additions & 2 deletions detection_rules/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
)
from .rule import TOMLRule, TOMLRuleContents, QueryRuleData
from .rule_formatter import toml_write
from .rule_loader import RuleCollection
from .rule_loader import RuleCollection, update_metadata_from_file
from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file
from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents, rulename_to_filename

Expand Down Expand Up @@ -128,10 +128,13 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True):
@click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors")
@click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one")
@click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule")
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool,
exceptions_import: bool, directory: click.Path, save_directory: click.Path,
action_connectors_directory: click.Path, exceptions_directory: click.Path,
skip_errors: bool, default_author: str, strip_none_values: bool):
skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool,
local_updated_date: bool):
"""Import rules from json, toml, or yaml files containing Kibana exported rule(s)."""
errors = []
rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else []
Expand Down Expand Up @@ -179,6 +182,12 @@ def import_rules_into_repo(input_file: click.Path, required_only: bool, action_c
if isinstance(contents["author"], str):
contents["author"] = [contents["author"]]

contents.update(
update_metadata_from_file(
Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date}
)
)

output = rule_prompt(
rule_path,
required_only=required_only,
Expand Down
17 changes: 16 additions & 1 deletion detection_rules/rule_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from . import utils
from .config import parse_rules_config
from .rule import (
DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule, TOMLRuleContents
DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule,
TOMLRuleContents
)
from .schemas import definitions
from .utils import cached, get_path
Expand Down Expand Up @@ -116,6 +117,20 @@ def load_locks_from_tag(remote: str, tag: str, version_lock: str = 'detection_ru
return commit_hash, version, deprecated


def update_metadata_from_file(rule_path: Path, fields_to_update: dict) -> dict:
"""Update metadata fields for a rule with local contents."""
contents = {}
if not rule_path.exists():
return contents
local_metadata = RuleCollection().load_file(rule_path).contents.metadata.to_dict()
if local_metadata:
contents["maturity"] = local_metadata.get("maturity", "development")
for field_name, should_update in fields_to_update.items():
if should_update and field_name in local_metadata:
contents[field_name] = local_metadata[field_name]
return contents


@dataclass
class BaseCollection:
"""Base class for collections."""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.0.9"
version = "1.0.10"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
Loading