-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathServiceNowEventCollector.py
366 lines (290 loc) · 13.9 KB
/
ServiceNowEventCollector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
import demistomock as demisto
from CommonServerPython import * # noqa # pylint: disable=unused-wildcard-import
import urllib3
# Disable insecure warnings
urllib3.disable_warnings()
VENDOR = "servicenow"
PRODUCT = "servicenow"
LOGS_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" # New format for processing events
DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" # ISO8601 format with UTC, default in XSIAM
AUDIT = "audit"
SYSLOG_TRANSACTIONS = "syslog transactions"
URL = {AUDIT: "table/sys_audit", SYSLOG_TRANSACTIONS: "table/syslog_transaction"}
LAST_FETCH_TIME = {AUDIT: "last_fetch_time", SYSLOG_TRANSACTIONS: "last_fetch_time_syslog"}
PREVIOUS_RUN_IDS = {AUDIT: "previous_run_ids", SYSLOG_TRANSACTIONS: "previous_run_ids_syslog"}
""" CLIENT CLASS """
class Client:
def __init__(self, use_oauth, credentials, client_id, client_secret, url, verify, proxy, api_server_url, fetch_limit_audit,
fetch_limit_syslog):
self.sn_client = ServiceNowClient(
credentials=credentials,
use_oauth=use_oauth,
client_id=client_id,
client_secret=client_secret,
url=url,
verify=verify,
headers={},
proxy=proxy,
)
self.fetch_limit_audit = fetch_limit_audit
self.fetch_limit_syslog = fetch_limit_syslog
self.api_server_url = api_server_url
def search_events(self, from_time: str, log_type: str, limit: Optional[int] = None, offset: int = 0):
"""Make a request to the ServiceNow REST API to retrieve audit and syslog transactions logs"""
if limit is None:
limit = self.fetch_limit_audit if log_type == AUDIT else self.fetch_limit_syslog
params = {
"sysparm_limit": limit,
"sysparm_offset": offset,
"sysparm_query": f"sys_created_on>{from_time}",
}
res = self.sn_client.http_request(
method="GET",
full_url=f"{self.api_server_url}{URL[log_type]}",
url_suffix=None,
params=remove_empty_elements(params),
)
return res.get("result")
""" HELPER METHODS """
def handle_log_types(event_types_to_fetch: list) -> list:
"""
Args:
event_types_to_fetch (list of str): A list of event type titles to be converted to log types.
Raises:
InvalidEventTypeError: If any of the event type titles are not found in the titles_to_types mapping.
Returns:
list: A list of log types corresponding to the provided event type titles.
The list contains log types that have a matching title in the titles_to_types mapping.
If an event type title is not found, an exception is raised.
"""
log_types = []
VALID_EVENT_TITLES = ['Audit', 'Syslog Transactions']
titles_to_types = {'Audit': AUDIT, 'Syslog Transactions': SYSLOG_TRANSACTIONS}
for type_title in event_types_to_fetch:
if log_type := titles_to_types.get(type_title):
log_types.append(log_type)
else:
raise DemistoException(
f"'{type_title}' is not valid event type, please select from the following list: {VALID_EVENT_TITLES}")
return log_types
def update_last_run(last_run: dict[str, Any], log_type: str, last_event_time: str, previous_run_ids: list) -> dict:
"""
Update the last run details for a specific log type.
Args:
last_run (dict[str, Any]): Dictionary containing the last run details for different log types.
log_type (str): Type of log to update.
last_event_time (int): Timestamp of the last event fetched.
previous_run_ids (list): List of IDs from the previous fetch to track.
Returns:
Dict[str, Any]: Updated dictionary containing the last run details.
"""
last_run[LAST_FETCH_TIME[log_type]] = last_event_time
last_run[PREVIOUS_RUN_IDS[log_type]] = previous_run_ids
return last_run
def initialize_from_date(last_run: dict[str, Any], log_type: str) -> str:
"""
Initialize the start timestamp for fetching logs based on provided parameters.
Args:
last_run (dict[str, Any]): Dictionary containing the last fetch timestamps for different log types.
log_type (str): Type of log for which to initialize the start timestamp.
Returns:
str: The start timestamp for fetching logs.
"""
start_timestamp = last_run.get(LAST_FETCH_TIME[log_type])
if not start_timestamp:
current_time = datetime.utcnow()
first_fetch_time = current_time - timedelta(minutes=1)
first_fetch_str = first_fetch_time.strftime(LOGS_DATE_FORMAT)
from_date = first_fetch_str
else:
from_date = start_timestamp
return from_date
def add_time_field(events: List[Dict[str, Any]], log_type) -> List[Dict[str, Any]]:
"""
Add a '_time' field to each event and set the source log type.
Args:
events (List[Dict[str, Any]]): List of events to add the '_time' field to.
log_type (str): Type of log to set as the 'source_log_type' for each event.
Returns:
List[Dict[str, Any]]: The list of events with '_time' and 'source_log_type' fields added.
"""
for event in events:
event["_time"] = datetime.strptime(event["sys_created_on"], LOGS_DATE_FORMAT).strftime(DATE_FORMAT)
event["source_log_type"] = log_type
return events
def get_limit(args: dict, client: Client):
"""
Retrieve the limit for the number of logs to fetch, with defaults based on client settings.
Args:
args (dict): Dictionary of arguments potentially containing a "limit" key.
client (Client): Client instance with attributes for default fetch limits.
Returns:
int: The limit for the number of logs to fetch.
"""
limit = arg_to_number(args.get("limit")) or client.fetch_limit_audit or 1000
return limit
def process_and_filter_events(events: list, previous_run_ids: set, from_date: str, log_type: str):
"""
Remove duplicates from events and create a set of last fetched IDs with the same timestamp.
Args:
events (list): List of events fetched from the API.
previous_run_ids (set): Set of event IDs matching the timestamp in the 'from_date' parameter.
from_date (str): Starting date for fetching events, based on the last run's timestamp.
log_type (str): Type of log to set as the 'source_log_type' for each event.
Returns:
tuple: A list of unique events and a set of the last fetched event IDs with the same timestamp.
"""
unique_events = []
from_date_datetime = datetime.strptime(from_date, LOGS_DATE_FORMAT)
duplicates_list = []
for event in events:
create_time = datetime.strptime(event.get("sys_created_on"), LOGS_DATE_FORMAT)
event["_time"] = create_time.strftime(DATE_FORMAT)
event["source_log_type"] = log_type
if event.get("sys_id") in previous_run_ids:
duplicates_list.append(event.get("sys_id"))
continue
if create_time > from_date_datetime:
previous_run_ids = set()
from_date_datetime = create_time
previous_run_ids.add(event.get("sys_id"))
unique_events.append(event)
demisto.debug(f"Filtered out the following event IDs due to duplication: {duplicates_list}.")
demisto.debug(f"Updated last_run with previous_run_ids: {previous_run_ids}.")
return unique_events, previous_run_ids
""" COMMAND METHODS """
def get_events_command(client: Client, args: dict, log_type: str, last_run: dict) -> tuple[list, CommandResults]:
"""
Args:
limit: The maximum number of logs to return.
to_date: date to fetch events from.
from_date: date to fetch events to.
client: Client object.
last_run: Dictionary containing the last fetch timestamps for different log types.
Returns:
Sign on logs from Workday.
"""
types_to_titles = {AUDIT: 'Audit', SYSLOG_TRANSACTIONS: 'Syslog Transactions'}
all_events = []
if arg_from := args.get("from_date"):
from_date = arg_from
else:
from_date = initialize_from_date(last_run, log_type)
offset = args.get("offset", 0)
limit = get_limit(args, client)
logs = client.search_events(from_time=from_date, log_type=log_type, limit=limit, offset=offset)
add_time_field(logs, log_type)
demisto.debug(f"Got a total of {len(logs)} {log_type} events created after {from_date}")
hr = tableToMarkdown(name=f'{types_to_titles[log_type]} Events', t=logs, removeNull=True,
headerTransform=lambda x: string_to_table_header(camel_case_to_underscore(x)))
all_events.extend(logs)
return all_events, CommandResults(readable_output=hr)
def fetch_events_command(client: Client, last_run: dict, log_types: list):
"""
Fetches audit logs from Workday.
Args:
client: Client object.
max_fetch: max logs to fetch set by customer.
last_run: last run object.
Returns:
Audit logs from Workday.
"""
collected_events = []
for log_type in log_types:
previous_run_ids = set(last_run.get(PREVIOUS_RUN_IDS[log_type], set()))
from_date = initialize_from_date(last_run, log_type)
demisto.debug(f"Getting {log_type} Logs {from_date=}.")
new_events = client.search_events(from_date, log_type)
if new_events:
demisto.debug(f"Got {len(new_events)} {log_type} events. Begin processing.")
events, previous_run_ids = process_and_filter_events(
events=new_events, previous_run_ids=previous_run_ids, from_date=from_date, log_type=log_type
)
demisto.debug(f"Done processing {len(events)} {log_type} events.")
last_fetch_time = events[-1].get("sys_created_on") if events else from_date
last_run = update_last_run(last_run, log_type, last_fetch_time, list(previous_run_ids))
collected_events.extend(events)
return collected_events, last_run
def module_of_testing(client: Client, log_types: list) -> str: # pragma: no cover
"""
Test API connectivity and authentication.
Returns "ok" if the connection to the service is successful and the integration functions correctly.
Raises exceptions if the test fails.
Args:
client (Client): Client instance used to test connectivity.
log_types (list): List of log types to test fetching events.
Returns:
str: "ok" if the test passed; any exception raised will indicate failure.
"""
_, _ = fetch_events_command(client, {}, log_types=log_types)
return "ok"
""" MAIN FUNCTION """
def main() -> None: # pragma: no cover
"""main function, parses params and runs command functions"""
command = demisto.command()
args = demisto.args()
params = demisto.params()
server_url = params.get("url")
verify_certificate = params.get("insecure", False)
proxy = params.get("proxy", False)
use_oauth = params.get("use_oauth", False)
client_id = params.get("client_credentials", {}).get("identifier", "")
client_secret = params.get("client_credentials", {}).get("password", "")
credentials = params.get("credentials", {})
user_name = credentials.get("identifier")
password = credentials.get("password")
max_fetch_audit = arg_to_number(params.get("max_fetch")) or 10000
max_fetch_syslog = arg_to_number(params.get("max_fetch_syslog_transactions")) or 10000
event_types_to_fetch = argToList(params.get('event_types_to_fetch', ['Audit']))
log_types = handle_log_types(event_types_to_fetch)
version = params.get("api_version")
if version:
api = f"/api/now/{version}/"
else:
api = "/api/now/"
api_server_url = f"{server_url}{api}"
demisto.debug(f"Command being called is {command}")
try:
client = Client(
use_oauth=use_oauth,
credentials=credentials,
client_id=client_id,
client_secret=client_secret,
url=server_url,
verify=verify_certificate,
proxy=proxy,
api_server_url=api_server_url,
fetch_limit_audit=max_fetch_audit,
fetch_limit_syslog=max_fetch_syslog
)
last_run = demisto.getLastRun()
if client.sn_client.use_oauth and not get_integration_context().get("refresh_token", None):
client.sn_client.login(username=user_name, password=password)
if command == "test-module":
return_results(module_of_testing(client, log_types))
elif command == "service-now-get-audit-logs" or command == "service-now-get-syslog-transactions":
log_type = AUDIT if command == "service-now-get-audit-logs" else SYSLOG_TRANSACTIONS
audit_logs, results = get_events_command(client=client, args=args, log_type=log_type, last_run=last_run)
return_results(results)
if argToBoolean(args.get("should_push_events", True)):
send_events_to_xsiam(audit_logs, vendor=VENDOR, product=PRODUCT)
elif command == "fetch-events":
demisto.debug(f"Starting new fetch with last_run as {last_run}")
events, next_run = fetch_events_command(client=client, last_run=last_run, log_types=log_types)
demisto.debug("Done fetching events, sending to XSIAM.")
if events:
send_events_to_xsiam(events, vendor=VENDOR, product=PRODUCT)
if next_run:
# saves next_run for the time fetch-events is invoked
demisto.debug(f"Setting new last_run to {next_run}")
demisto.setLastRun(next_run)
else:
raise NotImplementedError(f"command {command} is not implemented.")
# Log exceptions and return errors
except Exception as e:
demisto.info(f"here {str(e)}")
return_error(f"Failed to execute {demisto.command()} command.\nError:\n{str(e)}")
from ServiceNowApiModule import * # noqa: E402
""" ENTRY POINT """
if __name__ in ("__main__", "__builtin__", "builtins"):
main()