diff --git a/detection_rules/cli_utils.py b/detection_rules/cli_utils.py index 96aa258c05c..eb19579fbb7 100644 --- a/detection_rules/cli_utils.py +++ b/detection_rules/cli_utils.py @@ -210,18 +210,11 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos # DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name'] path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve() - # Inherit maturity from the rule already exists - maturity = "development" - if path.exists(): - rules = RuleCollection() - rules.load_file(path) - if rules: - maturity = rules.rules[0].contents.metadata.maturity - + # Inherit maturity and optionally local dates from the rule if it already exists meta = { - "creation_date": creation_date, - "updated_date": creation_date, - "maturity": maturity, + "creation_date": kwargs.get("creation_date") or creation_date, + "updated_date": kwargs.get("updated_date") or creation_date, + "maturity": "development" or kwargs.get("maturity"), } try: diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index 27bcd2e7930..500b445f0ec 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -24,7 +24,7 @@ from .main import root from .misc import add_params, client_error, kibana_options, get_kibana_client, nested_set from .rule import downgrade_contents_from_rule, TOMLRuleContents, TOMLRule -from .rule_loader import RuleCollection +from .rule_loader import RuleCollection, update_metadata_from_file from .utils import format_command_options, rulename_to_filename RULES_CONFIG = parse_rules_config() @@ -199,12 +199,14 @@ def _process_imported_items(imported_items_list, item_type_description, item_key @click.option("--export-exceptions", "-e", is_flag=True, help="Include exceptions in export") @click.option("--skip-errors", "-s", is_flag=True, help="Skip errors when exporting rules") @click.option("--strip-version", "-sv", is_flag=True, help="Strip the version fields from all rules") +@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule") +@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule") @click.pass_context def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path], exceptions_directory: Optional[Path], default_author: str, rule_id: Optional[Iterable[str]] = None, export_action_connectors: bool = False, - export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False - ) -> List[TOMLRule]: + export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False, + local_creation_date: bool = False, local_updated_date: bool = False) -> List[TOMLRule]: """Export custom rules from Kibana.""" kibana = ctx.obj["kibana"] kibana_include_details = export_exceptions or export_action_connectors @@ -232,6 +234,8 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d return [] rules_results = results + action_connector_results = [] + exception_results = [] if kibana_include_details: # Assign counts to variables rules_count = results[-1]["exported_rules_count"] @@ -259,22 +263,23 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")] if isinstance(rule_resource["author"], str): rule_resource["author"] = [rule_resource["author"]] - # Inherit maturity from the rule already exists - maturity = "development" + # Inherit maturity and optionally local dates from the rule if it already exists + params = { + "rule": rule_resource, + "maturity": "development", + } threat = rule_resource.get("threat") first_tactic = threat[0].get("tactic").get("name") if threat else "" rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=first_tactic) - # check if directory / f"{rule_name}" exists - if (directory / f"{rule_name}").exists(): - rules = RuleCollection() - rules.load_file(directory / f"{rule_name}") - if rules: - maturity = rules.rules[0].contents.metadata.maturity - - contents = TOMLRuleContents.from_rule_resource( - rule_resource, maturity=maturity + + save_path = directory / f"{rule_name}" + params.update( + update_metadata_from_file( + save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date} + ) ) - rule = TOMLRule(contents=contents, path=directory / f"{rule_name}") + contents = TOMLRuleContents.from_rule_resource(**params) + rule = TOMLRule(contents=contents, path=save_path) except Exception as e: if skip_errors: print(f'- skipping {rule_resource.get("name")} - {type(e).__name__}') diff --git a/detection_rules/main.py b/detection_rules/main.py index b07e4c77625..1e1dcfcac06 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -32,7 +32,7 @@ ) from .rule import TOMLRule, TOMLRuleContents, QueryRuleData from .rule_formatter import toml_write -from .rule_loader import RuleCollection +from .rule_loader import RuleCollection, update_metadata_from_file from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents, rulename_to_filename @@ -128,10 +128,13 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True): @click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors") @click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one") @click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule") +@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule") +@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule") def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool, exceptions_import: bool, directory: click.Path, save_directory: click.Path, action_connectors_directory: click.Path, exceptions_directory: click.Path, - skip_errors: bool, default_author: str, strip_none_values: bool): + skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool, + local_updated_date: bool): """Import rules from json, toml, or yaml files containing Kibana exported rule(s).""" errors = [] rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else [] @@ -179,6 +182,12 @@ def import_rules_into_repo(input_file: click.Path, required_only: bool, action_c if isinstance(contents["author"], str): contents["author"] = [contents["author"]] + contents.update( + update_metadata_from_file( + Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date} + ) + ) + output = rule_prompt( rule_path, required_only=required_only, diff --git a/detection_rules/rule_loader.py b/detection_rules/rule_loader.py index b04d00b794a..a56253686ba 100644 --- a/detection_rules/rule_loader.py +++ b/detection_rules/rule_loader.py @@ -18,7 +18,8 @@ from . import utils from .config import parse_rules_config from .rule import ( - DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule, TOMLRuleContents + DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule, + TOMLRuleContents ) from .schemas import definitions from .utils import cached, get_path @@ -116,6 +117,20 @@ def load_locks_from_tag(remote: str, tag: str, version_lock: str = 'detection_ru return commit_hash, version, deprecated +def update_metadata_from_file(rule_path: Path, fields_to_update: dict) -> dict: + """Update metadata fields for a rule with local contents.""" + contents = {} + if not rule_path.exists(): + return contents + local_metadata = RuleCollection().load_file(rule_path).contents.metadata.to_dict() + if local_metadata: + contents["maturity"] = local_metadata.get("maturity", "development") + for field_name, should_update in fields_to_update.items(): + if should_update and field_name in local_metadata: + contents[field_name] = local_metadata[field_name] + return contents + + @dataclass class BaseCollection: """Base class for collections.""" diff --git a/pyproject.toml b/pyproject.toml index 1f8c928ef5e..03ac2c7c1eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.0.9" +version = "1.0.10" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12"