"""
Author: Moiz Lakdawala
"""
import sys
import json
from datetime import datetime, timedelta, timezone
import requests
import os
from configparser import ConfigParser
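# API credentials are read from a config file. The default path is relative
# to the working directory, which breaks under cron/systemd; JAMF_PROTECT_CONFIG
# may point at an absolute path instead. Keep the file readable only by the
# account running this collector — it holds the API client password.
CONFIG_PATH = os.environ.get("JAMF_PROTECT_CONFIG", "./key.cfg")
config = ConfigParser()
# ConfigParser.read() silently ignores a missing file and get() would then
# raise NoSectionError before __main__ runs. fallback=None lets __main__'s
# "must be set" check report the problem with a clear message instead.
config.read(CONFIG_PATH)
PROTECT_INSTANCE = config.get('PROTECT_INSTANCE', 'PROTECT_INSTANCE', fallback=None)
CLIENT_ID = config.get('CLIENT_ID', 'CLIENT_ID', fallback=None)
PASSWORD = config.get('Password', 'PASSWORD', fallback=None)
MIN_SEVERITY = "Low"  # Valid values: "Informational", "Low", "Medium", "High"
MAX_SEVERITY = "High"  # Valid values: "Informational", "Low", "Medium", "High"
# Timezone-aware form of datetime.utcnow() (deprecated since Python 3.12);
# yields the same UTC calendar date. This name is not read elsewhere in the file.
JSON_OUTPUT_FILE = f"Jamf_Protect_Alerts_{datetime.now(timezone.utc).strftime('%Y-%m-%d')}.json"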
from datetime import datetime, timedelta, timezone
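def is_within_last_30_minutes(timestamp_str, window=timedelta(minutes=30)):
    """Return True if an ISO-8601 timestamp is no older than ``window``.

    Args:
        timestamp_str: ISO-8601 string such as the alert ``created`` field. A
            trailing ``Z`` is accepted as UTC. A value carrying no UTC offset
            is assumed to be UTC; previously it raised ``TypeError`` on the
            aware-minus-naive subtraction.
        window: how far back still counts as recent. Defaults to 30 minutes so
            existing callers are unchanged.

    Returns:
        True when ``now_utc - timestamp <= window``. A timestamp in the future
        (clock skew between the API and this host) gives a negative age and is
        therefore treated as recent.

    Raises:
        ValueError: if ``timestamp_str`` is not parseable by
            ``datetime.fromisoformat``.
    """
    parsed = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    age = datetime.now(timezone.utc) - parsed
    return age <= window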
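def get_access_token(protect_instance, client_id, password, timeout=30):
    """Exchange API client credentials for a Jamf Protect access token.

    Args:
        protect_instance: tenant subdomain interpolated into
            ``https://{protect_instance}.protect.jamfcloud.com``. It comes
            from the local config file, so it is trusted as a hostname here;
            do not feed it from an untrusted source.
        client_id: API client ID.
        password: API client password. It is sent only in the HTTPS request
            body — never print or log it.
        timeout: seconds to wait for connect/read (default 30). Without a
            timeout ``requests`` blocks indefinitely on a stalled connection,
            which silently hangs a scheduled collector.

    Returns:
        The access token string. Treat it as a credential: it authorizes API
        calls for whatever the client is scoped to until ``expires_in``
        seconds elapse.

    Raises:
        requests.HTTPError: on a non-2xx response.
        KeyError: if the response body lacks ``expires_in`` or ``access_token``.
    """
    token_url = f"https://{protect_instance}.protect.jamfcloud.com/token"
    payload = {"client_id": client_id, "password": password}
    resp = requests.post(token_url, json=payload, timeout=timeout)
    resp.raise_for_status()
    resp_data = resp.json()
    # Only the lifetime is printed; the token itself must stay out of logs.
    print(
        f"Access token granted, valid for {int(resp_data['expires_in'] // 60)} minutes."
    )
    return resp_data["access_token"]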
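def make_api_call(protect_instance, access_token, query, variables=None, timeout=60):
    """Send a GraphQL query to the Jamf Protect API and return the JSON body.

    Args:
        protect_instance: tenant subdomain used to build the ``/graphql`` URL
            (trusted, read from the local config file).
        access_token: token from ``get_access_token``; sent verbatim in the
            ``Authorization`` header, which is what the original code did —
            confirm against the API docs whether a ``Bearer`` prefix is needed.
        query: GraphQL document string.
        variables: GraphQL variables dict; ``None`` means no variables.
        timeout: seconds to wait for connect/read (default 60), so a stalled
            connection cannot hang the collector forever.

    Returns:
        The decoded response body (``data`` plus any extensions).

    Raises:
        requests.HTTPError: on a non-2xx response.
        RuntimeError: when the body carries a non-empty ``errors`` list.
            GraphQL servers conventionally report query/authorization
            failures that way with HTTP 200 and ``data`` set to null; without
            this check callers indexing ``resp["data"][...]`` fail with an
            opaque ``TypeError``.
    """
    if variables is None:
        variables = {}
    api_url = f"https://{protect_instance}.protect.jamfcloud.com/graphql"
    payload = {"query": query, "variables": variables}
    headers = {"Authorization": access_token}
    resp = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
    resp.raise_for_status()
    body = resp.json()
    if isinstance(body, dict) and body.get("errors"):
        raise RuntimeError(f"Jamf Protect GraphQL error: {body['errors']}")
    return body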
# GraphQL document used by __main__: alerts whose severity lies in the
# inclusive range [$min_severity, $max_severity], fetched $page_size at a time
# through the opaque $next cursor returned in pageInfo.next.
# Of the requested fields, __main__ reads only `created` and `json`;
# `severity` and `computer.hostName` are fetched but not used in this file.
# NOTE(review): no ordering is requested here. __main__ stops at the first
# alert older than 30 minutes, which is only correct if the server returns
# newest-first — confirm against the API before relying on it.
LIST_ALERTS_QUERY = """
query listAlerts(
$min_severity: SEVERITY
$max_severity: SEVERITY
$page_size: Int
$next: String
) {
listAlerts(
input: {
filter: {
severity: { greaterThanOrEqual: $min_severity }
and: { severity: { lessThanOrEqual: $max_severity } }
}
pageSize: $page_size
next: $next
}
) {
items {
json
severity
computer {
hostName
}
created
}
pageInfo {
next
}
}
}
"""
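def _write_recent_alerts(alerts, logs_file):
    """Write the alerts created within the last 30 minutes as JSON lines.

    Each alert's ``json`` payload is parsed, tagged with ``Log_type`` and
    written as one line. Iteration stops at the first alert outside the
    window; the rest of the page is skipped, as the original loop did.

    Args:
        alerts: the ``items`` list of one ``listAlerts`` page.
        logs_file: open text handle to write to.

    Returns:
        True if every alert on the page was recent, False if an older alert
        was seen (the caller uses this to stop paging).
    """
    for alert in alerts:
        if not is_within_last_30_minutes(alert["created"]):
            print("The given timestamp is not within the last 30 minutes.")
            return False
        log = json.loads(alert["json"])
        # Value kept byte-for-byte although the source is Jamf Protect rather
        # than Jamf Pro: a Wazuh decoder may already key on it.
        log["Log_type"] = "Jamf_Pro"
        # json.dump escapes embedded newlines, so one alert is always exactly
        # one line and a crafted field inside the alert body cannot forge a
        # second log record.
        json.dump(log, logs_file)
        logs_file.write("\n")
    return True


def __main__():
    """Executes the log fetching and writing process.

    Validates the severity bounds and credentials, authenticates, then pages
    through ``listAlerts`` writing alerts from the last 30 minutes to
    ``logfile`` as newline-delimited JSON for the Wazuh agent. The previous
    log file is removed first so every run produces a fresh file.

    Exits with status 1 on invalid severity values or missing credentials.
    Propagates ``requests.HTTPError`` from the API calls and
    ``FileExistsError`` if ``logfile`` reappears between removal and creation.
    """
    logfile = "/opt/wazuh_logging/jamf_pro/jamf_pro.log"
    if not {MIN_SEVERITY, MAX_SEVERITY}.issubset(
        {"Informational", "Low", "Medium", "High"}
    ):
        print(
            "ERROR: Unexpected value(s) for min/max severity. Expected 'Informational', 'Low', 'Medium', or 'High'."
        )
        sys.exit(1)
    if not all([PROTECT_INSTANCE, CLIENT_ID, PASSWORD]):
        print("ERROR: Variables PROTECT_INSTANCE, CLIENT_ID, and PASSWORD must be set.")
        sys.exit(1)
    # Authenticate before discarding the previous log so a failed run leaves
    # the last good output in place.
    access_token = get_access_token(PROTECT_INSTANCE, CLIENT_ID, PASSWORD)
    if os.path.isfile(logfile):
        os.remove(logfile)
    # One handle for the whole run. The previous code re-opened the file in
    # 'x' mode on every page, which raised FileExistsError from page two on.
    # 'x' (O_CREAT|O_EXCL) is kept on purpose: if anything recreates the path
    # between the remove and this open — a symlink included — the open fails
    # instead of writing through it.
    with open(logfile, "x") as logs_file:
        next_token = None
        while True:
            variables = {
                "min_severity": MIN_SEVERITY,
                "max_severity": MAX_SEVERITY,
                "page_size": 200,
                "next": next_token,
            }
            resp = make_api_call(PROTECT_INSTANCE, access_token, LIST_ALERTS_QUERY, variables)
            page = resp["data"]["listAlerts"]
            # NOTE(review): stopping at the first stale alert assumes the API
            # returns newest-first; LIST_ALERTS_QUERY requests no sort order.
            if not _write_recent_alerts(page["items"], logs_file):
                break
            next_token = page["pageInfo"]["next"]
            if next_token is None:
                break  # last page reached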
# Script entry point. ``__main__`` here is this file's own function defined
# above, not the ``__main__`` module name.
if __name__ == "__main__":
    __main__()