diff --git a/cs_misp_import/actors.py b/cs_misp_import/actors.py index d052957..d4706e2 100644 --- a/cs_misp_import/actors.py +++ b/cs_misp_import/actors.py @@ -22,7 +22,6 @@ import datetime import logging import os -import time import concurrent.futures try: from pymisp import MISPObject, MISPEvent, ExpandedPyMISP @@ -150,6 +149,20 @@ def create_internal_reference() -> MISPObject: return inter + @staticmethod + def int_ref_handler(evt, kc_name, kc_detail, ref_list, slg, act_name, int_ref, no_slug: bool = False): + misp_object = MISPObject("internal-reference") + misp_object.add_attribute("type", "Adversary detail", disable_correlation=True) + misp_object.add_attribute("identifier", kc_name.title(), disable_correlation=True) + if not isinstance(kc_detail, list): + kc_detail.replace("\t", "").replace(" ", "") + sum_id = misp_object.add_attribute("comment", kc_detail, disable_correlation=True) + ref_list.append(evt.add_object(misp_object)) + if not no_slug: + evt.add_attribute_tag(f"CrowdStrike:adversary:{kc_name.lower().replace(' ', '-')}: {act_name}", sum_id.uuid) + evt.add_attribute_tag(f"CrowdStrike:adversary:{slg}: {kc_name.upper()}", sum_id.uuid) + int_ref.add_reference(misp_object.uuid, "Adversary detail") + def create_event_from_actor(self, actor, act_details) -> MISPEvent(): """Create a MISP event for a valid Actor.""" @@ -187,76 +200,44 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): event.add_attribute('link', details.get('url'), disable_correlation=True) to_reference: list[MISPObject] = [] - adversary_detail = False + internal = None + # Adversary description if details.get('description'): internal = self.create_internal_reference() internal.add_attribute("identifier", "Description", disable_correlation=True) desc_id = internal.add_attribute('comment', details.get('description'), disable_correlation=True) - adversary_detail = True + # Adversary type act_type = details.get("actor_type", None) if act_type: - if not adversary_detail: + if not internal: internal = self.create_internal_reference() - adversary_detail = True - act_type_object = MISPObject("internal-reference") - act_type_object.add_attribute("type", "Adversary detail", disable_correlation=True) - #at_id = act_type_object.add_attribute("identifier", "Actor type", disable_correlation=True) - act_type_object.add_attribute("identifier", "Actor type", disable_correlation=True) - #sum_id = act_type_object.add_attribute('comment', act_type.title()) - act_type_object.add_attribute('comment', act_type.title(), disable_correlation=True) + + self.int_ref_handler(event, "Actor Type", act_type.title(), to_reference, slug, actor_name, internal, False) event.add_tag(f"CrowdStrike:adversary:type: {act_type.upper()}") - to_reference.append(event.add_object(act_type_object)) - #event.add_attribute_tag(f"CrowdStrike:adversary:actor-type: {act_type.upper()}", sum_id.uuid) - #event.add_attribute_tag(f"CrowdStrike:adversary:actor-type: {actor_name}", at_id.uuid) - #event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: ACTOR TYPE", sum_id.uuid) - #event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:actor-type: {act_type.upper()}", at_id.uuid) - #event.add_tag(f"CrowdStrike:adversary:{slug}:actor-type: {act_type.upper()}") - internal.add_reference(act_type_object.uuid, "Adversary detail") + # Adversary motives motives = details.get("motivations", None) if motives: - motive = "\n".join([m.get("value") for m in motives]) - if not adversary_detail: + mlist = [m.get("value") for m in motives] + motive_list_string = "\n".join(mlist) + if not internal: internal = self.create_internal_reference() - adversary_detail = True - motive_object = MISPObject("internal-reference") - motive_object.add_attribute("type", "Adversary detail", disable_correlation=True) - #mot_id = motive_object.add_attribute("identifier", "Motivation", disable_correlation=True) - motive_object.add_attribute("identifier", "Motivation", disable_correlation=True) - #sum_id = motive_object.add_attribute('comment', motive) - motive_object.add_attribute('comment', motive, disable_correlation=True) - to_reference.append(event.add_object(motive_object)) - #event.add_attribute_tag(f"CrowdStrike:adversary:motivation: {actor_name}", mot_id.uuid) - #event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: MOTIVATION", sum_id.uuid) - for m in motives: - if m.get('value'): - #event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:motivation: {m.get('value').upper()}", sum_id.uuid) - #event.add_tag(f"CrowdStrike:adversary:{slug}:motivation: {m.get('value').upper()}") - event.add_tag(f"CrowdStrike:adversary:motivation: {m.get('value').upper()}") - internal.add_reference(motive_object.uuid, "Adversary detail") + self.int_ref_handler(event, "Motivation", motive_list_string, to_reference, slug, actor_name, internal, False) + for mname in mlist: + event.add_tag(f"CrowdStrike:adversary:motivation: {mname.upper()}") + + # Adversary capability cap = details.get("capability", None) if cap: cap_val = cap.get("value") if cap_val: - if not adversary_detail: + if not internal: internal = self.create_internal_reference() - adversary_detail = True - cap_object = MISPObject("internal-reference") - cap_object.add_attribute("type", "Adversary detail", disable_correlation=True) - #cp_id = cap_object.add_attribute("identifier", "Capability", disable_correlation=True) - cap_object.add_attribute("identifier", "Capability", disable_correlation=True) - #sum_id = cap_object.add_attribute('comment', cap_val) - cap_object.add_attribute('comment', cap_val, disable_correlation=True) + + self.int_ref_handler(event, "Capability", cap_val, to_reference, slug, actor_name, internal, False) event.add_tag(f"CrowdStrike:adversary:capability: {cap_val.upper()}") - to_reference.append(event.add_object(cap_object)) - #event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:capability: {cap_val.upper()}", sum_id.uuid) - #event.add_tag(f"CrowdStrike:adversary:{slug}:capability: {cap_val.upper()}") - #event.add_attribute_tag(f"CrowdStrike:adversary:capability: {actor_name}", cp_id.uuid) - #event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: CAPABILITY", sum_id.uuid) - #event.add_attribute_tag(f"CrowdStrike:adversary:capability: {cap_val.upper()}", cp_id.uuid) - internal.add_reference(cap_object.uuid, "Adversary detail") # Set adversary event threat level based upon adversary capability if "BELOW" in cap_val.upper() or "LOW" in cap_val.upper(): event.threat_level_id = 3 @@ -265,8 +246,8 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): else: event.threat_level_id = 2 + # Kill chain elements kill_chain_detail = details.get("kill_chain") - if kill_chain_detail: objectives = kill_chain_detail.get("actions_and_objectives", None) candc = kill_chain_detail.get("command_and_control", None) @@ -276,37 +257,19 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): reconnaissance = kill_chain_detail.get("reconnaissance", None) weaponization = kill_chain_detail.get("weaponization", None) - if not adversary_detail: + if not internal: internal = self.create_internal_reference() - adversary_detail = True + # Kill chain - Objectives if objectives: - objective_object = MISPObject("internal-reference") - objective_object.add_attribute("type", "Adversary detail", disable_correlation=True) - objective_object.add_attribute("identifier", "Objectives", disable_correlation=True) - sum_id = objective_object.add_attribute("comment", objectives.replace("\t", "").replace(" ", ""), disable_correlation=True) - to_reference.append(event.add_object(objective_object)) - event.add_attribute_tag(f"CrowdStrike:adversary:objectives: {actor_name}", sum_id.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: OBJECTIVES", sum_id.uuid) - internal.add_reference(objective_object.uuid, "Adversary detail") + self.int_ref_handler(event, "objectives", objectives, to_reference, slug, actor_name, internal) + # Kill chain - Command and Control if candc: - candc_object = MISPObject("internal-reference") - candc_object.add_attribute("type", "Adversary detail", disable_correlation=True) - candc_object.add_attribute("identifier", "Command and Control", disable_correlation=True) - sum_id = candc_object.add_attribute("comment", candc.replace("\t", "").replace(" ", ""), disable_correlation=True) - to_reference.append(event.add_object(candc_object)) - event.add_attribute_tag(f"CrowdStrike:adversary:command-and-control: {actor_name}", sum_id.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: COMMAND AND CONTROL", sum_id.uuid) - internal.add_reference(candc_object.uuid, "Adversary detail") + self.int_ref_handler(event, "command and control", candc, to_reference, slug, actor_name, internal) + # Kill chain - Delivery if delivery: - delivery_object = MISPObject("internal-reference") - delivery_object.add_attribute("type", "Adversary detail", disable_correlation=True) - delivery_object.add_attribute("identifier", "Delivery", disable_correlation=True) - sum_id = delivery_object.add_attribute("comment", delivery.replace("\t", "").replace(" ", ""), disable_correlation=True) - to_reference.append(event.add_object(delivery_object)) - event.add_attribute_tag(f"CrowdStrike:adversary:delivery: {actor_name}", sum_id.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: DELIVERY", sum_id.uuid) - internal.add_reference(delivery_object.uuid, "Adversary detail") + self.int_ref_handler(event, "delivery", delivery, to_reference, slug, actor_name, internal) + # Kill chain - Exploitation if exploitation: exploitation_object = MISPObject("internal-reference") if exploitation.replace("\t", "".replace(" ", "")) not in ["Unknown", "N/A"]: @@ -322,79 +285,53 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): for exploit in [a.strip() for a in exptt.split(",")]: if len(exploit.split(" ")) <= 4: event.add_attribute_tag(f"CrowdStrike:adversary:exploitation: {exploit.upper()}", ex_id.uuid) - #event.add_attribute_tag(f"CrowdStrike:adversary:{slug}:exploitation: {exploit}", ex_id.uuid) - - internal.add_reference(exploitation_object.uuid, "Adversary detail") - + # Kill chain - Installation if installation: - installation_object = MISPObject("internal-reference") - installation_object.add_attribute("type", "Adversary detail", disable_correlation=True) - installation_object.add_attribute("identifier", "Installation", disable_correlation=True) - sum_id = installation_object.add_attribute("comment", installation.replace("\t", "").replace(" ", ""), disable_correlation=True) - to_reference.append(event.add_object(installation_object)) - event.add_attribute_tag(f"CrowdStrike:adversary:installation: {actor_name}", sum_id.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: INSTALLATION", sum_id.uuid) - internal.add_reference(installation_object.uuid, "Adversary detail") + self.int_ref_handler(event, "installation", installation, to_reference, slug, actor_name, internal) + # Kill chain - Reconnaissance if reconnaissance: - reconnaissance_object = MISPObject("internal-reference") - reconnaissance_object.add_attribute("type", "Adversary detail", disable_correlation=True) - reconnaissance_object.add_attribute("identifier", "Reconnaissance", disable_correlation=True) - sum_id = reconnaissance_object.add_attribute("comment", reconnaissance.replace("\t", "").replace(" ", ""), disable_correlation=True) - to_reference.append(event.add_object(reconnaissance_object)) - event.add_attribute_tag(f"CrowdStrike:adversary:reconnaissance: {actor_name}", sum_id.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: RECONNAISSANCE", sum_id.uuid) - internal.add_reference(reconnaissance_object.uuid, "Adversary detail") + self.int_ref_handler(event, "reconnaissance", reconnaissance, to_reference, slug, actor_name, internal) + # Kill chain - Weaponization if weaponization: - weaponization_object = MISPObject("internal-reference") - weaponization_object.add_attribute("type", "Adversary detail", disable_correlation=True) - weaponization_object.add_attribute("identifier", "Weaponization", disable_correlation=True) - sum_id = weaponization_object.add_attribute("comment", weaponization.replace("\t", "").replace(" ", ""), disable_correlation=True) - to_reference.append(event.add_object(weaponization_object)) - event.add_attribute_tag(f"CrowdStrike:adversary:weaponization: {actor_name}", sum_id.uuid) - event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: WEAPONIZATION", sum_id.uuid) - internal.add_reference(weaponization_object.uuid, "Adversary detail") - for ref in to_reference: - internal.add_reference(ref.uuid, "Adversary detail") - for web in to_reference: - if web.uuid != ref.uuid: - web.add_reference(ref.uuid, "Adversary detail") - if adversary_detail: + self.int_ref_handler(event, "weaponization", weaponization, to_reference, slug, actor_name, internal) + + for ref in to_reference: + internal.add_reference(ref.uuid, "Adversary detail") + for web in to_reference: + if web.uuid != ref.uuid: + web.add_reference(ref.uuid, "Adversary detail") + + if internal: event.add_object(internal) + # Add the description tags if details.get('description'): event.add_attribute_tag(f"CrowdStrike:adversary:description: {actor_name}", desc_id.uuid) event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: DESCRIPTION", desc_id.uuid) had_timestamp = False timestamp_object = MISPObject('timestamp') - - if actor.get('first_activity_date'): - timestamp_object.add_attribute('first-seen', - datetime.datetime.utcfromtimestamp(actor.get('first_activity_date')).isoformat() - ) - had_timestamp = True - - actor_att["first_seen"] = actor.get("first_activity_date") - - else: - self.log.warning("Adversary %s missing field first_activity_date.", actor.get('id')) - - if actor.get('last_activity_date'): - timestamp_object.add_attribute('last-seen', - datetime.datetime.utcfromtimestamp(actor.get('last_activity_date')).isoformat() - ) - had_timestamp = True - - actor_att["last_seen"] = actor.get("last_activity_date") - - - else: - self.log.warning("Adversary %s missing field last_activity_date.", actor.get('id')) - + tsf = None + tsl = None + actor_att["first_seen"] = actor.get("first_activity_date", 0) + if not actor_att["first_seen"]: + self.log.warning("Adversary %s missing field first_activity_date.", actor_name) + actor_att["last_seen"] = actor.get("last_activity_date", 0) + if not actor_att["last_seen"]: + self.log.warning("Adversary %s missing field last_activity_date.", actor_name) if actor_att.get("last_seen", 0) < actor_att.get("first_seen", 0): # Seems counter-intuitive actor_att["first_seen"] = actor.get("last_activity_date") actor_att["last_seen"] = actor.get("first_activity_date") + if actor_att["first_seen"] == 0: + actor_att["first_seen"] = actor_att["last_seen"] + if actor_att["first_seen"]: + tsf = timestamp_object.add_attribute('first-seen', datetime.datetime.utcfromtimestamp(actor_att["first_seen"]).isoformat()) + had_timestamp = True + + if actor_att["last_seen"]: + tsl = timestamp_object.add_attribute('last-seen', datetime.datetime.utcfromtimestamp(actor_att["last_seen"]).isoformat()) + had_timestamp = True ta = event.add_attribute(**actor_att, disable_correlation=True) actor_split = actor_name.split(" ") @@ -402,7 +339,12 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): event.add_attribute_tag(f"CrowdStrike:adversary:branch: {actor_branch}", ta.uuid) if had_timestamp: event.add_object(timestamp_object) - + if tsf: + event.add_attribute_tag(f"CrowdStrike:adversary:first-seen: {actor_name}", tsf.uuid) + event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: FIRST SEEN", tsf.uuid) + if tsl: + event.add_attribute_tag(f"CrowdStrike:adversary:last-seen: {actor_name}", tsl.uuid) + event.add_attribute_tag(f"CrowdStrike:adversary:{slug}: LAST SEEN", tsl.uuid) if actor.get('known_as') or actor.get("origins"): if actor.get("known_as"): known_as_object = MISPObject('organization') @@ -424,13 +366,10 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): victim = None + # Adversary victim location if actor.get("target_countries"): region_list = [c.get('value') for c in actor.get('target_countries', [])] for region in region_list: - #for country in actor.get('target_countries', []): - - #region = country.get('value') - #if region: if not victim: victim = MISPObject("victim") vic = victim.add_attribute('regions', region, disable_correlation=True) @@ -438,11 +377,10 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): vic.add_tag(f"CrowdStrike:adversary:{slug}:target:location: {region.upper()}") #vic.add_tag(f"CrowdStrike:adversary:{slug}:target: LOCATION") + # Adversary victim industry if actor.get("target_industries"): sector_list = [s.get('value') for s in actor.get('target_industries', [])] for sector in sector_list: - #sector = industry.get('value', None) - #if sector: if not victim: victim = MISPObject("victim") vic = victim.add_attribute('sectors', sector, disable_correlation=True) @@ -450,6 +388,7 @@ def create_event_from_actor(self, actor, act_details) -> MISPEvent(): #vic.add_tag(f"CrowdStrike:adversary:{slug}:target: SECTOR") vic.add_tag(f"CrowdStrike:target:sector: {sector.upper()}") event.add_object(victim) + # TYPE Taxonomic tag, all events if confirm_boolean_param(self.settings["TAGGING"].get("taxonomic_TYPE", False)): event.add_tag('type:CYBINT')