isoar_alert_collector.py
# IRIS-SOAR
# Created by: Martin Offermann
# This module is the collector script that handles the main logic of the IRIS-SOAR project.
#
# The main logic is as follows:
#
# - Loop through every installed integration to collect new alerts
# - Loop through each of the alerts and check if any playbook is able to handle it
# - - If a playbook is able to handle the alert, it will be executed
# - - If all playbooks are executed, the next alert will be checked
#     (Playbooks decide whether an alert is a false positive and what action should be taken. A playbook can and should make use of the libraries and integrations provided by IRIS-SOAR.)
# - If no playbook is able to handle the alert, it will be logged and the next alert will be checked
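#
# A minimal sketch of the integration contract this collector assumes. The module name
# "my_integration" is purely illustrative; only irsoar_provide_new_alerts and
# class_helper.Alert are taken from the code below:
#
#   # integrations/my_integration.py (hypothetical example)
#   # def irsoar_provide_new_alerts(integration_config: dict) -> list:
#   #     """Return a list of class_helper.Alert objects for alerts not yet collected."""
#   #     return [class_helper.Alert(...)]
#
# Each integration is enabled and configured under config["integrations"][<module_name>].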

import traceback
import json

from dfir_iris_client.session import ClientSession
from dfir_iris_client.alert import Alert

import lib.logging_helper as logging_helper
import lib.class_helper as class_helper  # TODO: Implement class_helper.py
from lib.generic_helper import dict_get, del_none_from_dict, make_json_serializable
import lib.config_helper as config_helper


def check_module_exists(module_name, playbook=False):
    """Checks if a module exists.

    Args:
        module_name (str): The name of the module
        playbook (bool): Look for the module under "playbooks" instead of "integrations"

    Returns:
        bool: True if the module exists, False if not
    """
    try:
        if not playbook:
            __import__("integrations." + module_name)
        else:
            __import__("playbooks." + module_name)
        return True
    except (ModuleNotFoundError, ImportError):
        return False


def check_module_has_function(module_name, function_name, mlog):
    """Checks if a module has a function.

    Args:
        module_name (str): The name of the module
        function_name (str): The name of the function
        mlog (logging_helper.Log): The logger used for debug output

    Returns:
        bool: True if the module has the function, False if not
    """
    try:
        module = __import__("integrations." + module_name)
        integration = getattr(module, module_name)
        getattr(integration, function_name)
        return True
    except AttributeError as e:
        mlog.debug("AttributeError: " + str(e))
        return False
    except ModuleNotFoundError:
        mlog.debug("ModuleNotFoundError: " + module_name + " does not exist.")
        return False


def main(config, fromDaemon=False, debug=False):
    """Main function of the collector script.

    Args:
        config (dict): The config dictionary
        fromDaemon (bool): If the script was called from the daemon
        debug (bool): If True, set the log level to DEBUG

    Returns:
        None
    """
    # Get the logger
    mlog = logging_helper.Log("isoar_collector")
    if debug:
        mlog.set_level("DEBUG")
        mlog.debug("Debug mode enabled.")

    # Get every installed integration from config
    integrations = config["integrations"]  # TODO: Implement this in config_helper.py

    mlog.info("Started IRIS-SOAR collector script")
    mlog.info("Checking for new alerts...")

    AlertList = []
    alertFileHistory = []

    for integration in integrations:
        module_name = integration
        integration = integrations[integration]  # we want the whole dict, not just the name, to work with

        # Skip IRIS itself:
        if module_name == "dfir-iris":
            continue

        # Check if the module is enabled
        if not integration["enabled"]:
            mlog.warning("The module " + module_name + " is disabled. Skipping.")
            continue

        if module_name == "dfir-iris" and integration["alert_provider"]["enabled"] == False:
            mlog.warning("The module " + module_name + " has disabled the alert provider. Skipping.")
            continue

        # Check if the module exists
        if not check_module_exists(module_name):
            mlog.error("The module " + module_name + " does not exist. Skipping.")
            continue

        # Check if the module provides getting new alerts
        if not check_module_has_function(module_name, "irsoar_provide_new_alerts", mlog):
            mlog.debug(
                "The module " + module_name + " does not provide the function irsoar_provide_new_alerts. Skipping integration."
            )
            continue

        # Make the actual call to the integration
        try:
            mlog.info("Calling module " + module_name)
            module_import = __import__("integrations." + module_name)
            module_import = getattr(module_import, module_name)
            integration_config = config["integrations"][module_name]
            new_alerts = module_import.irsoar_provide_new_alerts(integration_config)
        except Exception as e:
            mlog.warning(
                "The module "
                + module_name
                + " had an unhandled error when trying to provide new alerts. Error: "
                + traceback.format_exc()
                + ". Skipping integration."
            )
            continue

        # Check if the returned type is valid
        if type(new_alerts) is not list:
            mlog.warning("The module " + module_name + " provided invalid alert(s). Skipping integration.")
            continue

        # Check if the module provided any alerts
        if not new_alerts or len(new_alerts) == 0:
            mlog.info("The module " + module_name + " did not provide any alerts.")
            continue
        else:
            mlog.info("The module " + module_name + " provided " + str(len(new_alerts)) + " new alerts.")

        for alert in new_alerts:
            if not isinstance(alert, class_helper.Alert):
                mlog.warning("The module " + module_name + " provided an invalid alert. Skipping.")
            else:
                mlog.info("Adding new alert " + alert.name + " (" + str(alert.uuid) + ") to the alert array.")
                AlertList.append(alert)
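
    # At this point AlertList holds every valid class_helper.Alert collected above. The second
    # phase below pushes each of them to DFIR-IRIS as an IRIS alert via dfir_iris_client.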
    # Loop through each alert
    for alert in AlertList:
        try:
            alert_title = alert.name
            alert_id = alert.uuid
            alertHandled = False

            mlog.info("Pushing alert " + alert_title + " (" + str(alert_id) + ") to IRIS as alert.")

            # Initiate a session with our API key and host. The session stays the same for the whole script run.
            session = ClientSession(
                apikey=config["integrations"]["dfir-iris"]["api_key"],
                host=config["integrations"]["dfir-iris"]["url"],
                ssl_verify=False,
            )

            alert_context_dict = {}

            # Try to fill the alert context dict fields:
            try:
                alert: class_helper.Alert = alert
                file_dict = alert.file.__dict__() if alert.file else {}
                alert_context_dict = del_none_from_dict(file_dict)

                # Add the device
                device_dict = alert.device.__dict__() if alert.device else {}
                alert_context_dict |= del_none_from_dict(device_dict)

                # Add the flow
                flow_dict = alert.flow.__dict__() if alert.flow else {}
                alert_context_dict |= del_none_from_dict(flow_dict)

                # Add the log
                log_dict = alert.log.__dict__() if alert.log else {}
                alert_context_dict |= del_none_from_dict(log_dict)

                # Add the process
                process_dict = alert.process.__dict__() if alert.process else {}
                alert_context_dict |= del_none_from_dict(process_dict)

                # Add the threat_intel
                threat_intel_dict = alert.threat_intel.__dict__() if alert.threat_intel else {}
                alert_context_dict |= del_none_from_dict(threat_intel_dict)

                # Add the location
                location_dict = alert.location.__dict__() if alert.location else {}
                alert_context_dict |= del_none_from_dict(location_dict)

                # Add the user
                user_dict = alert.user.__dict__() if alert.user else {}
                alert_context_dict |= del_none_from_dict(user_dict)

                # Add the registry
                registry_dict = alert.registry.__dict__() if alert.registry else {}
                alert_context_dict |= del_none_from_dict(registry_dict)

                # Add the http
                http_dict = alert.flow.http.__dict__() if alert.flow and alert.flow.http else {}
                alert_context_dict |= del_none_from_dict(http_dict)

                # Add the dns
                dns_dict = alert.flow.dns_query.__dict__() if alert.flow and alert.flow.dns_query else {}
                alert_context_dict |= del_none_from_dict(dns_dict)

                # Add 'highlighted fields'
                alert_context_dict["highlighted_fields"] = alert.highlighted_fields if alert.highlighted_fields else None
            except Exception as e:
                mlog.warning("Error while trying to format alert_context: " + str(e))
            # Add the IOCs
            iocs = []
            if alert.indicators["ip"]:
                for ip in alert.indicators["ip"]:
                    iocs.append({"ioc_type_id": 79, "ioc_value": str(ip), "ioc_tlp_id": 1})
            if alert.indicators["domain"]:
                for domain in alert.indicators["domain"]:
                    iocs.append({"ioc_type_id": 20, "ioc_value": domain, "ioc_tlp_id": 1})
            if alert.indicators["url"]:
                for url in alert.indicators["url"]:
                    iocs.append({"ioc_type_id": 141, "ioc_value": url, "ioc_tlp_id": 1})
            if alert.indicators["hash"]:
                for hash in alert.indicators["hash"]:
                    iocs.append({"ioc_type_id": 90, "ioc_value": hash, "ioc_tlp_id": 1})
            if alert.indicators["email"]:
                for email in alert.indicators["email"]:
                    iocs.append({"ioc_type_id": 22, "ioc_value": email, "ioc_tlp_id": 1})
            if alert.indicators["countries"]:
                for country in alert.indicators["countries"]:
                    iocs.append({"ioc_type_id": 96, "ioc_value": country, "ioc_tlp_id": 1})
            if alert.indicators["registry"]:
                for registry in alert.indicators["registry"]:
                    iocs.append({"ioc_type_id": 109, "ioc_value": registry, "ioc_tlp_id": 1})
            if alert.indicators["other"]:
                for other in alert.indicators["other"]:
                    iocs.append({"ioc_type_id": 96, "ioc_value": other, "ioc_tlp_id": 1})

            # Sanitize: drop IOCs with an empty value (do not remove from the list while iterating over it)
            iocs = [ioc for ioc in iocs if ioc["ioc_value"] != ""]

            alert_severity = 2  # TODO: Implement severity calculation

            # Craft asset_id:
            asset_id = 3
            if alert.device:
                if alert.device.type == "host":
                    if alert.device.os_family == "windows":
                        asset_id = 9
                    elif alert.device.os_family == "linux":
                        asset_id = 4
                    elif alert.device.os_family == "macos":
                        asset_id = 6
                    elif alert.device.os_family == "ios":
                        asset_id = 8
                    elif alert.device.os_family == "android":
                        asset_id = 7
                else:
                    if alert.device.os_family == "windows":
                        asset_id = 10
                    elif alert.device.os_family == "linux":
                        asset_id = 3
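
            # NOTE: asset_id refers to the IRIS "asset_types" reference table (e.g. Windows or
            # Linux computer/server). As with the IOC type IDs above, these numeric values are
            # an assumption and should be checked against the asset types configured in your
            # IRIS instance.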

            # Craft the alert data
            alert_data = {
                "alert_title": alert_title,
                "alert_description": alert.description,
                "alert_source": alert.vendor_id.upper(),
                "alert_source_ref": str(alert.uuid),
                "alert_source_link": alert.url,
                "alert_source_content": alert.raw,
                "alert_severity_id": alert_severity,
                "alert_status_id": 2,  # new
                "alert_context": alert_context_dict,
                "alert_source_event_time": str(alert.timestamp),
                "alert_note": "This alert was collected by IRIS-SOAR.",
                "alert_tags": "IRIS-SOAR,Security",
                "alert_iocs": iocs,
                "alert_assets": [
                    {
                        "asset_name": alert.device.name if alert.device and alert.device.name else "Unknown",
                        "asset_type_id": asset_id,
                        "asset_description": alert.device.description if alert.device and alert.device.description else None,
                        "asset_ip": str(alert.device.local_ip) if alert.device and alert.device.local_ip else None,
                        "asset_tags": alert.device.tags if alert.device and alert.device.tags else None,
                    }
                ],
                "alert_customer_id": 1,
                "alert_classification_id": 1,
            }

            # DEBUG TODO: Remove
            alert_data = make_json_serializable(alert_data)

            # Initialize the IRIS alert instance with the session (separate name so the collected alert object is not shadowed)
            iris_alert = Alert(session=session)
            response = iris_alert.add_alert(alert_data)
            mlog.debug("Response: " + str(response))

            # Handle errors:
            if not response.is_success():
                mlog.error("Could not add alert: " + str(response))
                continue
            else:
                mlog.info("Successfully added alert.")
        except Exception as e:
            mlog.error("Error while trying to add alert: " + traceback.format_exc())
            continue

    # Check if the alert was handled correctly
    mlog.info("Finished collector script.")


if __name__ == "__main__":
    main(config_helper.Config().cfg)