From 4260642e5400030403b6e65a9aa8bac015a665a3 Mon Sep 17 00:00:00 2001 From: nskobov <93276498+nskobov@users.noreply.github.com> Date: Mon, 4 Dec 2023 17:51:47 -0600 Subject: [PATCH] adds option to auto disable base (#417) * adds option to auto disable base * fix lint * formatting * log formatting * fmt --- panther_analysis_tool/analysis_utils.py | 23 +++++++++++++++++++++++ panther_analysis_tool/main.py | 12 ++++++++++++ 2 files changed, 35 insertions(+) diff --git a/panther_analysis_tool/analysis_utils.py b/panther_analysis_tool/analysis_utils.py index 351cc2bd..86e76226 100644 --- a/panther_analysis_tool/analysis_utils.py +++ b/panther_analysis_tool/analysis_utils.py @@ -185,6 +185,29 @@ def load_analysis_specs( yield result.spec_filename, result.relative_path, result.analysis_spec, result.error +def disable_all_base_detections(paths: List[str], ignore_files: List[str]) -> None: + analysis_specs = list(load_analysis_specs_ex(paths, ignore_files, roundtrip_yaml=True)) + base_ids_to_disable = set() + base_detection_key = "BaseDetection" + rule_id_key = "RuleID" + enabled_key = "Enabled" + for analysis_spec_res in analysis_specs: + spec: Dict[str, Any] = analysis_spec_res.analysis_spec + base_id = spec.get(base_detection_key, "") + if base_id == "": + continue + base_ids_to_disable.add(base_id) + for base_detection_id in base_ids_to_disable: + for analysis_spec_res in analysis_specs: + rule: Dict[str, Any] = analysis_spec_res.analysis_spec + if rule.get(rule_id_key, "") == base_detection_id: + logging.info( + "Setting %s=False for %s", enabled_key, analysis_spec_res.spec_filename + ) + rule[enabled_key] = False + analysis_spec_res.serialize_to_file() + + @dataclasses.dataclass class LoadAnalysisSpecsResult: """The result of loading analysis specifications from a file.""" diff --git a/panther_analysis_tool/main.py b/panther_analysis_tool/main.py index 1b05adea..a2db1eed 100644 --- a/panther_analysis_tool/main.py +++ b/panther_analysis_tool/main.py @@ -84,6 +84,7 @@ from panther_analysis_tool.analysis_utils import ( ClassifiedAnalysis, ClassifiedAnalysisContainer, + disable_all_base_detections, filter_analysis, get_simple_detections_as_python, load_analysis_specs, @@ -315,6 +316,10 @@ def upload_analysis(backend: BackendClient, args: argparse.Namespace) -> Tuple[i Returns: A tuple of return code and error if applicable. """ + if args.auto_disable_base: + zipargs = ZipArgs.from_args(args) + disable_all_base_detections([zipargs.path], zipargs.ignore_files) + use_async = (not args.no_async) and backend.supports_async_uploads() if args.batch and not use_async: if not args.skip_tests: @@ -1669,6 +1674,13 @@ def setup_parser() -> argparse.ArgumentParser: description=upload_help_text, formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) + upload_parser.add_argument( + "--auto-disable-base", + help="If uploading derived detections, set the corresponding base detection's Enabled status to false prior to upload", + default=False, + required=False, + action="store_true", + ) upload_parser.add_argument( "--max-retries", help="Retry to upload on a failure for a maximum number of times",