Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions detection_rules/kbwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,26 @@ def _process_imported_items(imported_items_list, item_type_description, item_key
"Use same flag for import-rules to prevent warnings and disable its unit test.")
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
@click.option("--custom-rules-only", "-cro", is_flag=True, help="Only export custom rules")
@click.option(
"--export-query",
"-eq",
type=str,
required=False,
help=(
"Apply a query filter to exporting rules e.g. "
"\"alert.attributes.tags: \\\"test\\\"\" to filter for rules that have the tag \"test\""
)
)
@click.pass_context
def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path],
exceptions_directory: Optional[Path], default_author: str,
rule_id: Optional[Iterable[str]] = None, rule_name: Optional[str] = None,
export_action_connectors: bool = False,
export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False,
no_tactic_filename: bool = False, local_creation_date: bool = False,
local_updated_date: bool = False) -> List[TOMLRule]:
local_updated_date: bool = False, custom_rules_only: bool = False,
export_query: Optional[str] = None) -> List[TOMLRule]:
"""Export custom rules from Kibana."""
kibana = ctx.obj["kibana"]
kibana_include_details = export_exceptions or export_action_connectors
Expand All @@ -227,8 +239,21 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d
if rule_name:
found = RuleResource.find(filter=f"alert.attributes.name:{rule_name}")
rule_id = [r["rule_id"] for r in found]
results = RuleResource.export_rules(list(rule_id), exclude_export_details=not kibana_include_details)
query = (
export_query if not custom_rules_only
else (
f"alert.attributes.params.ruleSource.type: \"internal\""
f"{f' and ({export_query})' if export_query else ''}"
)
)

results = (
RuleResource.bulk_export(rule_ids=list(rule_id), query=query)
if query
else RuleResource.export_rules(
list(rule_id), exclude_export_details=not kibana_include_details
)
)
# Handle Exceptions Directory Location
if results and exceptions_directory:
exceptions_directory.mkdir(parents=True, exist_ok=True)
Expand Down
16 changes: 5 additions & 11 deletions lib/kibana/kibana/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# 2.0.

import datetime
from typing import Any, List, Optional, Type
from typing import List, Optional, Type

import json

Expand Down Expand Up @@ -138,7 +138,7 @@ def bulk_action(
cls, action: definitions.RuleBulkActions, rule_ids: Optional[List[str]] = None, query: Optional[str] = None,
dry_run: Optional[bool] = False, edit_object: Optional[list[definitions.RuleBulkEditActionTypes]] = None,
include_exceptions: Optional[bool] = False, **kwargs
) -> (dict, List['RuleResource']):
) -> dict | List['RuleResource']:
"""Perform a bulk action on rules using the _bulk_action API."""
assert not (rule_ids and query), 'Cannot provide both rule_ids and query'

Expand All @@ -155,17 +155,11 @@ def bulk_action(
data['rule_ids'] = rule_ids
response = Kibana.current().post(cls.BASE_URI + "/_bulk_action", params=params, data=data, **kwargs)

# export returns ndjson, which requires manual parsing since response.json() fails
# export returns ndjson
if action == 'export':
response = [json.loads(r) for r in response.text.splitlines()]
result_ids = [r['rule_id'] for r in response if 'rule_id' in r]
else:
results = response['attributes']['results']
result_ids = [r['rule_id'] for r in results['updated']]
result_ids.extend([r['rule_id'] for r in results['created']])
response = [cls(r) for r in [json.loads(r) for r in response.text.splitlines()]]

rule_resources = cls.export_rules(result_ids)
return response, rule_resources
return response

@classmethod
def bulk_enable(
Expand Down
2 changes: 1 addition & 1 deletion lib/kibana/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection-rules-kibana"
version = "0.4.3"
version = "0.4.4"
description = "Kibana API utilities for Elastic Detection Rules"
license = {text = "Elastic License v2"}
keywords = ["Elastic", "Kibana", "Detection Rules", "Security", "Elasticsearch"]
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.2.12"
version = "1.2.13"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
Loading