#########################################
# From Dsiem plugins                    #
#########################################
filter {
  if [@metadata][siem_data_type] == "normalizedEvent" {
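    # overwrite event_id with a fresh UUID so that every normalized event
    # gets a unique document_id when indexed into Elasticsearch below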
    uuid {
      target => "event_id"
      overwrite => true
    }
  }
}

output {
  if [@metadata][siem_data_type] == "normalizedEvent" {
    # to dsiem
    http {
      format => "json"
      http_method => "post"
      url => "http://dsiem:8080/events"
    }
    # to elasticsearch
    elasticsearch {
      hosts => "elasticsearch:9200"
      index => "siem_events-%{+YYYY.MM.dd}"
      document_id => "%{[event_id]}"
      action => "index"
      template => "/etc/logstash/index-template.d/siem_events-template.json"
      template_name => "siem_events"
      template_overwrite => true
    }
  }
}
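
# dsiem's own output (alarms, and the alarm-to-event links) comes back into
# this pipeline through Filebeat, and is handled by the sections below.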

#########################################
# From Dsiem's Filebeat                 #
#########################################
filter {
  if [siem_data_type] == "alarm_events" {
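    # copy the type into @metadata so that it survives the prune below and
    # can still be used for routing in the output section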
    mutate {
      add_field => {
        "[@metadata][siem_data_type]" => "alarm_events"
      }
    }
    prune {
      whitelist_names => [ "@metadata", "@timestamp", "alarm_id", "event_id", "stage" ]
    }
  }
  if [siem_data_type] == "alarms" {
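    # created_time and update_time arrive as UNIX epoch values; convert them
    # into proper date fields for indexing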
    date {
      match => [ "created_time", "UNIX" ]
      target => "timestamp"
    }
    date {
      match => [ "update_time", "UNIX" ]
      target => "updated_time"
    }
    mutate {
      add_field => {
        "[@metadata][alarm_id]" => "%{[alarm_id]}"
        "[@metadata][siem_data_type]" => "alarms"
      }
    }
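    # the @metadata copy of alarm_id is needed because the prune below drops
    # the alarm_id field itself, while the output still uses it as document_id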
    # set target_index to the actual index of an existing ID (perm_index).
    # The lookup is done against the siem_alarms_id_lookup alias, which is
    # assigned to every new index by default. That alias can then be managed
    # separately to cover, for example, only the last 3 indices.
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "siem_alarms_id_lookup"
      query => "_id:%{[alarm_id]}"
      fields => {
        "perm_index" => "[@metadata][target_index]"
      }
    }
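    # for illustration only (hypothetical index names), the alias could be
    # repointed through the Elasticsearch aliases API:
    #   POST /_aliases
    #   { "actions": [
    #     { "remove": { "index": "siem_alarms-*",         "alias": "siem_alarms_id_lookup" } },
    #     { "add":    { "index": "siem_alarms-2020.03.*", "alias": "siem_alarms_id_lookup" } }
    #   ] }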
    # if the lookup above failed or found no match (i.e. this is a new ID),
    # fall back to today's date
    if ![@metadata][target_index] {
      mutate {
        add_field => {
          "[@metadata][target_index]" => "siem_alarms-%{+YYYY.MM.dd}"
        }
      }
    }
    # the elasticsearch filter plugin only searches within _source, so the
    # following extra perm_index field is necessary for the lookup above to work
    mutate {
      add_field => {
        "perm_index" => "%{[@metadata][target_index]}"
      }
    }
    prune {
      whitelist_names => [ "timestamp", "@metadata", "title", "status", "kingdom", "category",
        "updated_time", "risk", "risk_class", "tag$", "src_ips", "dst_ips", "intel_hits", "vulnerabilities",
        "networks", "rules", "custom_data", "^perm_index$" ]
    }
    # debugging only:
    # mutate { add_field => { "alarm_id" => "%{[@metadata][alarm_id]}" }}
    # ruby { code => 'logger.info("Dsiem alarm processing: ready to output ID ", "value" => event.get("[@metadata][alarm_id]"))' }
  }
}

output {
  if [@metadata][siem_data_type] == "alarm_events" {
    elasticsearch {
      hosts => "elasticsearch:9200"
      index => "siem_alarm_events-%{+YYYY.MM.dd}"
      template => "/etc/logstash/index-template.d/siem_alarm_events-template.json"
      template_name => "siem_alarm_events"
      template_overwrite => true
    }
  }
  # This output uses the update action with doc_as_upsert to allow partial updates
  if [@metadata][siem_data_type] == "alarms" {
    # debugging only:
    # elasticsearch { hosts => "elasticsearch:9200" index => "siem_alarms_debug" }
    elasticsearch {
      hosts => "elasticsearch:9200"
      index => "%{[@metadata][target_index]}"
      document_id => "%{[@metadata][alarm_id]}"
      template => "/etc/logstash/index-template.d/siem_alarms-template.json"
      template_name => "siem_alarms"
      template_overwrite => true
      action => "update"
      # use doc_as_upsert and a script so that:
      # - an incoming doc is automatically indexed when its document_id doesn't exist yet
      # - for existing docs, out-of-order updates and incoming status/tag changes can be
      #   selectively discarded, without having to use external versioning
      doc_as_upsert => true
      script_lang => "painless"
      script_type => "inline"
      # a lower risk value on an incoming update means it is out of order;
      # the same applies to updated_time, but that should only be checked when
      # the incoming update doesn't carry a higher risk
      script => '
        int incoming_risk = params.event.get("risk");
        int existing_risk = ctx._source.risk;
        // discard the update entirely when its risk is lower than the stored one
        if (incoming_risk < existing_risk) {
          ctx.op = "none";
          return
        } else if (incoming_risk == existing_risk) {
          // equal risk: discard the update if it is older than the stored doc
          ZonedDateTime old_tm = ZonedDateTime.parse(ctx._source.updated_time);
          ZonedDateTime new_tm = ZonedDateTime.parse(params.event.get("updated_time"));
          if (new_tm.isBefore(old_tm)) {
            ctx.op = "none";
            return
          }
        }
        // accept the update: copy fields over, leaving status and tag untouched
        ctx._source.timestamp = params.event.get("timestamp");
        ctx._source.updated_time = params.event.get("updated_time");
        ctx._source.risk = incoming_risk;
        ctx._source.risk_class = params.event.get("risk_class");
        ctx._source.src_ips = params.event.get("src_ips");
        ctx._source.dst_ips = params.event.get("dst_ips");
        ctx._source.rules = params.event.get("rules");
        ctx._source.networks = params.event.get("networks");
        if (params.event.get("intel_hits") != null) {
          ctx._source.intel_hits = params.event.get("intel_hits")
        }
        if (params.event.get("vulnerabilities") != null) {
          ctx._source.vulnerabilities = params.event.get("vulnerabilities")
        }
        if (params.event.get("custom_data") != null) {
          ctx._source.custom_data = params.event.get("custom_data")
        }
      '
      retry_on_conflict => 5
    }
  }
}