1
+ # -*- coding: utf-8 -*-
2
+
3
+ # Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
4
+ # or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
5
+ # https://securityonion.net/license; you may not use this file except in compliance with the
6
+ # Elastic License 2.0.
7
+
8
+
9
+ from time import gmtime , strftime
10
+ import requests ,json
11
+ from elastalert .alerts import Alerter
12
+
13
+ import urllib3
14
+ urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning )
15
+
16
+ class SecurityOnionESAlerter (Alerter ):
17
+ """
18
+ Use matched data to create alerts in Elasticsearch.
19
+ """
20
+
21
+ required_options = set (['detection_title' , 'sigma_level' ])
22
+ optional_fields = ['sigma_category' , 'sigma_product' , 'sigma_service' ]
23
+
24
+ def alert (self , matches ):
25
+ for match in matches :
26
+ timestamp = strftime ("%Y-%m-%d" 'T' "%H:%M:%S" '.000Z' , gmtime ())
27
+ headers = {"Content-Type" : "application/json" }
28
+
29
+ creds = None
30
+ if 'es_username' in self .rule and 'es_password' in self .rule :
31
+ creds = (self .rule ['es_username' ], self .rule ['es_password' ])
32
+
33
+ # Start building the rule dict
34
+ rule_info = {
35
+ "name" : self .rule ['detection_title' ],
36
+ "uuid" : self .rule ['detection_public_id' ]
37
+ }
38
+
39
+ # Add optional fields if they are present in the rule
40
+ for field in self .optional_fields :
41
+ rule_key = field .split ('_' )[- 1 ] # Assumes field format "sigma_<key>"
42
+ if field in self .rule :
43
+ rule_info [rule_key ] = self .rule [field ]
44
+
45
+ # Construct the payload with the conditional rule_info
46
+ payload = {
47
+ "tags" : "alert" ,
48
+ "rule" : rule_info ,
49
+ "event" : {
50
+ "severity" : self .rule ['event.severity' ],
51
+ "module" : self .rule ['event.module' ],
52
+ "dataset" : self .rule ['event.dataset' ],
53
+ "severity_label" : self .rule ['sigma_level' ]
54
+ },
55
+ "sigma_level" : self .rule ['sigma_level' ],
56
+ "event_data" : match ,
57
+ "@timestamp" : timestamp
58
+ }
59
+ url = f"https://{ self .rule ['es_host' ]} :{ self .rule ['es_port' ]} /logs-playbook.alerts-so/_doc/"
60
+ requests .post (url , data = json .dumps (payload ), headers = headers , verify = False , auth = creds )
61
+
62
+ def get_info (self ):
63
+ return {'type' : 'SecurityOnionESAlerter' }
0 commit comments