diff --git a/samples/hosts/stale_sensors.py b/samples/hosts/stale_sensors.py index be9dca774..a1113906f 100644 --- a/samples/hosts/stale_sensors.py +++ b/samples/hosts/stale_sensors.py @@ -21,8 +21,13 @@ - @morcef, jshcodes@CrowdStrike; 06.05.22 - More reasonable date calcs, Linting, Easier arg parsing Easier base_url handling, renamed grouping_tag to tag - jshcodes@Crowdstrike; 11.02.22 - Added CSV output options and cleaner date outputs. +- nmills@forbarr; 22.05.24 - Fixed deprecation warning on date function, + Added new arg to accept hostname pattern + Batch the call to hide_hosts to avoid API error """ import csv +import os +import re from argparse import ArgumentParser, RawTextHelpFormatter from datetime import datetime, timedelta, timezone from dateutil import parser as dparser @@ -34,7 +39,10 @@ "CrowdStrike FalconPy must be installed in order to use this application.\n" "Please execute `python3 -m pip install crowdstrike-falconpy` and try again." ) from no_falconpy - +# from dotenv import load_dotenv +# load_dotenv() +# clientId = os.getenv('clientid') +# clientSecret = os.getenv('secret') def parse_command_line() -> object: """Parse command-line arguments and return them back as an ArgumentParser object.""" @@ -46,13 +54,15 @@ def parse_command_line() -> object: '-k', '--client_id', help='CrowdStrike Falcon API key ID', - required=True + required=True, + # default=clientId ) parser.add_argument( '-s', '--client_secret', help='CrowdStrike Falcon API key secret', - required=True + required=True, + # default=clientSecret ) parser.add_argument( '-m', @@ -74,7 +84,8 @@ def parse_command_line() -> object: '-d', '--days', help='Number of days since a host was seen before it is considered stale', - required=False + required=False, + default=10 ) parser.add_argument( '-r', @@ -130,9 +141,15 @@ def parse_command_line() -> object: required=False, dest="osfilter" ) + parser.add_argument( + "-p", "--host-pattern", + help="filter hostnames by regex", + default=r".*", + required=False, + dest="hostfilter" + ) return parser.parse_args() - def connect_api(key: str, secret: str, base_url: str, child_cid: str = None) -> Hosts: """Connect to the API and return an instance of the Hosts Service Class.""" return Hosts(client_id=key, client_secret=secret, base_url=base_url, member_cid=child_cid) @@ -156,6 +173,10 @@ def get_hosts(date_filter: str, tag_filter: str, os_filter: str) -> list: if os_filter == "K8s": os_filter = "K8S" filter_string = f"{filter_string} + platform_name:'{os_filter}'" + x = falcon.query_devices_by_filter_scroll( + limit=5000, + filter=filter_string + )["body"]["resources"] return falcon.query_devices_by_filter_scroll( limit=5000, filter=filter_string @@ -164,7 +185,7 @@ def get_hosts(date_filter: str, tag_filter: str, os_filter: str) -> list: def calc_stale_date(num_days: int) -> str: """Calculate the 'stale' datetime based upon the number of days provided by the user.""" - today = datetime.utcnow() + today = datetime.now(timezone.utc) return str(today - timedelta(days=num_days)).replace(" ", "T") @@ -212,14 +233,17 @@ def hide_hosts(id_list: list) -> dict: # List to hold our identified hosts stale = [] # For each stale host identified -try: - for host in get_host_details(get_hosts(STALE_DATE, args.tag, args.osfilter)): - # Retrieve host detail - stale = parse_host_detail(host, stale) -except KeyError as api_error: - raise SystemExit( - "Unable to communicate with CrowdStrike API, check credentials and try again." - ) from api_error +if args.hostfilter == r".*": + pattern = args.hostfilter +else: + pattern = re.escape(args.hostfilter) + print(f"Pattern is: {pattern}") +for host in get_host_details(get_hosts(STALE_DATE, args.tag, args.osfilter)): + # Retrieve host detail + if 'hostname' in host: + if re.findall(pattern, host['hostname']): + print(f"{host['hostname']} matches expected pattern...") + stale = parse_host_detail(host, stale) # If we produced stale host results if stale: @@ -245,12 +269,15 @@ def hide_hosts(id_list: list) -> dict: else: # Remove the hosts host_list = [x[1] for x in stale] - remove_result = hide_hosts(host_list) - if remove_result["status_code"] == 202: - for deleted in remove_result["body"]["resources"]: - print(f"Removed host {deleted['id']}") - else: - for deleted in remove_result["body"]["errors"]: - print(f"[{deleted['code']}] {deleted['message']}") + batch_size = 50 + for i in range(0, len(host_list), batch_size): + batch = host_list[i:i + batch_size] + remove_result = hide_hosts(batch) + if remove_result["status_code"] == 202: + for deleted in remove_result["body"]["resources"]: + print(f"Removed host {deleted['id']}") + else: + for deleted in remove_result["body"]["errors"]: + print(f"[{deleted['code']}] {deleted['message']}") else: - print("No stale hosts identified for the range specified.") + print("No stale hosts identified for the range specified.") \ No newline at end of file